classification
Title: Simplify using Queues with consumer threads
Type: Stage:
Components: None Versions: Python 2.5
process
Status: closed Resolution: accepted
Dependencies: Superseder:
Assigned To: rhettinger Nosy List: rhettinger, tim.peters
Priority: normal Keywords: patch

Created on 2006-03-21 21:36 by rhettinger, last changed 2006-03-24 20:44 by rhettinger. This issue is now closed.

Files
File name Uploaded Description Edit
Queue.diff rhettinger, 2006-03-21 21:36 Diff for Queue.py
libqueue.diff rhettinger, 2006-03-21 22:27 Diff for libqueue.tex
Messages (6)
msg49806 - (view) Author: Raymond Hettinger (rhettinger) * (Python committer) Date: 2006-03-21 21:36
When Queues are used to communicate between producer 
and consumer threads, there is often a need to 
determine when all of the enqueued tasks have been 
completed.

With this small patch, determining when all work is 
done is as simple as adding q.task_done() to each 
consumer thread and q.join() to the main thread.

Without the patch, the next best approach is to count 
the number of puts, create a second queue filled by 
the consumer when a task is done, and for the main 
thread to call successive blocking gets on the result 
queue until all of the puts have been accounted for:

    def worker(): 
        while 1: 
            task = tasks_in.get() 
            do_work(task) 
            tasks_out.put(None)

    tasks_in = Queue() 
    tasks_out = Queue() 
    for i in range(num_worker_threads): 
         Thread(target=worker).start()

    n = 0 
    for elem in source():
        n += 1
        tasks_in.put(elem) 

    # block until tasks are done 
    for i in range(n): 
        tasks_out.get()

That approach is not complicated but it does entail 
more lines of code and tracking some auxiliary data.
This becomes cumersome and error-prone when an app 
has multiple occurences of q.put() and q.get().

The patch essentially encapsulates this approach into 
two methods, making it effortless to use and easy to 
graft on to existing uses of Queue. So, the above 
code simplies to:

    def worker(): 
        while 1: 
            task = q.get() 
            do_work(task) 
            q.task_done() 

    q = Queue() 
    for i in range(num_worker_threads): 
         Thread(target=worker).start() 

    for elem in source():
        q.put(elem) 

    # block until tasks are done 
    q.join() 

The put counting is automatic, there is no need for a 
separate queue object, the code readably expresses 
its intent with clarity.  Also, it is easy to inpect 
for accuracy, each get() followed by a task_done().  
The ease of inspection remains even when there are 
multiple gets and puts scattered through the code (a 
situtation which would become complicated for the two 
Queue approach).

If accepted, will add docs with an example.

Besides being a fast, lean, elegant solution, the 
other reason to accept the patch is that the 
underlying problem appears again and again, requiring 
some measure to invention to solve it each time.  
There are a number of approaches but none as simple, 
fast, or as broadly applicable as having the queue 
itself track items loaded and items completed.
msg49807 - (view) Author: Raymond Hettinger (rhettinger) * (Python committer) Date: 2006-03-21 22:27
Logged In: YES 
user_id=80475

Tim, do you have a chance to look at this?
msg49808 - (view) Author: Tim Peters (tim.peters) * (Python committer) Date: 2006-03-22 01:42
Logged In: YES 
user_id=31435

Yup, I'll try to make time tomorrow (can't today). 
_Offhand_ it sounds like a nice addition to me.
msg49809 - (view) Author: Raymond Hettinger (rhettinger) * (Python committer) Date: 2006-03-22 06:02
Logged In: YES 
user_id=80475

Thanks.  There are two particular areas for extra 
attention.  

First, should the waiter acquire/release pairs be in a 
try/finally (iow, is there some behavior in notify() or 
release() that potentially needs to be trapped)?

Second, should the notify() in task_done() really be a 
notifyAll() (iow, does it make sense that multiple joins 
may be pending)?

Thanks again.
msg49810 - (view) Author: Tim Peters (tim.peters) * (Python committer) Date: 2006-03-24 00:17
Logged In: YES 
user_id=31435

I marked this as Accepted, but there are some things I'd
like to see changed:

- A Condition is best named after the predicate it
represents.  So, e.g., instead of the generic "waiter", a
better name would be "all_tasks_done".  When you eventually
.notify() the Condition, you're notifing its wait()er that
"all tasks (may be) done", and "all tasks (may be) done" is
what the wait()er is waiting _for_.  "all_tasks_done.wait()"
 makes that much clearer than "waiter.wait()".

- A Condition.wait() can be interrupted by (at least)
KeyboardInterrupt, so the acquire/release around a
Condition.wait() call should always be in a try/finally (so
that the Condition is release()d no matter what).  All other
Condition.wait()s in Queue do protect themselves this way. 
I don't see a need for try/finally around other uses, except
possibly that:

- Given the intended semantics, it would be good to raise an
exception if .unfinished_tasks becomes negative; i.e., make
it a detected programmer error if task_done() is called "too
often" (although again the Condition needs to be release()d
no matter what, and a try/finally may be expedient toward
that end).

- Since any number of threads _may_ be waiting in
Queue.join(), yes, .notifyAll() is better.  The other
conditions in Queue don't do that because there's a key
difference:  at most one thread waiting on not_full or
not_empty can make progress when one of those is "signaled",
so it would be wasteful to wake up more than one thread
waiting on those.  In contrast, all threads waiting on
.waiter can make progress when all tasks are in fact done. 
You can do that with a notifyAll() in task_done(), or by
adding a notify() near the end of join() (then all threads
waiting on this condition will get notified in domino
fashion).  The notifyAll() way is "purer".

- It's inevitable that someone will ask Queue.join() to grow
an optional timeout argument.  OK by me if that waits ;-).
msg49811 - (view) Author: Raymond Hettinger (rhettinger) * (Python committer) Date: 2006-03-24 20:44
Logged In: YES 
user_id=80475

Committed as revision 43298.
History
Date User Action Args
2006-03-21 21:36:08rhettingercreate