from thread import start_new_thread, get_ident import os import threading import time import unittest def log(msg): #pass os.write(1, msg + "\n") def _wait(): time.sleep(1e-9) #time.sleep(0.1) class Bunch(object): def __init__(self, f, n, wait_before_exit=False): self.f = f self.n = n self.started = [] self.finished = [] self._can_exit = not wait_before_exit def task(i): tid = get_ident() self.started.append(tid) try: f(i) finally: self.finished.append(tid) while not self._can_exit: _wait() try: for i in range(n): start_new_thread(task, (i,)) except: self._can_exit = True raise def wait_for_started(self): while len(self.started) < self.n: _wait() def wait_for_finished(self): while len(self.finished) < self.n: _wait() def do_finish(self): self._can_exit = True class ConditionTests(unittest.TestCase): condtype = staticmethod(threading.Condition) def test_notify(self): cond = self.condtype() log("[check notify] START") N = 5 results1 = [] results2 = [] phase_num = 0 f_ns = [] def f(i): log("f#%s: acquire..." % i) cond.acquire() log("f#%s: wait..." % i) cond.wait() log("f#%s: release" % i) cond.release() log("f#%s: append result 1" % i) results1.append(phase_num) log("f#%s: append result 1 -- %s" % (i, results1)) log("f#%s: 2nd acquire..." % i) cond.acquire() log("f#%s: 2nd wait..." % i) cond.wait() log("f#%s: 2nd release" % i) cond.release() log("f#%s: result2 append" % i) results2.append(phase_num) log("f#%s: result2 append -- %s" % (i, results2)) log("check notify: create bunch") b = Bunch(f, N) log("check notify: wait started") b.wait_for_started() log("check notify: _wait") _wait() self.assertEqual(results1, []) log("check notify: notify 3 threads") # Notify 3 threads at first cond.acquire() cond.notify(3) log("check notify: _wait") _wait() phase_num = 1 cond.release() log("check notify: while < 3: _wait") while len(results1) < 3: _wait() log("check notify: while < 3: _wait -- done") self.assertEqual(results1, [1] * 3) self.assertEqual(results2, []) log("check notify: notify 5 threads") # Notify 5 threads: they might be in their first or second wait cond.acquire() cond.notify(5) _wait() phase_num = 2 cond.release() log("check notify: while < 8: _wait") while len(results1) + len(results2) < 8: _wait() log("check notify: while < 8: _wait -- done") self.assertEqual(results1, [1] * 3 + [2] * 2) self.assertEqual(results2, [2] * 3) log("check notify: notify all") # Notify all threads: they are all in their second wait cond.acquire() cond.notify_all() _wait() phase_num = 3 cond.release() log("check notify: while < 5: _wait") while len(results2) < 5: _wait() log("check notify: while < 5: _wait -- done") self.assertEqual(results1, [1] * 3 + [2] * 2) self.assertEqual(results2, [2] * 3 + [3] * 2) log("check notify: wait_for_finished") b.wait_for_finished() log("[check notify] STOP") if __name__ == "__main__": unittest.main()