Index: Lib/test/test_thread.py =================================================================== --- Lib/test/test_thread.py (revision 51064) +++ Lib/test/test_thread.py (working copy) @@ -1,54 +1,15 @@ -# Very rudimentary test of thread module - # Create a bunch of threads, let each do some work, wait until all are done +from test import test_support from test.test_support import verbose import random import thread import time +import unittest -mutex = thread.allocate_lock() -rmutex = thread.allocate_lock() # for calls to random -running = 0 -done = thread.allocate_lock() -done.acquire() - numtasks = 10 +numtrips = 3 -def task(ident): - global running - rmutex.acquire() - delay = random.random() * numtasks - rmutex.release() - if verbose: - print 'task', ident, 'will run for', round(delay, 1), 'sec' - time.sleep(delay) - if verbose: - print 'task', ident, 'done' - mutex.acquire() - running = running - 1 - if running == 0: - done.release() - mutex.release() - -next_ident = 0 -def newtask(): - global next_ident, running - mutex.acquire() - next_ident = next_ident + 1 - if verbose: - print 'creating task', next_ident - thread.start_new_thread(task, (next_ident,)) - running = running + 1 - mutex.release() - -for i in range(numtasks): - newtask() - -print 'waiting for all tasks to complete' -done.acquire() -print 'all tasks done' - class barrier: def __init__(self, n): self.n = n @@ -61,7 +22,7 @@ checkin, checkout = self.checkin, self.checkout checkin.acquire() - self.waiting = self.waiting + 1 + self.waiting += 1 if self.waiting == self.n: self.waiting = self.n - 1 checkout.release() @@ -75,86 +36,139 @@ return checkout.release() -numtrips = 3 -def task2(ident): - global running - for i in range(numtrips): - if ident == 0: - # give it a good chance to enter the next - # barrier before the others are all out - # of the current one - delay = 0.001 - else: - rmutex.acquire() - delay = random.random() * numtasks - rmutex.release() +class ThreadTest(unittest.TestCase): + def __init__(self, testCaseNames): + unittest.TestCase.__init__(self, testCaseNames) + self.mutex = thread.allocate_lock() + self.rmutex = thread.allocate_lock() # for calls to random + self.running = 0 + self.done = thread.allocate_lock() + self.done.acquire() + self.next_ident = 0 + + def task(self, ident): + self.rmutex.acquire() + delay = random.random() * numtasks + self.rmutex.release() if verbose: print 'task', ident, 'will run for', round(delay, 1), 'sec' time.sleep(delay) if verbose: - print 'task', ident, 'entering barrier', i - bar.enter() + print 'task', ident, 'done' + self.mutex.acquire() + self.running -= 1 + if self.running == 0: + self.done.release() + self.mutex.release() + + def newtask(self): + self.mutex.acquire() + self.next_ident += 1 if verbose: - print 'task', ident, 'leaving barrier', i - mutex.acquire() - running -= 1 - # Must release mutex before releasing done, else the main thread can - # exit and set mutex to None as part of global teardown; then - # mutex.release() raises AttributeError. - finished = running == 0 - mutex.release() - if finished: - done.release() + print 'creating task', self.next_ident + thread.start_new_thread(self.task, (self.next_ident,)) + self.running += 1 + self.mutex.release() -print '\n*** Barrier Test ***' -if done.acquire(0): - raise ValueError, "'done' should have remained acquired" -bar = barrier(numtasks) -running = numtasks -for i in range(numtasks): - thread.start_new_thread(task2, (i,)) -done.acquire() -print 'all tasks done' + def run_tasks(self): + for i in range(numtasks): + self.newtask() -# not all platforms support changing thread stack size -print '\n*** Changing thread stack size ***' -if thread.stack_size() != 0: - raise ValueError, "initial stack_size not 0" + def task2(self, ident): + for i in range(numtrips): + if ident == 0: + # give it a good chance to enter the next + # barrier before the others are all out + # of the current one + delay = 0.001 + else: + self.rmutex.acquire() + delay = random.random() * numtasks + self.rmutex.release() + if verbose: + print 'task', ident, 'will run for', round(delay, 1), 'sec' + time.sleep(delay) + if verbose: + print 'task', ident, 'entering barrier', i + self.bar.enter() + if verbose: + print 'task', ident, 'leaving barrier', i + self.mutex.acquire() + self.running -= 1 -thread.stack_size(0) -if thread.stack_size() != 0: - raise ValueError, "stack_size not reset to default" + # XXX decipher this warning + # Must release self.mutex before releasing done, else the main thread can + # exit and set self.mutex to None as part of global teardown; then + # self.mutex.release() raises AttributeError. + finished = self.running == 0 + self.mutex.release() + if finished: + self.done.release() -from os import name as os_name -if os_name in ("nt", "os2", "posix"): + def barrier_test(self): + err_msg = "'done' should have remained acquired" + self.failIf(self.done.acquire(0), err_msg) + self.bar = barrier(numtasks) + self.running = numtasks + for i in range(numtasks): + thread.start_new_thread(self.task2, (i,)) + self.done.acquire() + if verbose: + print 'all tasks done' - tss_supported = 1 - try: - thread.stack_size(4096) - except ValueError: - print 'caught expected ValueError setting stack_size(4096)' - except thread.error: - tss_supported = 0 - print 'platform does not support changing thread stack size' + def test_tests(self): + # This method is here because it's probably better if we run + # these tests in order. + self.run_tasks() + if verbose: + print 'waiting for all tasks to complete' + self.done.acquire() + if verbose: + print 'all tasks done' + self.barrier_test() - if tss_supported: - failed = lambda s, e: s != e - fail_msg = "stack_size(%d) failed - should succeed" - for tss in (262144, 0x100000, 0): - thread.stack_size(tss) - if failed(thread.stack_size(), tss): - raise ValueError, fail_msg % tss - print 'successfully set stack_size(%d)' % tss + def test_stacksize(self): + self.failIf(thread.stack_size() != 0, "initial stack_size not 0") + thread.stack_size(0) + self.failIf(thread.stack_size() != 0, "stack_size not reset to " + \ + "default") + from os import name as os_name + if os_name in ("nt", "os2", "posix"): + tss_supported = 1 + try: + thread.stack_size(4096) + except ValueError: + print 'caught expected ValueError setting stack_size(4096)' + except thread.error: + tss_supported = 0 + if verbose: + print 'platform does not support changing thread stack size' - for tss in (262144, 0x100000): - print 'trying stack_size = %d' % tss - next_ident = 0 - for i in range(numtasks): - newtask() + if tss_supported: + fail_msg = "stack_size(%d) failed - should succeed" + for tss in (262144, 0x100000, 0): + self.assertTrue(thread.stack_size(tss), fail_msg % tss) + if verbose: + print 'successfully set stack_size(%d)' % tss - print 'waiting for all tasks to complete' - done.acquire() - print 'all tasks done' + for tss in (262144, 0x100000): + if verbose: + print 'trying stack_size = %d' % tss + self.next_ident = 0 + for i in range(numtasks): + self.newtask() - # reset stack size to default - thread.stack_size(0) + if verbose: + print 'waiting for all tasks to complete' + self.done.acquire() + if verbose: + print 'all tasks done' + + # reset stack size to default + thread.stack_size(0) + +def test_main(): + test_support.run_unittest(ThreadTest) + +if __name__ == '__main__': + test_main() Index: Lib/test/test_threading.py =================================================================== --- Lib/test/test_threading.py (revision 51064) +++ Lib/test/test_threading.py (working copy) @@ -55,8 +55,10 @@ self.sema.release() +no_stack_size_support = 'platform does not support changing thread stack size' + class ThreadTests(unittest.TestCase): - + # Create a bunch of threads, let each do some work, wait until all are # done. def test_various_ops(self): @@ -87,20 +89,35 @@ # run with a small(ish) thread stack size (256kB) def test_various_ops_small_stack(self): + tss_supported = True if verbose: print 'with 256kB thread stack size...' - threading.stack_size(262144) - self.test_various_ops() - threading.stack_size(0) + try: + threading.stack_size(262144) + except thread.error: + tss_supported = False + if verbose: + print no_stack_size_support + + if tss_supported: + self.test_various_ops() + threading.stack_size(0) # run with a large thread stack size (1MB) def test_various_ops_large_stack(self): + tss_supported = True if verbose: print 'with 1MB thread stack size...' - threading.stack_size(0x100000) - self.test_various_ops() - threading.stack_size(0) + try: + threading.stack_size(0x100000) + except thread.error: + if verbose: + print no_stack_size_support + if tss_supported: + self.test_various_ops() + threading.stack_size(0) + def test_foreign_thread(self): # Check that a "foreign" thread can use the threading module. def f(mutex):