classification
Title: Add close method to queue.Queue
Type: enhancement Stage: resolved
Components: Library (Lib) Versions: Python 3.7
process
Status: closed Resolution: rejected
Dependencies: Superseder:
Assigned To: rhettinger Nosy List: davin, mwf, rhettinger, tim.peters
Priority: low Keywords:

Created on 2017-03-02 20:12 by mwf, last changed 2017-03-05 03:38 by rhettinger. This issue is now closed.

Messages (6)
msg288828 - (view) Author: Mathias Fröjdman (mwf) Date: 2017-03-02 20:12
queue.Queue should have a close() method. The result of calling the method would be to raise a new exception - queue.Closed, for any subsequent calls to Queue.put, and after the queue is empty, also for Queue.get.

Why: To allow producers (callers of Queue.put) to signal there will be no more items, and consumers may stop asking for more by calling Queue.get. Currently the opposite (ie. waiting until all produced items/"tasks" have been consumed and handled) is possible with Queue.task_done() and Queue.join(). 

This functionality is useful in both application and library code. For example in AMQP, a server may push new messages over a TCP connection to a consumer, which translates into the library calling Queue.put for received messages, and the application using the library calling Queue.get to receive any new messages. The consumer may however be cancelled at any time, or the TCP connection closed and the Queue.get caller signaled that there will be no more messages. With Queue.close() that is easy - without it one needs to wrap the Queue.get calls).

In an application context where a KeyboardInterrupt should lead to closing the application cleanly, being able to call Queue.close(), catching the Closed exception in any consumers (some of which may be in other threads) and exiting cleanly makes the job that much easier.

A common pattern in working around this issue is to call Queue.put(None), and treat a None from Queue.get() as a signal to clean up. This works well when one knows there is at most one consumer. In the case of many consumers, one needs to wrap the Queue and for example add another None to the queue in consumers to not leave any remaining get() call waiting indefinitely. This pattern occurs even in the standard library: https://github.com/python/cpython/blob/7b90e3674be86479c51faf872d0b9367c9fc2f96/Lib/concurrent/futures/thread.py#L141

If accepting this proposal, a corresponding change should be made to asyncio.Queue.

I have a tentative implementation (no tests or doc outside the module) in https://github.com/mwfrojdman/cpython/blob/closeable_queue/Lib/queue.py

The Queue.close() method has an optional argument clear (default False), which clears the queue of items if set to true. This is useful for example when exiting an application, and one doesn't want consumers to get any more items before being raised a Closed exception. The changes are backwards compatible for users of the class, ie. if Queue.close() is not called, the behavior stays intact. Because of the clear argument, there is a new private method Queue._clear(), which does the actual clearing of the queue representation. Subclasses for which self.queue.clear() doesn't cut it, need to override it before .close(True) works.

Background: https://github.com/python/asyncio/pull/415#issuecomment-263658986
msg288852 - (view) Author: Raymond Hettinger (rhettinger) * (Python committer) Date: 2017-03-03 04:04
I'll mull this one over for a while and take a look at what other languages are doing.   

My first impression is the close() is the wrong word because in other contexts it means that a resource has been freed and that no more calls are allowed.  In this context, the queue is still alive and continued get() calls will work until the queue is empty.

Another first reaction is that this will cause more problems than it solves (feature request to reopen a queue, wanting an API to detect whether a queue is closed, having to learn a new pattern for queue use, adding more complexity to the API, unexpected interactions with join/task_done).

The normal (or at least common) use cases for Queue involve leaving it open indefinitely with daemon threads always lying in wait for more data.  If there were a need to close down a consumer thread, the usual solution is to send a close-down signal/sentinel.  It would be more complicated for the multi-consumer case but that is atypical for Python threads (because the GIL limits getting any benefit).  The multi-consumer case is more likely to arise in an async environment.

These are only first impressions.  I will think about it more.  In the mean time, it would help if you could find a few real-world examples of existing code that would be improved by having a close() method, that would allow me to think more concretely about the subject.

FWIW, I don't normally think of queues as short-lived objects.  It is like having a email account (an atomic message queue) which never close off or shut down.
msg288970 - (view) Author: Raymond Hettinger (rhettinger) * (Python committer) Date: 2017-03-04 07:11
Notes:
------

* Ruby has a Queue.close() method like the one being proposed here.  There is also a ?closed() call for testing the status:  https://ruby-doc.org/core-2.3.0/Queue.html

* Java queues do not have a closing method: https://docs.oracle.com/javase/8/docs/api/java/util/AbstractQueue.html

* This idea came up once for Python on StackOverflow.  The was little interest and the OP selected his own answer.  Concrete use cases were not discussed.  http://stackoverflow.com/questions/3605188/communicating-end-of-queue

* C# message queues have a close() method but it behaves much differently (it frees resources, doesn't raise exceptions, and will reacquire resources if used again).

