import os import select import signal import socket import subprocess import sys import threading import time try: from time import monotonic as timer except ImportError: from time import time as timer DURATION = 5.0 ITIMER = 0.2 # seconds NPROC = 5 POPEN_ARGS = [sys.executable, '-c', 'pass'] SIGNAMES = ('SIGINT', 'SIGALRM', 'SIGWINCH', 'SIGTERM', 'SIGCHLD') def dump(text): os.write(1, text) class ServerThread(threading.Thread): daemon = True def __init__(self): threading.Thread.__init__(self) self.sock = socket.socket() self.sock.bind(('', 0)) self.sock.listen(1) self.addr = ('127.0.0.1', self.sock.getsockname()[1]) def run(self): if hasattr(signal, 'pthread_sigmask'): signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG)) while True: # client connect() will block if the server is not accepting # connections time.sleep(ITIMER * 5) try: client_sock, addr = self.sock.accept() except InterruptedError: # not interested by EINTR at the server side continue client_sock.close() server = ServerThread() server.start() def func(): global connect_in_progress client = socket.socket() dump(b'[') try: connect_in_progress = True client.connect(server.addr) connect_in_progress = False except socket.error as exc: if exc.errno == 23232030: # On FreeBSD, connect() sometimes fail with ConnectionResetError dump(b'#') connect_in_progress = False else: raise dump(b']') client.close() got_signals = 0 connect_eintr = 0 connect_in_progress = False def signal_handler(*args): global got_signals, connect_eintr, connect_in_progress got_signals += 1 if connect_in_progress: dump(b'*') connect_eintr += 1 else: dump(b'_') for signame in SIGNAMES: if not hasattr(signal, signame): continue signum = getattr(signal, signame) print("Register %s" % signame) signal.signal(signum, signal_handler) if hasattr(signal, 'siginterrupt'): signal.siginterrupt(signum, True) if hasattr(signal, 'SIGALRM') and ITIMER: print("Send SIGALRM every %.1f ms" % (ITIMER * 1e3)) signal.setitimer(signal.ITIMER_REAL, ITIMER, ITIMER) t0 = timer() print("Run func() during %.1f seconds" % DURATION) print("Type CTRL+c, resize the window, etc.") print("") null = os.open(os.path.devnull, os.O_WRONLY) ncall = 0 while True: dt = timer() - t0 if dt >= DURATION: break # Child processes send SIGCHLD to the parent process procs = [subprocess.Popen(POPEN_ARGS, stdout=null, stderr=null) for index in range(NPROC)] func() for proc in procs: proc.wait() ncall += 1 if hasattr(signal, 'SIGALRM'): signal.alarm(0) print("") print("") print("Test completed in %.1f sec" % dt) print("func() has been called %s times" % ncall) print("Got %s signals" % got_signals) print("Got %s signals during connect()" % connect_eintr) server.sock.close()