Index: output/test_thread =================================================================== --- output/test_thread (revision 58058) +++ output/test_thread (working copy) @@ -1,18 +0,0 @@ -test_thread -waiting for all tasks to complete -all tasks done - -*** Barrier Test *** -all tasks done - -*** Changing thread stack size *** -caught expected ValueError setting stack_size(4096) -successfully set stack_size(262144) -successfully set stack_size(1048576) -successfully set stack_size(0) -trying stack_size = 262144 -waiting for all tasks to complete -all tasks done -trying stack_size = 1048576 -waiting for all tasks to complete -all tasks done Index: test_thread.py =================================================================== --- test_thread.py (revision 58058) +++ test_thread.py (working copy) @@ -1,55 +1,99 @@ # Very rudimentary test of thread module - # Create a bunch of threads, let each do some work, wait until all are done -from test.test_support import verbose +# Imports + +import unittest import random import thread import time -mutex = thread.allocate_lock() -rmutex = thread.allocate_lock() # for calls to random -running = 0 -done = thread.allocate_lock() -done.acquire() +from test import test_support +from test.test_support import verbose -numtasks = 10 +# Module-level variables +_mutex = thread.allocate_lock() +_rmutex = thread.allocate_lock() # for calls to random +_done = thread.allocate_lock() +_done.acquire() + +_running = 0 # Number of task threads currently running +_numtasks = 10 # Number of task threads to start up in each test stage +_next_ident = 0 # Identifier for next task thread to start +_numtrips = 3 # Number of times that threads try to pass the barrier + +# Child thread functions + def task(ident): - global running - rmutex.acquire() - delay = random.random() * numtasks - rmutex.release() + # Function that will be executed by child threads. + # ident is a number that uniquely identifies the thread. + # Sleeps for a random amount of time before finishing. + global _running + _rmutex.acquire() + delay = random.random() * _numtasks + _rmutex.release() if verbose: print('task', ident, 'will run for', round(delay, 1), 'sec') time.sleep(delay) if verbose: print('task', ident, 'done') - mutex.acquire() - running = running - 1 - if running == 0: - done.release() - mutex.release() + _mutex.acquire() + _running = _running - 1 + if _running == 0: + _done.release() + _mutex.release() -next_ident = 0 -def newtask(): - global next_ident, running - mutex.acquire() - next_ident = next_ident + 1 + +def new_task(): + # Creates a new child thread which will execute the + # task() function. + global _next_ident, _running + _mutex.acquire() + _next_ident = _next_ident + 1 if verbose: - print('creating task', next_ident) - thread.start_new_thread(task, (next_ident,)) - running = running + 1 - mutex.release() + print('creating task', _next_ident) + thread.start_new_thread(task, (_next_ident,)) + _running = _running + 1 + _mutex.release() -for i in range(numtasks): - newtask() -print('waiting for all tasks to complete') -done.acquire() -print('all tasks done') +def task_with_barrier(ident, bar): + # A function to be executed by child threads. + # ident is a number that uniquely identifies the thread. + # bar must be a _Barrier object. + # The thread tries to enter, then leave, this barrier. + global _running + for i in range(_numtrips): + if ident == 0: + # give it a good chance to enter the next + # barrier before the others are all out + # of the current one + delay = 0.001 + else: + _rmutex.acquire() + delay = random.random() * _numtasks + _rmutex.release() + if verbose: + print('task', ident, 'will run for', round(delay, 1), 'sec') + time.sleep(delay) + if verbose: + print('task', ident, 'entering barrier', i) + bar.enter() + if verbose: + print('task', ident, 'leaving barrier', i) + _mutex.acquire() + _running -= 1 + # Must release _mutex before releasing _done, else the main thread can + # exit and set _mutex to None as part of global teardown; then + # _mutex.release() raises AttributeError. + finished = _running == 0 + _mutex.release() + if finished: + _done.release() -class barrier: + +class _Barrier: def __init__(self, n): self.n = n self.waiting = 0 @@ -75,86 +119,73 @@ return checkout.release() -numtrips = 3 -def task2(ident): - global running - for i in range(numtrips): - if ident == 0: - # give it a good chance to enter the next - # barrier before the others are all out - # of the current one - delay = 0.001 - else: - rmutex.acquire() - delay = random.random() * numtasks - rmutex.release() - if verbose: - print('task', ident, 'will run for', round(delay, 1), 'sec') - time.sleep(delay) - if verbose: - print('task', ident, 'entering barrier', i) - bar.enter() - if verbose: - print('task', ident, 'leaving barrier', i) - mutex.acquire() - running -= 1 - # Must release mutex before releasing done, else the main thread can - # exit and set mutex to None as part of global teardown; then - # mutex.release() raises AttributeError. - finished = running == 0 - mutex.release() - if finished: - done.release() -print('\n*** Barrier Test ***') -if done.acquire(0): - raise ValueError("'done' should have remained acquired") -bar = barrier(numtasks) -running = numtasks -for i in range(numtasks): - thread.start_new_thread(task2, (i,)) -done.acquire() -print('all tasks done') +class ThreadTests(unittest.TestCase): + def testBasicMultiThreading(self): + for i in range(_numtasks): + new_task() -# not all platforms support changing thread stack size -print('\n*** Changing thread stack size ***') -if thread.stack_size() != 0: - raise ValueError("initial stack_size not 0") + print('waiting for all tasks to complete') + _done.acquire() + print('all tasks done') -thread.stack_size(0) -if thread.stack_size() != 0: - raise ValueError("stack_size not reset to default") + def testBarrier(self): + global _running + self.assertFalse(_done.acquire(0), + "'_done' should have remained acquired") + bar = _Barrier(_numtasks) + _running = _numtasks + for i in range(_numtasks): + thread.start_new_thread(task_with_barrier, (i, bar)) + _done.acquire() + print('all tasks done') -from os import name as os_name -if os_name in ("nt", "os2", "posix"): + def testChangeStackSize(self): + self.assertEqual(0, thread.stack_size(), + 'initial stack_size not 0') - tss_supported = 1 - try: - thread.stack_size(4096) - except ValueError: - print('caught expected ValueError setting stack_size(4096)') - except thread.error: - tss_supported = 0 - print('platform does not support changing thread stack size') + thread.stack_size(0) + self.assertEqual(0, thread.stack_size(), + 'stack_size not reset to default') + # not all platforms support changing thread stack size + from os import name as os_name + if not os_name in ("nt", "os2", "posix"): + return - if tss_supported: - failed = lambda s, e: s != e - fail_msg = "stack_size(%d) failed - should succeed" - for tss in (262144, 0x100000, 0): - thread.stack_size(tss) - if failed(thread.stack_size(), tss): - raise ValueError(fail_msg % tss) - print('successfully set stack_size(%d)' % tss) + tss_supported = 1 + try: + thread.stack_size(4096) + except ValueError: + print('caught expected ValueError setting stack_size(4096)') + except thread.error: + tss_supported = 0 + print('platform does not support changing thread stack size') - for tss in (262144, 0x100000): - print('trying stack_size = %d' % tss) - next_ident = 0 - for i in range(numtasks): - newtask() + if tss_supported: + fail_msg = "stack_size(%d) failed - should succeed" + for tss in (262144, 0x100000, 0): + thread.stack_size(tss) + self.assertEqual( thread.stack_size(), + tss, + fail_msg % tss ) + print('successfully set stack_size(%d)' % tss) - print('waiting for all tasks to complete') - done.acquire() - print('all tasks done') + for tss in (262144, 0x100000): + print('trying stack_size = %d' % tss) + _next_ident = 0 + for i in range(_numtasks): + new_task() - # reset stack size to default - thread.stack_size(0) + print('waiting for all tasks to complete') + _done.acquire() + print('all tasks done') + + # reset stack size to default + thread.stack_size(0) + + +def test_main(): + test_support.run_unittest(ThreadTests) + +if __name__=='__main__': + test_main()