Index: Lib/threading.py =================================================================== --- Lib/threading.py (revision 74302) +++ Lib/threading.py (working copy) @@ -209,6 +209,16 @@ else: return True + def _reset_lock(self, lock=None): + """Throw away the old lock and replace it with this one.""" + if lock is None: + lock = Lock() + self._lock = lock + # Reset these exported bound methods. If we don't do this, these + # bound methods will still refer to the old lock. + self.acquire = lock.acquire + self.release = lock.release + def wait(self, timeout=None): if not self._is_owned(): raise RuntimeError("cannot wait on un-aquired lock") @@ -839,6 +849,10 @@ # its new value since it can have changed. ident = _get_ident() thread._ident = ident + # Any locks hanging off of the active thread may be in an + # invalid state, so we reset them. + thread._block._reset_lock() + thread._started._cond._reset_lock() new_active[ident] = thread else: # All the others are already stopped. Index: Lib/test/test_threading.py =================================================================== --- Lib/test/test_threading.py (revision 74302) +++ Lib/test/test_threading.py (working copy) @@ -427,6 +427,71 @@ self._run_and_join(script) +class ThreadAndForkTests(unittest.TestCase): + + @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") + def test_join_fork_stop_deadlock(self): + # There used to be a possible deadlock when forking from a child + # thread. See http://bugs.python.org/issue6643. + + # Skip platforms with known problems forking from a worker thread. + # See http://bugs.python.org/issue3863. + if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'os2emx'): + raise unittest.SkipTest('due to known OS bugs on ' + sys.platform) + script = """if 1: + import os, sys, time, threading + + finish_fork = False + finish_join = False + + def worker(): + # Wait just a bit before forking so that the original thread + # makes it into my_acquire. + global finish_fork + global finish_join + while not finish_fork: + pass + childpid = os.fork() + finish_join = True + if childpid != 0: + # Parent process just waits for child. + os.waitpid(childpid, 0) + # Child process should just return. + + w = threading.Thread(target=worker) + + # Stub out the private condition variable's lock acquire method. + # There is a race to acquire this between w.join() and w.__stop(), + # which is called when the thread returns. + condition = w._block + orig_acquire = condition.acquire + call_count = 0 + def my_acquire(): + global call_count + global finish_join + global finish_fork + orig_acquire() + finish_fork = True + if call_count == 0: + while not finish_join: + pass + call_count += 1 + condition.acquire = my_acquire + + w.start() + w.join() + print('end of main') + """ + import subprocess + p = subprocess.Popen([sys.executable, "-c", script], + stdout=subprocess.PIPE) + rc = p.wait() + data = p.stdout.read().decode().replace('\r', '') + self.assertEqual(data, "end of main\n") + self.assertFalse(rc == 2, "interpreter was blocked") + self.assertTrue(rc == 0, "Unexpected error") + + class ThreadingExceptionTests(unittest.TestCase): # A RuntimeError should be raised if Thread.start() is called # multiple times. @@ -467,9 +532,10 @@ def test_main(): test.support.run_unittest(ThreadTests, - ThreadJoinOnShutdown, - ThreadingExceptionTests, - ) + ThreadJoinOnShutdown, + ThreadingExceptionTests, + ThreadAndForkTests, + ) if __name__ == "__main__": test_main()