"""Thread + subprocess exercise script (argparse description: 'python_thread_bug').

Starts ``--threads`` worker threads.  Each worker launches ``/bin/sleep 20``
inside a :class:`SubprocContext`, keeps it alive for three seconds, and then
kills it on leaving the context.  The main thread reports how many workers
managed to start their subprocess and then joins all of them.
"""
import argparse
import errno
import os
import signal
import subprocess
import sys
import threading
import time

# Serialises writes to stdout so lines from different threads do not interleave.
_LOGLOCK = threading.Lock()
# Guards the read-modify-write on _THREADS_OK, which is not atomic across threads.
_COUNTLOCK = threading.Lock()
# Number of workers that successfully entered their SubprocContext.
_THREADS_OK = 0
# Per-thread storage; workers store their numeric index here for log prefixes.
_TLS = threading.local()


class SubprocContext(object):
    """Context manager that runs ``/bin/sleep 20`` for the duration of the block.

    The child's stdout and stderr are redirected to ``/dev/null``.  On exit
    the child is sent SIGKILL and reaped.
    """

    def __enter__(self):
        """Start the child process and return ``self``."""
        devnull = os.open('/dev/null', os.O_WRONLY)
        try:
            self._subproc = subprocess.Popen(
                ['/bin/sleep', '20'], stdout=devnull, stderr=devnull)
        finally:
            # The child holds its own copy of the descriptor; closing ours
            # avoids leaking one fd in the parent per context entry.
            os.close(devnull)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Kill the child and block until it has been reaped.

        Returns ``None``, so an exception raised inside the ``with`` body
        is not suppressed.
        """
        # Skip the signal if the child has already exited and been reaped.
        if self._subproc.poll() is None:
            self._subproc.send_signal(signal.SIGKILL)
        self._subproc.wait()


def safe_print(s):
    """Like print, but prevents ugly interleaved lines."""
    # ``with`` guarantees the lock is released even if print() raises
    # (e.g. a failing __str__), which would otherwise deadlock all loggers.
    with _LOGLOCK:
        print(s)


def log(s):
    """Print ``s`` via :func:`safe_print`, prefixed with ``[index] `` when the
    calling thread has stored an index in ``_TLS``; otherwise without a prefix.
    """
    try:
        index = '[%d] ' % _TLS.index
    except AttributeError:
        # The calling thread never set _TLS.index (e.g. the main thread).
        index = ''
    safe_print(index + s)


def _worker(index):
    """Thread body: run one subprocess for three seconds, logging progress.

    Args:
        index: Number identifying this worker; used as the log prefix.
    """
    global _THREADS_OK
    _TLS.index = index
    log('Starting subproc...')
    with SubprocContext() as ctx:
        log('subproc running')
        with _COUNTLOCK:
            _THREADS_OK += 1
        time.sleep(3)
        log('done with subproc')
    log('all done')


def parse_args():
    """Parse the command line and return the argparse namespace.

    Recognised option: ``-j`` / ``--threads`` (int, default 1).
    """
    parser = argparse.ArgumentParser(description='python_thread_bug')
    parser.add_argument('-j', '--threads', type=int, action='store', default=1)
    return parser.parse_args()


def _on_signal(sig, frame):
    """Signal handler: announce the signal, then SIGKILL this process."""
    print('Got signal -- dying now')
    # SIGKILL terminates the process without flushing buffers, so flush
    # explicitly or the message is lost when stdout is not a terminal.
    sys.stdout.flush()
    os.kill(os.getpid(), signal.SIGKILL)


def run_tests():
    """Start the worker threads, report how many are okay, and join them.

    Installs :func:`_on_signal` as the SIGINT handler.  Returns ``None``.
    """
    cmdargs = parse_args()
    signal.signal(signal.SIGINT, _on_signal)
    threads = []
    for i in range(cmdargs.threads):
        t = threading.Thread(target=_worker, args=(i,))
        t.start()
        threads.append(t)
    safe_print('All threads have been started')
    # Give the workers a moment to launch their subprocesses before counting.
    time.sleep(1)
    safe_print('%d/%d threads are okay' % (_THREADS_OK, cmdargs.threads))
    for t in threads:
        t.join(2**31)  # Because infinite timeout blocks signal delivery


if __name__ == '__main__':
    sys.exit(run_tests())