classification
Title: Add timeout parameter to Queue.join()
Type: enhancement Stage: resolved
Components: Library (Lib) Versions: Python 3.3
process
Status: closed Resolution: out of date
Dependencies: Superseder:
Assigned To: rhettinger Nosy List: Swen.Wenzel, Yuri.Bochkarev, anacrolix, isoschiz, kdlucas, ncoghlan, pitrou, rhettinger
Priority: low Keywords:

Created on 2010-08-18 17:46 by kdlucas, last changed 2019-08-24 18:33 by rhettinger. This issue is now closed.

Messages (17)
msg114257 - (view) Author: Kelly Lucas (kdlucas) Date: 2010-08-18 17:46
I've seen quite a few people requesting a timeout value for the Queue.join() method, as it seems like a nice feature to have when waiting for queues to finish.

Please add a feature so that Queue.join() will issue a self.all_tasks_done.release() when the timeout value is reached.
msg114263 - (view) Author: Raymond Hettinger (rhettinger) * (Python committer) Date: 2010-08-18 18:29
Can you provide links to the other requests?

This seems to be at odds with the whole premise
of what Queue.join() is trying to do (wait until
all worker threads have marked their tasks as done).
msg114264 - (view) Author: Raymond Hettinger (rhettinger) * (Python committer) Date: 2010-08-18 18:35
FWIW, if you want to do other tasks in a main thread and need to check on the status of task completion in worker threads, you can already loop on:

  while q.unfinished_tasks:
      do_other_fun_stuff()
  q.join()                  # this shouldn't block anymore
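
For reference, a self-contained version of that loop might look like the sketch below. The worker function, the task count, and the body of do_other_fun_stuff() are placeholders, and unfinished_tasks is an undocumented attribute, so treat this as an illustration rather than a supported pattern.

```python
import queue
import threading
import time

def worker(q, results):
    # Pull items forever; the thread is a daemon, so it exits with the program.
    while True:
        item = q.get()
        try:
            results.append(item * 2)
        finally:
            q.task_done()

def do_other_fun_stuff():
    # Placeholder for useful main-thread work between checks.
    time.sleep(0.01)

def run(n_tasks=5):
    q = queue.Queue()
    results = []
    threading.Thread(target=worker, args=(q, results), daemon=True).start()
    for i in range(n_tasks):
        q.put(i)
    while q.unfinished_tasks:   # undocumented attribute, read without the lock
        do_other_fun_stuff()
    q.join()                    # nothing is pending, so this returns at once
    return sorted(results)
```

The sleep inside the placeholder is what keeps the loop from spinning; with no real work to do between checks, this approach degenerates into polling.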
msg114271 - (view) Author: Kelly Lucas (kdlucas) Date: 2010-08-18 19:45
Here are a few that I saw:

http://stackoverflow.com/questions/1564501/add-timeout-argument-to-pythons-queue-join

http://www.eggheadcafe.com/software/aspnet/36145181/max-time-threads.aspx

http://efreedom.com/Question/1-1564501/Add-timeout-argument-to-python-s-Queue-join
msg114276 - (view) Author: Antoine Pitrou (pitrou) * (Python committer) Date: 2010-08-18 20:48
We already have an optional timeout on threading.Thread.join(), which I suppose means that people need/want that feature.
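
For comparison, Thread.join(timeout) always returns None, so the caller has to check is_alive() afterwards to learn whether the timeout expired. A small sketch of that calling pattern, with a sleeping worker and arbitrary durations chosen only for illustration (joined_in_time is a made-up helper name):

```python
import threading
import time

def joined_in_time(work_seconds, timeout):
    # The worker just sleeps; it is a daemon so it never blocks interpreter exit.
    t = threading.Thread(target=time.sleep, args=(work_seconds,), daemon=True)
    t.start()
    t.join(timeout)          # returns None whether or not the timeout expired
    return not t.is_alive()  # False means the join timed out
```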
msg114283 - (view) Author: Raymond Hettinger (rhettinger) * (Python committer) Date: 2010-08-18 22:08
One of the provided links is a duplicate (a repost from StackOverflow).  None of the links offer use cases which can be used to inform the design (i.e. is this the correct approach, should a timeout raise a RuntimeError like Condition.wait() does, should a timeout set a variable to indicate the timeout the way thread.join() does, should it just return with no indication of the reason for returning, should we just document the unfinished_tasks variable as read-only so that users can check it periodically, what are the common calling patterns, how does this fit in with the original design intent of letting worker threads finish all pending tasks before moving on to code that assumes that the work is done).

Without motivating use cases, it is hard to create a correct design or to know whether this should be done at all.  Unlike thread.join(), which has a long history in many contexts, the Queue.join/task_done API is newer; accordingly, the API should be grown somewhat cautiously.  I don't think we've established that using queue.join() with a time-out is a best practice or preferred pattern for solving a given problem.  Instead, all we have is "it seems like a nice feature", which falls short of our usual standards.
msg114284 - (view) Author: Kelly Lucas (kdlucas) Date: 2010-08-18 22:19
This is easy enough to implement by subclassing the Queue class and overriding join(), so it's not a big deal. Just seems like it would be a nice thing to have.
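
A subclass along those lines might look like the sketch below. The class name TimeoutQueue and the choice to return a bool are assumptions made for illustration; what a timed-out join() should report is exactly the open design question. It also relies on the undocumented all_tasks_done and unfinished_tasks attributes.

```python
import queue
import time

class TimeoutQueue(queue.Queue):
    """Hypothetical Queue subclass whose join() accepts a timeout."""

    def join(self, timeout=None):
        # Return True if all tasks finished, False if the timeout expired.
        with self.all_tasks_done:
            if timeout is None:
                while self.unfinished_tasks:
                    self.all_tasks_done.wait()
                return True
            deadline = time.monotonic() + timeout
            while self.unfinished_tasks:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    return False
                self.all_tasks_done.wait(remaining)
            return True
```

Recomputing the remaining time on every pass matters because the condition can be notified while tasks are still outstanding; a fixed wait(timeout) in the loop could overshoot the deadline.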
msg114286 - (view) Author: Raymond Hettinger (rhettinger) * (Python committer) Date: 2010-08-18 22:26
I will leave this open for a while.
It may be that it is a good idea
and that some good use cases will
emerge to inform a correct design.

In the meantime, it would be great
if you could post your subclass to
the ASPN Cookbook to collect feedback.
msg115346 - (view) Author: Raymond Hettinger (rhettinger) * (Python committer) Date: 2010-09-02 00:26
Kelly, thanks for posting the idea.
I'm going to close this one for now
but it can be reopened if compelling
use cases arise.
msg157454 - (view) Author: Nick Coghlan (ncoghlan) * (Python committer) Date: 2012-04-04 02:40
I just created #14487 to request a documented API (Queue.pending()) that provides a formal mechanism for asking whether or not any jobs are still pending (analogous to the existing empty() and full() query methods).

Specifically, what I have is a client process that is executed periodically, gathers up a set of tasks and uses a thread pool to submit them in parallel to a synchronous API.

If all tasks complete with no new tasks being scheduled, then the client should terminate. However, if a new task arrives while any existing task is still in progress, then the client should submit it "immediately" (where, due to the time scales involved, "immediately" actually means "within the next few minutes" so I have plenty of scope to let the client block for a while instead of implementing a busy loop).

So, a timeout on join() would actually fit my use case better than the pending() API I proposed in the other issue. The processing loop would then look something like:

while have_tasks_to_process():
   submit_tasks_to_queue()
   try:
       task_queue.join(timeout)
   except Pending:
       pass

The advantage of having the timeout is that it would avoid the clumsy workarounds needed to avoid the busy loop created by the use of a query based approach. (Of course, I'm going to have to use the workaround anyway, since my client runs on Python 2.6, but still, I believe it meets the "concrete use case" criterion).

From a design aesthetic point of view, a pending() query API and a timeout on join() that throws a Pending exception would match the existing empty()/get()/Empty and full()/put()/Full API triples.
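
To make the proposed triple concrete, here is a sketch of how it could be layered on top of Queue. None of pending(), Pending, or the timeout argument exists in the standard library, and the PendingQueue name is made up for this example; it also leans on the undocumented all_tasks_done and unfinished_tasks attributes.

```python
import queue

class Pending(Exception):
    """Hypothetical: raised when join() times out with tasks outstanding."""

class PendingQueue(queue.Queue):
    """Hypothetical Queue with a pending()/join(timeout)/Pending triple."""

    def pending(self):
        # Query method in the style of empty() and full().
        with self.all_tasks_done:
            return self.unfinished_tasks > 0

    def join(self, timeout=None):
        with self.all_tasks_done:
            # wait_for() returns the predicate's final value: False on timeout.
            done = self.all_tasks_done.wait_for(
                lambda: not self.unfinished_tasks, timeout)
            if not done:
                raise Pending
```

Like empty() and full(), pending() is only a snapshot: the answer can be stale by the time the caller acts on it, which is why the blocking join(timeout) is the more useful half of the pair.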
msg157455 - (view) Author: Nick Coghlan (ncoghlan) * (Python committer) Date: 2012-04-04 02:42
I killed off my new issue in favour of this one, since they're covering the same ground.
msg157458 - (view) Author: Nick Coghlan (ncoghlan) * (Python committer) Date: 2012-04-04 03:56
The function below is the workaround I plan to use based on the undocumented-but-public Queue attributes that relate to task tracking (it's essentially just a combination of Queue.join() with the existing structure of the Queue.get() implementation):

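  from time import time as _time

  class Pending(Exception):
      "Raised by join_queue() when the timeout expires with tasks unfinished."

  def join_queue(q, timeout=None):
      # all_tasks_done is the Condition that task_done() notifies
      q.all_tasks_done.acquire()
      try:
          if timeout is None:
              while q.unfinished_tasks:
                  q.all_tasks_done.wait()
          elif timeout < 0:
              raise ValueError("'timeout' must be a non-negative number")
          else:
              endtime = _time() + timeout
              while q.unfinished_tasks:
                  # recompute on every pass so wakeups don't extend the deadline
                  remaining = endtime - _time()
                  if remaining <= 0.0:
                      raise Pending
                  q.all_tasks_done.wait(remaining)
      finally:
          q.all_tasks_done.release()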
msg157461 - (view) Author: Nick Coghlan (ncoghlan) * (Python committer) Date: 2012-04-04 06:17
The thread pool impl where I'm using this: http://git.fedorahosted.org/git/?p=pulpdist.git;a=blob;f=src/pulpdist/cli/thread_pool.py
msg157510 - (view) Author: Antoine Pitrou (pitrou) * (Python committer) Date: 2012-04-04 21:55
Nick, why don't you submit your "workaround" as a patch to Queue.join()?
msg157518 - (view) Author: Nick Coghlan (ncoghlan) * (Python committer) Date: 2012-04-04 23:06
I probably will eventually - I'll be playing catchup a bit on other tasks this month after doing almost no coding for CPython since some time in Feb.
msg233464 - (view) Author: Swen Wenzel (Swen.Wenzel) Date: 2015-01-05 14:03
I have another use case.
The Docs use the producer-consumer pattern as a usage example.
I'm also using this pattern, but apparently the consumers are not that stable during the development phase.

So if one of your consumers dies during execution of its task, before it can say 'task done', then you will have a deadlock.
You could of course avoid this by adding a finally block which will claim the task is done even if there was some error or exception but then you will lie to the producer!
And suppose you only have one consumer and there are still tasks waiting (which is the case for my application), then you'll still have a deadlock since nobody is there to execute the remaining tasks.
This could be solved by putting the exception handling within the consumer's mainloop like this:

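from threading import Thread

class Consumer(Thread):
    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            task = self.queue.get()
            try:
                pass  # execute task
            except Exception:
                pass  # handle exception (hopefully) without reraising one
            finally:
                # always mark the task done so join() cannot hang on it
                self.queue.task_done()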

This way, however, the producer won't notice any issues unless the consumer's exception handler sets a flag or puts the exception into a collection that can be checked by the producer.
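
As an illustration of the "collection" variant, the consumer below pushes every exception onto a second queue that the producer drains after join(). The function names and the use of plain callables as tasks are assumptions made for this sketch.

```python
import queue
import threading

def consumer(tasks, errors):
    while True:
        task = tasks.get()
        try:
            task()
        except Exception as exc:
            errors.put(exc)      # hand the failure back to the producer
        finally:
            tasks.task_done()    # always mark the task done

def produce_and_check(jobs):
    tasks, errors = queue.Queue(), queue.Queue()
    threading.Thread(target=consumer, args=(tasks, errors), daemon=True).start()
    for job in jobs:
        tasks.put(job)
    tasks.join()                 # every job has been attempted
    failures = []
    while not errors.empty():
        failures.append(errors.get())
    return failures
```

Because errors.put() runs before task_done(), every failure is already in the errors queue by the time join() returns.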

But even that is no solution if the consumer executes a task with an endless loop or runs into a deadlock itself.

I would propose raising an exception if queue.Queue.join() returns because of a timeout, since then you can investigate the cause within the exception handler and do not have to check the consumer's status after each join(). But this is a micro-optimization, so I would also be satisfied with the same solution as for threading.Thread.join().
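
A rough sketch of what that could look like from the producer's side, written as a helper function because Queue.join() has no timeout today. The name join_or_report and the choice of RuntimeError are made up for this example, and the helper relies on the undocumented all_tasks_done and unfinished_tasks attributes.

```python
import queue
import threading

def join_or_report(q, workers, timeout):
    # Wait up to `timeout` seconds for all tasks to be marked done.
    with q.all_tasks_done:
        finished = q.all_tasks_done.wait_for(
            lambda: q.unfinished_tasks == 0, timeout)
    if not finished:
        # Investigate the cause: which consumers are no longer running?
        dead = [w.name for w in workers if not w.is_alive()]
        raise RuntimeError("join timed out; dead workers: %r" % (dead,))
```

An empty list of dead workers in the message would point to the other failure mode, a consumer that is still alive but stuck in an endless loop or a deadlock of its own.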
msg350392 - (view) Author: Raymond Hettinger (rhettinger) * (Python committer) Date: 2019-08-24 18:33
It's been 4 1/2 years since last activity.  Marking as closed again due to lack of interest and due to the rarity of need.  Also we know that a person can already customize join() via a subclass.
History
Date                 User            Action  Args
2020-11-20 23:53:43  rhettinger      link    issue42420 superseder
2019-08-24 18:33:06  rhettinger      set     status: open -> closed; resolution: out of date; messages: + msg350392; stage: needs patch -> resolved
2015-01-05 14:03:54  Swen.Wenzel     set     nosy: + Swen.Wenzel; messages: + msg233464
2013-11-07 12:59:22  Yuri.Bochkarev  set     nosy: + Yuri.Bochkarev
2013-04-20 11:13:21  isoschiz        set     nosy: + isoschiz
2012-04-05 03:28:21  anacrolix       set     nosy: + anacrolix
2012-04-04 23:06:52  ncoghlan        set     messages: + msg157518
2012-04-04 21:55:06  pitrou          set     messages: + msg157510
2012-04-04 06:17:17  ncoghlan        set     messages: + msg157461
2012-04-04 03:56:07  ncoghlan        set     messages: + msg157458
2012-04-04 02:42:46  ncoghlan        set     messages: + msg157455; versions: + Python 3.3, - Python 3.2
2012-04-04 02:41:23  ncoghlan        link    issue14487 superseder
2012-04-04 02:40:10  ncoghlan        set     status: closed -> open; nosy: + ncoghlan; messages: + msg157454; resolution: rejected -> (no value)
2010-09-02 00:26:17  rhettinger      set     status: open -> closed; resolution: rejected; messages: + msg115346
2010-08-18 22:26:08  rhettinger      set     messages: + msg114286
2010-08-18 22:19:42  kdlucas         set     messages: + msg114284
2010-08-18 22:08:38  rhettinger      set     messages: + msg114283
2010-08-18 20:48:53  pitrou          set     nosy: + pitrou; messages: + msg114276
2010-08-18 19:45:19  kdlucas         set     messages: + msg114271
2010-08-18 18:35:02  rhettinger      set     priority: normal -> low; messages: + msg114264
2010-08-18 18:29:11  rhettinger      set     assignee: rhettinger; messages: + msg114263; nosy: + rhettinger
2010-08-18 18:06:55  brian.curtin    set     stage: needs patch; type: enhancement; versions: + Python 3.2, - Python 3.1
2010-08-18 18:00:05  kdlucas         set     versions: + Python 3.1, - Python 2.7
2010-08-18 17:46:20  kdlucas         create