This issue tracker has been migrated to GitHub, and is currently read-only.
For more information, see the GitHub FAQs in the Python's Developer Guide.

classification
Title: asyncio.Queue: putting items out of order when it is full
Type: Stage:
Components: asyncio Versions: Python 3.7
process
Status: open Resolution:
Dependencies: Superseder:
Assigned To: Nosy List: asvetlov, esrse, josh.r, yselivanov
Priority: normal Keywords:

Created on 2019-11-21 03:25 by esrse, last changed 2022-04-11 14:59 by admin.

Files
File name Uploaded Description Edit
poc-oooput.py esrse, 2019-11-22 06:43
Messages (6)
msg357129 - (view) Author: Junyeong Jeong (esrse) Date: 2019-11-21 03:25
Hi

The python document says that asyncio.Queue is FIFO queue. But it is not true
when the queue is full and multi tasks are putting items into it
simultaneously. I know the document does not explicitly mention that
asyncio.Queue is multi producer queue, but also there does not exist any note
that it may be corrupt if used by multi producers.

I found the below scenario happens when the asyncio.Queue is full and multi
tasks are putting into it simultaneously. Let me explain this.

When a queue is full, putters are awaiting here: https://github.com/python/cpython/blob/d67279147ace3b63187e5d75a15c345264f39e85/Lib/asyncio/queues.py#L125

And the first of them is waken up here: https://github.com/python/cpython/blob/d67279147ace3b63187e5d75a15c345264f39e85/Lib/asyncio/queues.py#L188

Before the first putter hands over the control of execution, new putter
calls the .put() method so the queue is not full at this point that it calls
.put_nowait() immediately here: https://github.com/python/cpython/blob/d67279147ace3b63187e5d75a15c345264f39e85/Lib/asyncio/queues.py#L140

If this happens the new putter puts earlier than other awaiting putters.

I hope the queue gets fixed to work correctly in this case.

Thanks,

  Junyeong
msg357130 - (view) Author: Josh Rosenberg (josh.r) * (Python triager) Date: 2019-11-21 03:36
The items that haven't finished the put aren't actually "in" the queue yet, so I don't see how non-FIFO order of insertion violates any FIFO guarantees for the contents of the queue; until the items are actually "in", they're not sequenced for the purposes of when they come "out". Mandating such a guarantee effectively means orchestrating a queue with a real maxsize equal to the configured maxsize plus the total number of coroutines competing to put items into it.

The guarantee is still being met here; once an item is put, it will be "get"-ed after anything that finished put-ing before it, and before anything that finished put-ing after it.
msg357132 - (view) Author: Junyeong Jeong (esrse) Date: 2019-11-21 05:59
> The items that haven't finished the put aren't actually "in" the queue yet, so I don't see how non-FIFO order of insertion violates any FIFO guarantees for the contents of the queue; until the items are actually "in", they're not sequenced for the purposes of when they come "out". Mandating such a guarantee effectively means orchestrating a queue with a real maxsize equal to the configured maxsize plus the total number of coroutines competing to put items into it.

Your explanation is correct technically. But without enough explanation, this
behavior can confuse many users.

For instance it happens to put data into asyncio.Queue at asyncio protocol
callbacks by creating new task. This logic will work nicely for a long time but
in the corner case the queue gets full and at the point of view of a consumer
the data come out from the queue are out of order.

IMHO, it would be better to add some note into the document, describing what
FIFO means in regard to asyncio.Queue.
msg357166 - (view) Author: Andrew Svetlov (asvetlov) * (Python committer) Date: 2019-11-21 13:00
I still don't understand the problem.
If the queue is full new items still are added in the order of `await q.put()` calls.
If there are multiple producers the order of adding items into the queue is still the order of `q.put()`.

Do you have a code example that demonstrates the opposite behavior?
msg357247 - (view) Author: Junyeong Jeong (esrse) Date: 2019-11-22 06:43
Thanks for having an interest in this issue.

I thought asyncio.Queue guarantees the order of items as .put called after I
read the code. But it does not.

I attach poc code here.
In my machine, this program prints this message and exits.
$ python poc-oooput.py
num=1, last_read_num=0
num=33, last_read_num=1
Traceback (most recent call last):
  File "poc-oooput.py", line 47, in <module>
    asyncio.run(main())
  File "/home/esrse/.pyenv/versions/3.7.3/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/home/esrse/.pyenv/versions/3.7.3/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
    return future.result()
  File "poc-oooput.py", line 44, in main
    await stream.read()
  File "poc-oooput.py", line 20, in read
    assert num == self._last_read_num + 1
AssertionError

This shows you that the 33rd item is put before the 2nd item.
msg357553 - (view) Author: Josh Rosenberg (josh.r) * (Python triager) Date: 2019-11-27 03:57
Yes, five outstanding blocked puts can be bypassed by a put that comes in immediately after a get creates space. But this isn't really a problem; there are no guarantees on what order puts are executed in, only a guarantee that once a put succeeds, it's FIFO ordered with respect to all other puts.

Nothing in the docs even implies the behavior you're expecting, so I'm not seeing how even a documentation fix is warranted here. The docs on put clearly say:

"Put an item into the queue. If the queue is full, wait until a free slot is available before adding the item."

If we forcibly hand off on put even when a slot is available (to allow older puts to finish first), then we violate the expectation that waiting is only performed when the queue is full (if I test myqueue.full() and it returns False, I can reasonably expect that put won't block). This would be especially impossible to fix if people write code like `if not myqueue.full(): myqueue.put_nowait()`. put_nowait isn't even a coroutine, so it *can't* hand off control to the event loop to allow waiting puts to complete, even if it wanted to, and it can't fail to put (e.g. by determining the empty slots will be filled by outstanding puts in some relatively expensive way), because you literally *just* verified the queue wasn't full and had no awaits between the test and the put_nowait, so it *must* succeed.

In short: Yes, it's somewhat unpleasant that a queue slot can become free and someone else can swoop in and steal it before older waiting puts can finish. But any change that "fixed" that would make all code slower (forcing unnecessary coroutine switches), and violate existing documentation guarantees.
History
Date User Action Args
2022-04-11 14:59:23adminsetgithub: 83055
2019-11-27 03:57:51josh.rsetmessages: + msg357553
2019-11-22 06:43:48esrsesetfiles: + poc-oooput.py

messages: + msg357247
2019-11-21 13:00:10asvetlovsetmessages: + msg357166
2019-11-21 05:59:06esrsesetmessages: + msg357132
2019-11-21 03:36:30josh.rsetnosy: + josh.r
messages: + msg357130
2019-11-21 03:25:54esrsecreate