Title: Add a cancel method to asyncio Queues
Type: enhancement Stage: patch review
Components: asyncio Versions: Python 3.8
Status: open Resolution:
Dependencies: Superseder:
Assigned To: Nosy List: Martin.Teichmann, asvetlov, cjrh, yselivanov
Priority: normal Keywords: patch

Created on 2019-06-19 07:41 by Martin.Teichmann, last changed 2019-06-24 13:13 by Martin.Teichmann.

Pull Requests
URL Status Linked Edit
PR 14227 open Martin.Teichmann, 2019-06-19 07:43
Messages (6)
msg346023 - (view) Author: Martin Teichmann (Martin.Teichmann) * Date: 2019-06-19 07:41
When working with queues, it is not uncommon that at some point the producer stops producing data for good, or the consumer stops consuming, for example because a network connection broke down or some user simply closed the session.

In this situation it is very useful to simply cancel all the waiting getters and putters. A simple method can do that, Queue.cancel.
msg346026 - (view) Author: Andrew Svetlov (asvetlov) * (Python committer) Date: 2019-06-19 08:13
Sounds like `.close()` is better name for described behavior.
msg346113 - (view) Author: Martin Teichmann (Martin.Teichmann) * Date: 2019-06-20 11:09
I also thought about `.close()` but then found `.cancel()` more intuitive. But intuition is not universal, so I am open to any wording.
msg346268 - (view) Author: Caleb Hattingh (cjrh) * Date: 2019-06-22 03:00
I'm interested in how this change would affect the pattern of shutting down a queue-processing task.

How would one decide between whether to cancel the queue or the task? (I'm asking for real, this is not an objection to the PR). For example, looking at the two tests in the PR:

    def test_cancel_get(self):
        queue = asyncio.Queue(loop=self.loop)

        getter = self.loop.create_task(queue.get())
        queue.cancel()                       # <---- HERE
        with self.assertRaises(asyncio.CancelledError):

This test would work exactly the same if the `getter` task was cancelled instead right?  Like this:

    def test_cancel_get(self):
        queue = asyncio.Queue(loop=self.loop)

        getter = self.loop.create_task(queue.get())
        getter.cancel()                       # <---- HERE
        with self.assertRaises(asyncio.CancelledError):

So my initial reaction is that I'm not sure under what conditions it would be more useful to cancel the queue instead of the task. I am very used to applying cancellation to tasks rather than the queues they contain, so I might lack imagination in this area. The idiom I've been using so far for consuming queues looks roughly something like this:

async def consumer(q: asyncio.Queue):
    while True:
            data = await q.get()
        except asyncio.CancelledError:
            q.put_nowait(None) # ignore QueueFull for this discussion

            if not data:
      'Queue shut down cleanly')
                return     # <------ The only way to leave the coro
            <process data>
        except Exception:
            logging.exception('Unexpected exception:')

^^ With this pattern, I can shut down the `consumer` task either by cancelling the task (internally it'll put a `None` on the queue) or by placing a `None` on the queue outright from anywhere else. The key point is that in either case, existing items on the queue will still get processed before the `None` is consumed, terminating the task from the inside.

(A) If the queue itself is cancelled (as in the proposed PR), would it still be possible to catch the `CancelledError` and continue processing whatever items have already been placed onto the queue? (and in this case, I think I'd still need to place a sentinel onto the queue to mark the "end" that correct?)

(B) The `task_done()` is important for app shutdown so that the application shutdown process waits for all currently-pending queue items to be processed before proceeding with the next shutdown step. So, if the queue itself is cancelled (as in the proposed PR), what happens to the application-level call to `await queue.join()` during the shutdown sequence, if a queue was cancelled while there were still unprocessed items on the queue for which `task_done()` had not been called?

It would be great to have an example of how the proposed `queue.cancel()` would be used idiomatically, w.r.t. the two questions above.  It might be intended that the idiomatic usage of `queue.cancel()` is for situations where one doesn't care about dropping items previously placed on the queue. Is that the case?
msg346276 - (view) Author: Andrew Svetlov (asvetlov) * (Python committer) Date: 2019-06-22 12:32
Agree with Caleb.

The more I think the more I doubt about the proposal.
Cancellation is for tasks, not for queues or locks.

When should I cancel a queue but cannot cancel a task?
msg346391 - (view) Author: Martin Teichmann (Martin.Teichmann) * Date: 2019-06-24 13:13
Given the reactions I gather "close" is a better name for the method, so I changed it accordingly.

In the current implementation, items that had been put on the queue but not processed yet still get processed after the close, and I think this is the desired behavior. I added a test such that this won't unexpectedly change in the future.

To be precise, the current implementation of the queue does not even put new items on the queue if there is already a waiting consumer. The item will directly be handed over to said consumer, which may hang around on the event loop for a bit longer, but during this time the item is not in the queue. This also answers the questions about catching the CancelledError: if there are waiting consumers, there is nothing on the queue, so the problem of processing leftover items does not exist. The same holds for the task_done.

As for the "why don't I just cancel the task?", well, if you know it. There may be many consumer or producer tasks waiting for their turn. Sure, you can keep a list of all those tasks. But that's exactly the point of the proposed change: the Queue already knows all the waiting tasks, no need to keep another list up-to-date!
Date User Action Args
2019-06-24 13:13:58Martin.Teichmannsetmessages: + msg346391
2019-06-22 12:32:07asvetlovsetmessages: + msg346276
2019-06-22 03:00:41cjrhsetnosy: + cjrh
messages: + msg346268
2019-06-20 11:09:37Martin.Teichmannsetmessages: + msg346113
2019-06-19 08:13:09asvetlovsetmessages: + msg346026
2019-06-19 07:43:15Martin.Teichmannsetkeywords: + patch
stage: patch review
pull_requests: + pull_request14061
2019-06-19 07:41:48Martin.Teichmanncreate