Index: Queue.py =================================================================== --- Queue.py (revision 43025) +++ Queue.py (working copy) @@ -35,7 +35,41 @@ # Notify not_full whenever an item is removed from the queue; # a thread waiting to put is notified then. self.not_full = threading.Condition(self.mutex) + # Notify waiter whenever the number of unfinished tasks drops to zero; + # thread waiting to join() is notified to resume + self.waiter = threading.Condition(self.mutex) + self.unfinished_tasks = 0 + def task_done(self): + """Indicate that a formerly enqueued task is complete. + + Used by Queue consumer threads. For each get() used to fetch a task, + there should be a subsequent call to task_done() when processing on + that task is complete. + + When all pending tasks have been completed, notify a thread waiting to + join() to resume when all items have been processed. + """ + self.waiter.acquire() + self.unfinished_tasks -= 1 + if not self.unfinished_tasks: + self.waiter.notify() + self.waiter.release() + + def join(self): + """Blocks until items in the Queue have been gotten and processed. + + The count of unfinished tasks goes up whenever an item is added to the + Queue. The count goes down whenever a consumer thread makes a task_done() + call (indicating the item was retrieved and all work on it is complete). + + When the count of unfinished tasks drops to zero, join() unblocks. + """ + self.waiter.acquire() + while self.unfinished_tasks: + self.waiter.wait() + self.waiter.release() + def qsize(self): """Return the approximate size of the queue (not reliable!).""" self.mutex.acquire() @@ -86,6 +120,7 @@ raise Full self.not_full.wait(remaining) self._put(item) + self.unfinished_tasks += 1 self.not_empty.notify() finally: self.not_full.release()