* The Bull project NodeJS accessing Redis has a close() method for queues.  Its purpose is to perform a graceful shutdown of a remote service.  There doesn't seem to be a parallel using for Python threads (though it might make sense for async).  This use seems more akin to a connection.close() for databases. https://github.com/OptimalBits/bull#close

Open Questions
--------------

* Is try/except logic in a consumer preferable to the if-logic that would be used with a sentinel object.

* What the right vehicle for communicating out-of-band information like a requests for a consumer to shutdown-when-queue-empty, shut-down-immediately, restart, clear, pause-consumption, or reset?  Should there be a shared variable that consumers would monitor?  Should the communication be through a message in the queue itself?  Should there be a way to inject an exception much like a generator.throw()? 

* Do we have any concrete examples from real code that we could study to help inform the design decision?

* If there are concrete use cases, are they common or rare?  If it is rare, would it be better to use existing approaches (sending a messages through the queue, checking a shared variable after a timeout, having a second queue for control-flow information, etc).

* How would a producer indicate a reduced need for consumers (I no longer need ten workers, only two will suffice)?
msg288972 - (view) Author: Raymond Hettinger (rhettinger) * (Python committer) Date: 2017-03-04 07:34
Another Note
------------

* The current Python multiprocessing module version of Queue has a close() method.  It isn't clear that use cases there are also applicable to plain queue module queues for threads.  https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Queue.close
msg288985 - (view) Author: Davin Potts (davin) * (Python committer) Date: 2017-03-04 16:46
The example of AMQP is perhaps a stronger argument for why multiprocessing.Queue.close should (or does) exist, not as much a reason for queue.Queue.

The strongest point, I think, is the argument that existing patterns are lacking.

In the multiprocessing module, the pattern of placing None into a queue.Queue to communicate between threads is also used but with a slightly different use case:  a queue may have multiple None's added to it so that the queue's contents may be fully consumed and at the end the consumers understand to not look for more work when they each get a None.  It might be restated as "do your work, then close".  If close were introduced to queue.Queue as proposed, it would not eliminate the need for this pattern.

Thankfully inside multiprocessing the number of threads is known (for example, a thread to manage each process created by multiprocessing) making code possible such as:  `inqueue.queue.extend([None] * size)`.  In the more general case, the point that `size` is not always known is a valid one.  In this same vein, other parts of multiprocessing could potentially make use of queue.Queue.close but at least in multiprocessing's specific case I'm not sure I see a compelling simplification to warrant the change.  Though multiprocessing doesn't provide one, I think it would be helpful to see concrete use cases where there would be a clear benefit.
msg288999 - (view) Author: Raymond Hettinger (rhettinger) * (Python committer) Date: 2017-03-05 03:38
Here's a link to the feature request for Ruby, https://bugs.ruby-lang.org/issues/10600 . It looks like the feature was implemented without examining any concrete use cases.  A quick code search on Github did not reveal any cases where the feature was ever used after it was added to Ruby.

For now, I think we should pass on this one.  Python's GIL limits what people typically do with threads (use cases focus on I/O bound tasks instead of CPU bound).  Consequently, it is atypical to have a multi-producer-multi-consumer use case, and it would be even more rare for that case to also need the queue to inform all the consumers to shutdown (pure daemons are the norm).   In addition, we already have work-arounds for those cases (shared variables, multiple poison-pills, secondary command queues, timeouts, etc).

The single consumer example in the standard library is already satisfactorily handled using None as a sentinel.  AFAICT, that code would not read any better with the new API.  The library itself is not typically a good indicator of end-user need (our needs tend to be more exotic).  The queue module has had a long life and this feature request hasn't arisen before, so that may indicate that this isn't a recurring or prevalent need.

It seems that what the OP really wants is for this feature to be implemented in asyncio.  I don't have much of an opinion on that one.  It seems to me that in a single-thread-single-process there isn't any benefit that would come from having multiple consumers, so I'm not sure why someone would need this feature (passing in None for a single consumer is already simple and doesn't require API expansion).

Without a recurring need, I think it is better not to do this one because the negatives would outweigh the positives.  It would open the door to a requests for is_closed(), for a closing context manager, and for a way to re-open a closed queue.  

This request would make more sense in a distributed or multi-processing environment (where multiple consumers have real benefits), but for plain threads, where it may be more distracting than helpful.
History
Date User Action Args
2017-03-05 03:38:13rhettingersetstatus: open -> closed
resolution: rejected
messages: + msg288999

stage: resolved
2017-03-04 16:46:05davinsetmessages: + msg288985
2017-03-04 07:34:42rhettingersetmessages: + msg288972
2017-03-04 07:11:57rhettingersetmessages: + msg288970
2017-03-03 04:04:18rhettingersetpriority: normal -> low
nosy: + tim.peters, davin
messages: + msg288852

2017-03-02 22:22:21rhettingersetassignee: rhettinger
2017-03-02 20:24:27serhiy.storchakasetnosy: + rhettinger
2017-03-02 20:12:12mwfcreate