diff -r 7b757e6b95f3 Lib/test/test_threaded_import.py --- a/Lib/test/test_threaded_import.py Fri Jul 16 21:00:22 2010 +0200 +++ b/Lib/test/test_threaded_import.py Sat Aug 21 20:43:01 2010 +0200 @@ -5,11 +5,14 @@ # complains several times about module random having no attribute # randrange, and then Python hangs. +import os import imp import sys import unittest -from test.support import verbose, TestFailed, import_module, run_unittest +import shutil +from test.support import verbose, import_module, run_unittest, TESTFN thread = import_module('_thread') +threading = import_module('threading') def task(N, done, done_tasks, errors): try: @@ -24,6 +27,25 @@ def task(N, done, done_tasks, errors): if finished: done.release() +# Create a circular import structure: A -> C -> B -> D -> A + +circular_imports_modules = { + 'A': """if 1: + import time + time.sleep(%(delay)s) + x = 'a' + import C + """, + 'B': """if 1: + import time + time.sleep(%(delay)s) + x = 'b' + import D + """, + 'C': """import B""", + 'D': """import A""", +} + class ThreadedImportTests(unittest.TestCase): @@ -62,6 +84,35 @@ class ThreadedImportTests(unittest.TestC import test.threaded_import_hangers self.assertFalse(test.threaded_import_hangers.errors) + def test_circular_imports(self): + delay = 0.5 + os.mkdir(TESTFN) + self.addCleanup(shutil.rmtree, TESTFN) + sys.path.insert(0, TESTFN) + self.addCleanup(sys.path.remove, TESTFN) + for name, contents in circular_imports_modules.items(): + contents = contents % {'delay': delay} + with open(os.path.join(TESTFN, name + ".py"), "wb") as f: + f.write(contents.encode('utf-8')) + self.addCleanup(sys.modules.pop, name, None) + + # Make sure `time` is already imported + import time + results = [] + def import_ab(): + import A + results.append(getattr(A, 'x', None)) + def import_ba(): + import B + results.append(getattr(B, 'x', None)) + t1 = threading.Thread(target=import_ab) + t2 = threading.Thread(target=import_ba) + t1.start() + t2.start() + t1.join() + t2.join() + self.assertEqual(set(results), {'a', 'b'}) + def test_main(): run_unittest(ThreadedImportTests)