diff -r 1cd2082ade48 Lib/test/test_dummy_thread.py --- a/Lib/test/test_dummy_thread.py Sun Mar 13 22:49:44 2011 -0400 +++ b/Lib/test/test_dummy_thread.py Tue Mar 15 01:33:25 2011 -0400 @@ -9,6 +9,7 @@ import time import queue import random +import traceback import unittest from test import support @@ -83,6 +84,30 @@ self.assertTrue((end_time - start_time) >= DELAY, "Blocking by unconditional acquiring failed.") + def test_acquire_timeout(self): + """Test invoking acquire() with a positive timeout when the lock is + already acquired. Ensure that time.sleep() is invoked with the given + timeout and that False is returned.""" + self.lock.acquire() + + mock_sleep = MockFunction() + orig_sleep = time.sleep + time.sleep = mock_sleep + try: + retval = self.lock.acquire(waitflag=0, timeout=1) + finally: + time.sleep = orig_sleep + + self.assertTrue(mock_sleep.invoked) + self.assertEqual(mock_sleep.last_args, (1,)) + self.assertIs(retval, False) + + def test_context_manager(self): + self.lock.release = MockFunction() + with self.lock: + self.assertFalse(self.lock.release.invoked) + self.assertTrue(self.lock.release.invoked) + class MiscTests(unittest.TestCase): """Miscellaneous tests.""" @@ -168,6 +193,73 @@ "Not all %s threads executed properly after %s sec." % (thread_count, DELAY)) + def test_args_not_tuple(self): + """ + Test invoking start_new_thread() with a non-tuple value for "args". + Expect TypeError with a meaningful error message to be raised. + """ + with self.assertRaises(TypeError) as cm: + _thread.start_new_thread(MockFunction(), []) + self.assertEqual(cm.exception.args[0], "2nd arg must be a tuple") + + def test_kwargs_not_dict(self): + """ + Test invoking start_new_thread() with a non-dict value for "kwargs". + Expect TypeError with a meaningful error message to be raised. + """ + with self.assertRaises(TypeError) as cm: + _thread.start_new_thread(MockFunction(), tuple(), kwargs=[]) + self.assertEqual(cm.exception.args[0], "3rd arg must be a dict") + + def test_SystemExit(self): + """ + Test invoking start_new_thread() with a function that raises SystemExit. + The exception should be discarded. + """ + func = MockFunction(exception_type=SystemExit) + _thread.start_new_thread(func, tuple()) + + def test_RaiseException(self): + """ + Test invoking start_new_thread() with a function that raises exception. + The exception should be discarded and the traceback should be printed + via traceback.print_exc() + """ + func = MockFunction(exception_type=Exception) + print_exc_replacement = MockFunction() + print_exc_original = traceback.print_exc + traceback.print_exc = print_exc_replacement + try: + _thread.start_new_thread(func, tuple()) + finally: + traceback.print_exc = print_exc_original + + self.assertTrue(print_exc_replacement.invoked) + +class Test_stack_size(unittest.TestCase): + """Unit tests for _dummy_thread.stack_size""" + + def test_None(self): + retval = _thread.stack_size(None) + self.assertEqual(retval, 0) + + def test_not_None(self): + with self.assertRaises(_thread.error) as cm: + _thread.stack_size("") + self.assertEquals(cm.exception.args[0], + "setting thread stack size not supported") + +class MockFunction: + def __init__(self, exception_type=None): + self.exception_type = exception_type + self.invoked = False + def __call__(self, *args, **kwargs): + self.invoked = True + self.last_args = args + self.last_kwargs = kwargs + if self.exception_type is not None: + raise self.exception_type() + def test_main(imported_module=None): global _thread, DELAY if imported_module: @@ -176,7 +268,7 @@ if support.verbose: print() print("*** Using %s as _thread module ***" % _thread) - support.run_unittest(LockTests, MiscTests, ThreadTests) + support.run_unittest(LockTests, MiscTests, ThreadTests, Test_stack_size) if __name__ == '__main__': test_main()