Index: Python/ceval.c =================================================================== --- Python/ceval.c (revision 64788) +++ Python/ceval.c (working copy) @@ -274,6 +274,9 @@ void PyEval_ReInitThreads(void) { + PyObject *threading, *result; + PyThreadState *tstate; + if (!interpreter_lock) return; /*XXX Can't use PyThread_free_lock here because it does too @@ -283,6 +286,25 @@ interpreter_lock = PyThread_allocate_lock(); PyThread_acquire_lock(interpreter_lock, 1); main_thread = PyThread_get_thread_ident(); + + + /* Reset threading._active_limbo_lock, in case we forked just + * when a thread was starting of stopping + */ + tstate = PyThreadState_GET(); + threading = PyMapping_GetItemString(tstate->interp->modules, + "threading"); + if (threading == NULL) { + /* threading not imported */ + PyErr_Clear(); + return; + } + result = PyObject_CallMethod(threading, "_after_fork", ""); + if (result == NULL) + PyErr_WriteUnraisable(threading); + else + Py_DECREF(result); + Py_DECREF(threading); } #endif Index: Lib/threading.py =================================================================== --- Lib/threading.py (revision 64788) +++ Lib/threading.py (working copy) @@ -825,6 +830,20 @@ from _threading_local import local +def _after_fork(): + # Reset _active_limbo_lock, in case we forked just when + # a thread was starting or stopping + import sys, os + try: + _active_limbo_lock.release() + except ThreadError: + pass + + for ident, thread in _active.items(): + if thread is not current_thread(): + thread._Thread__stop() + + # Self-test code def _test(): Index: Lib/test/test_threading.py =================================================================== --- Lib/test/test_threading.py (revision 64788) +++ Lib/test/test_threading.py (working copy) @@ -322,7 +322,73 @@ msg=('%d references still around' % sys.getrefcount(weak_raising_cyclic_object()))) +class ThreadJoinOnShutdown(unittest.TestCase): + def _run_and_join(self, script): + script = """if 1: + import sys, os, time, threading + + # a thread, which waits for the main program to terminate + def joiningfunc(mainthread): + mainthread.join() + print 'end of thread' + \n""" + script + + import subprocess + p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE) + rc = p.wait() + self.assertEqual(p.stdout.read(), "end of main\nend of thread\n") + self.failIf(rc == 2, "interpreter was blocked") + self.failUnless(rc == 0, "Unexpected error") + + def test_join_on_shutdown(self): + script = """if 1: + import os + t = threading.Thread(target=joiningfunc, args=(threading.current_thread(),)) + t.start() + time.sleep(0.1) + print 'end of main' + """ + self._run_and_join(script) + + + def test_join_in_forked_process(self): + import os + if not hasattr(os, 'fork'): + return + script = """if 1: + if os.fork() != 0: + time.sleep(1) + sys.exit(0) + + t = threading.Thread(target=joiningfunc, args=(threading.current_thread(),)) + t.start() + time.sleep(0.1) + print 'end of main' + """ + self._run_and_join(script) + + def test_join_in_forked_from_thread(self): + import os + if not hasattr(os, 'fork'): + return + script = """if 1: + mainthread = threading.current_thread() + def worker(): + time.sleep(1) + if os.fork() != 0: + time.sleep(.1) + sys.exit(0) + + t = threading.Thread(target=joiningfunc, args=(mainthread,)) + t.start() + w = threading.Thread(target=worker) + w.start() + print 'end of main' + sys.stdout.flush() # Avoid duplication of buffered data + """ + self._run_and_join(script) + class ThreadingExceptionTests(unittest.TestCase): # A RuntimeError should be raised if Thread.start() is called # multiple times. @@ -362,8 +428,10 @@ def test_main(): - test.test_support.run_unittest(ThreadTests, - ThreadingExceptionTests) + test.test_support.run_unittest(#ThreadTests, + ThreadJoinOnShutdown, + #ThreadingExceptionTests, + ) if __name__ == "__main__": test_main()