diff --git a/Lib/test/test_importlib/test_locks.py b/Lib/test/test_importlib/test_locks.py --- a/Lib/test/test_importlib/test_locks.py +++ b/Lib/test/test_importlib/test_locks.py @@ -47,84 +47,96 @@ else: class Frozen_ModuleLockAsRLockTests(unittest.TestCase): pass class Source_ModuleLockAsRLockTests(unittest.TestCase): pass -@unittest.skipUnless(threading, "threads needed for this test") -class DeadlockAvoidanceTests: +if threading is not None: + class DeadlockAvoidanceTests: - def setUp(self): - try: - self.old_switchinterval = sys.getswitchinterval() - sys.setswitchinterval(0.000001) - except AttributeError: - self.old_switchinterval = None + def setUp(self): + try: + self.old_switchinterval = sys.getswitchinterval() + sys.setswitchinterval(0.000001) + except AttributeError: + self.old_switchinterval = None - def tearDown(self): - if self.old_switchinterval is not None: - sys.setswitchinterval(self.old_switchinterval) + def tearDown(self): + if self.old_switchinterval is not None: + sys.setswitchinterval(self.old_switchinterval) - def run_deadlock_avoidance_test(self, create_deadlock): - NLOCKS = 10 - locks = [self.LockType(str(i)) for i in range(NLOCKS)] - pairs = [(locks[i], locks[(i+1)%NLOCKS]) for i in range(NLOCKS)] - if create_deadlock: - NTHREADS = NLOCKS - else: - NTHREADS = NLOCKS - 1 - barrier = threading.Barrier(NTHREADS) - results = [] - def _acquire(lock): - """Try to acquire the lock. Return True on success, False on deadlock.""" - try: - lock.acquire() - except self.DeadlockError: - return False + def run_deadlock_avoidance_test(self, create_deadlock): + NLOCKS = 10 + locks = [self.LockType(str(i)) for i in range(NLOCKS)] + pairs = [(locks[i], locks[(i+1)%NLOCKS]) for i in range(NLOCKS)] + if create_deadlock: + NTHREADS = NLOCKS else: - return True - def f(): - a, b = pairs.pop() - ra = _acquire(a) - barrier.wait() - rb = _acquire(b) - results.append((ra, rb)) - if rb: - b.release() - if ra: - a.release() - lock_tests.Bunch(f, NTHREADS).wait_for_finished() - self.assertEqual(len(results), NTHREADS) - return results + NTHREADS = NLOCKS - 1 + barrier = threading.Barrier(NTHREADS) + results = [] - def test_deadlock(self): - results = self.run_deadlock_avoidance_test(True) - # At least one of the threads detected a potential deadlock on its - # second acquire() call. It may be several of them, because the - # deadlock avoidance mechanism is conservative. - nb_deadlocks = results.count((True, False)) - self.assertGreaterEqual(nb_deadlocks, 1) - self.assertEqual(results.count((True, True)), len(results) - nb_deadlocks) + def _acquire(lock): + """Try to acquire the lock. Return True on success, + False on deadlock.""" + try: + lock.acquire() + except self.DeadlockError: + return False + else: + return True - def test_no_deadlock(self): - results = self.run_deadlock_avoidance_test(False) - self.assertEqual(results.count((True, False)), 0) - self.assertEqual(results.count((True, True)), len(results)) + def f(): + a, b = pairs.pop() + ra = _acquire(a) + barrier.wait() + rb = _acquire(b) + results.append((ra, rb)) + if rb: + b.release() + if ra: + a.release() + lock_tests.Bunch(f, NTHREADS).wait_for_finished() + self.assertEqual(len(results), NTHREADS) + return results + def test_deadlock(self): + results = self.run_deadlock_avoidance_test(True) + # At least one of the threads detected a potential deadlock on its + # second acquire() call. It may be several of them, because the + # deadlock avoidance mechanism is conservative. + nb_deadlocks = results.count((True, False)) + self.assertGreaterEqual(nb_deadlocks, 1) + self.assertEqual(results.count((True, True)), len(results) - nb_deadlocks) -DEADLOCK_ERRORS = {kind: splitinit._bootstrap._DeadlockError - for kind, splitinit in init.items()} + def test_no_deadlock(self): + results = self.run_deadlock_avoidance_test(False) + self.assertEqual(results.count((True, False)), 0) + self.assertEqual(results.count((True, True)), len(results)) -(Frozen_DeadlockAvoidanceTests, - Source_DeadlockAvoidanceTests - ) = test_util.test_both(DeadlockAvoidanceTests, - LockType=LOCK_TYPES, DeadlockError=DEADLOCK_ERRORS) + + DEADLOCK_ERRORS = {kind: splitinit._bootstrap._DeadlockError + for kind, splitinit in init.items()} + + (Frozen_DeadlockAvoidanceTests, + Source_DeadlockAvoidanceTests + ) = test_util.test_both(DeadlockAvoidanceTests, + LockType=LOCK_TYPES, + DeadlockError=DEADLOCK_ERRORS) +else: + DEADLOCK_ERRORS = {} + + class Frozen_DeadlockAvoidanceTests(unittest.TestCase): + pass + + class Source_DeadlockAvoidanceTests(unittest.TestCase): + pass class LifetimeTests: @property def bootstrap(self): return self.init._bootstrap