import os import select import signal import socket import subprocess import sys import threading import time try: from time import monotonic as timer except ImportError: from time import time as timer DURATION = 5.0 ITIMER = 0.2 # seconds NPROC = 5 POPEN_ARGS = [sys.executable, '-c', 'pass'] SIGNAMES = ('SIGINT', 'SIGALRM', 'SIGWINCH', 'SIGTERM', 'SIGCHLD') server = socket.socket() server.bind(('', 0)) server.listen(1) server_addr = ('127.0.0.1', server.getsockname()[1]) def dump(text): os.write(1, text) def server_thread(server): if hasattr(signal, 'pthread_sigmask'): signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG)) while True: # client connect() will block if the server is not accepting # connections time.sleep(ITIMER * 5) try: sock, addr = server.accept() except InterruptedError: # not interested by EINTR at the server side continue sock.close() thread = threading.Thread(target=server_thread, args=(server,), daemon=True) thread.start() def func(): global connect_in_progress client = socket.socket() dump(b'[') try: connect_in_progress = True client.connect(server_addr) connect_in_progress = False except ConnectionResetError: # On FreeBSD, connect() sometimes fail with ConnectionResetError dump(b'#') connect_in_progress = False dump(b']') client.close() got_signals = 0 connect_eintr = 0 connect_in_progress = False def signal_handler(*args): global got_signals, connect_eintr, connect_in_progress got_signals += 1 if connect_in_progress: dump(b'*') connect_eintr += 1 else: dump(b'_') for signame in SIGNAMES: if not hasattr(signal, signame): continue signum = getattr(signal, signame) print("Register %s" % signame) signal.signal(signum, signal_handler) if hasattr(signal, 'siginterrupt'): signal.siginterrupt(signum, True) if hasattr(signal, 'SIGALRM') and ITIMER: print("Send SIGALRM every %.1f ms" % (ITIMER * 1e3)) signal.setitimer(signal.ITIMER_REAL, ITIMER, ITIMER) t0 = timer() print("Run func() during %.1f seconds" % DURATION) print("Type CTRL+c, resize the window, etc.") print("") ncall = 0 while True: dt = timer() - t0 if dt >= DURATION: break # Child processes send SIGCHLD to the parent process procs = [subprocess.Popen(POPEN_ARGS) for index in range(NPROC)] func() for proc in procs: proc.wait() ncall += 1 if hasattr(signal, 'SIGALRM'): signal.alarm(0) print() print() print("Test completed in %.1f sec" % dt) print("func() has been called %s times" % ncall) print("Got %s signals" % got_signals) print("Got %s signals during connect()" % connect_eintr) server.close()