This issue tracker has been migrated to GitHub, and is currently read-only.
For more information, see the GitHub FAQs in the Python's Developer Guide.

classification
Title: asyncio.Queue.put never yields if the queue is unbounded
Type: behavior Stage: resolved
Components: asyncio Versions: Python 3.10, Python 3.9, Python 3.8
process
Status: closed Resolution: wont fix
Dependencies: Superseder:
Assigned To: Nosy List: asvetlov, josh.r, kj, spenczar, yselivanov
Priority: normal Keywords: patch

Created on 2021-02-03 19:41 by spenczar, last changed 2022-04-11 14:59 by admin. This issue is now closed.

Files
File name Uploaded Description Edit
never_yields.py spenczar, 2021-02-03 19:41 minimal executable reproducer of unyielding behavior
Pull Requests
URL Status Linked Edit
PR 24433 closed spenczar, 2021-02-03 20:41
Messages (6)
msg386454 - (view) Author: Spencer Nelson (spenczar) * Date: 2021-02-03 19:41
I am writing some software that reads records from a very large file (~hundreds of GB), putting them in an `asyncio.Queue` as it goes, and a chain of consumers handle each record and do stuff over the network.

To my surprise, my program runs out of memory because the Queue producer coroutine never yields control. I think (but am not sure) that the asyncio.Queue.put method has no preemption point if the queue is not full; I was using an unbounded Queue, so it was _never_ full, so my coroutine was never unscheduled.

I have attached a file with a minimal reproducer. It creates an unbounded queue. A 'publish' task calls `queue.put` from an infinite sequence. A 'subscribe' task calls `queue.get`, and prints each item. Nothing gets printed if I run this, because the `queue.put` never blocks.

I would expect that `await queue.put` would occasionally cede execution to any other runnable coroutines, even if the queue is unbounded.
msg386477 - (view) Author: Ken Jin (kj) * (Python committer) Date: 2021-02-04 09:11
Thanks for the minimal reproducer. I've tested it on 3.9.0 and 3.10a4 and they seem to exhibit the same behavior too.

Out of genuine curiosity (I don't mean to question if this *is* a bug, it seems like a trap for users): why not place an ``await asyncio.sleep(0)`` after ``queue.put`` line to force a switch in the producer? Eg, from your example code, instead of :
        while True:
            await q.put(i)

maybe this:
        while True:
            await q.put(i)
            await asyncio.sleep(0)
With that workaround, your example starts printing each item and the consumer tasks don't seem to get blocked.
msg386488 - (view) Author: Spencer Nelson (spenczar) * Date: 2021-02-04 17:49
Thanks for testing on more Python versions.

Yes, adding asyncio.sleep(0) after each put is an effective workaround - it's certainly possible to manually yield like that. I just think that that's what asyncio.Queue.put ought to be doing - in fact, that's my proposed change in the associated PR: https://github.com/python/cpython/pull/24433/files#diff-22a6bcb03e783378149a3e9411c185b13c908e61886ffd55145634b7ab12caacR141

I suppose that, more broadly, I would have expected that _any_ `await f()` call would be a preemptible point, but I think that's a much, much larger discussion.
msg386512 - (view) Author: Josh Rosenberg (josh.r) * (Python triager) Date: 2021-02-05 04:46
Making literally every await equivalent to:

await asyncio.sleep(0)

followed by the actual await (which is effectively what you're proposing when you expect all await to be preemptible) means adding non-trivial overhead to all async operations (asyncio is based on system calls of the select/poll/epoll/kpoll variety, which add meaningful overhead when we're talking about an operation that is otherwise equivalent to an extremely cheap simple collections.deque.append call). It also breaks many reasonable uses of asyncio.wait and asyncio.as_completed, where the caller can reasonably expect to be able to await the known-complete tasks without being preempted (if you know the coroutine is actually done, it could be quite surprising/problematic when you await it and get preempted, potentially requiring synchronization that wouldn't be necessary otherwise).

Making all await yield to the event loop would be like releasing the GIL before acquiring an uncontended lock; it makes an extremely cheap operation *much* higher overhead to, at best, fix a problem with poorly designed code. In real life, if whatever you're feeding the queue with is infinite and requires no awaiting to produce each value, you should probably just avoid the queue and have the consumer consume the iterable directly. Or just apply a maximum size to the queue; since the source of data to put is infinite and not-awaitable, there's no benefit to an unbounded queue, you may as well use a bound roughly fitted to the number of consumers, because any further items are just wasting memory well ahead of when it's needed.

Point is, regular queue puts only block (and potentially release the GIL early) when they're full or, as a necessary consequence of threading being less predictable than asyncio, when there is contention on the lock protecting the queue internals (which is usually resolved quickly); why would asyncio queues go out of their way to block when they don't need to?
msg386539 - (view) Author: Spencer Nelson (spenczar) * Date: 2021-02-05 19:52
Josh,

> Making literally every await equivalent to:
> 
> await asyncio.sleep(0)
> 
> followed by the actual await (which is effectively what you're proposing when you expect all await to be preemptible) means adding non-trivial overhead to all async operations (asyncio is based on system calls of the select/poll/epoll/kpoll variety, which add meaningful overhead when we're talking about an operation that is otherwise equivalent to an extremely cheap simple collections.deque.append call).

A few things:

First, I don't think I proposed that. I was simply saying that my expectations on behavior were incorrect, which points towards documentation.

Second, I don't think making a point "preemptible" is the same as actually executing a cooperative-style yield to the scheduler. I just expected that it would always be in the cards - that it would always be a potential point where I'd get scheduled away.

Third, I don't think that await asyncio.sleep(0) triggers a syscall, but I certainly could be mistaken. It looks to me like it is special-cased in asyncio, from my reading of the source. Again - could be wrong.

Fourth, I think that the idea of non-cooperative preempting scheduling is not nearly as bizarre as you make it sound. There's certainly plenty of prior art on preemptive schedulers out there. Go uses a sort of partial preemption at function call sites *because* it's a particularly efficient way to do things.

But anyway - I didn't really want to discuss this. As I said above, it's obviously a way way way bigger design discussion than my specific issue.


> It also breaks many reasonable uses of asyncio.wait and asyncio.as_completed, where the caller can reasonably expect to be able to await the known-complete tasks without being preempted (if you know the coroutine is actually done, it could be quite surprising/problematic when you await it and get preempted, potentially requiring synchronization that wouldn't be necessary otherwise).

I think this cuts both ways. Without reading the source code of asyncio.Queue, I don't see how it's possible to know whether its put method yields. Because of this, I tend to assume synchronization is necessary everywhere. The way I know for sure that a function call can complete without yielding is supposed to be that it isn't an `async` function, right? That's why asyncio.Queue.put_nowait exists and isn't asynchronous.

> In real life, if whatever you're feeding the queue with is infinite and requires no awaiting to produce each value, you should probably just avoid the queue and have the consumer consume the iterable directly.

The stuff I'm feeding the queue doesn't require awaiting, but I *wish* it did. It's just a case of not having the libraries for asynchronicity yet on the source side. I was hoping that the queue would let me pace my work in a way that would let me do more concurrent work.

> Or just apply a maximum size to the queue; since the source of data to put is infinite and not-awaitable, there's no benefit to an unbounded queue, you may as well use a bound roughly fitted to the number of consumers, because any further items are just wasting memory well ahead of when it's needed.

The problem isn't really that put doesn't yield for unbounded queues - it's that put doesn't yield *unless the queue is full*. That means that, if I use a very high maximum size for the queue, I'll still spend a big chunk of time filling up the queue, and only then will consumers start doing work.

I could pick a small queue bound, but then I'm more likely to waste time doing nothing if consumers are slower than the producer - I'll sit there with a full-but-tiny queue. Work-units in the queue can take wildly different amounts of time, so consumers will often be briefly slow, so the producer races ahead - until it hits its tiny limit. But then new work units arrive, and so the consumers are fast again - and they're quickly starved for work because the producer didn't build a good backlog.

So, the problem still remains, if work takes an uncertain amount of time which would seem to be the common reason for using a queue in the first place.

> Point is, regular queue puts only block (and potentially release the GIL early) when they're full or, as a necessary consequence of threading being less predictable than asyncio, when there is contention on the lock protecting the queue internals (which is usually resolved quickly); why would asyncio queues go out of their way to block when they don't need to?

I think you have it backwards. asyncio.Queue.put *always* blocks other coroutines' execution for unbounded queues. Why do they always block? If I wanted that, I wouldn't use anything in asyncio.Queue. I'd just use a collections.deque.
msg413756 - (view) Author: Andrew Svetlov (asvetlov) * (Python committer) Date: 2022-02-22 23:07
Sorry, that's how asyncio works: it never switches to another task if `await ...` doesn't need to wait for something actually.

Adding `await asyncio.sleep(0)` to every call decreases performance.
History
Date User Action Args
2022-04-11 14:59:41adminsetgithub: 87285
2022-02-22 23:07:28asvetlovsetstatus: open -> closed
resolution: wont fix
messages: + msg413756

stage: patch review -> resolved
2021-02-05 19:52:14spenczarsetmessages: + msg386539
2021-02-05 04:46:47josh.rsetnosy: + josh.r
messages: + msg386512
2021-02-04 17:49:27spenczarsetmessages: + msg386488
2021-02-04 09:11:58kjsetnosy: + kj

messages: + msg386477
versions: + Python 3.9, Python 3.10
2021-02-03 20:41:38spenczarsetkeywords: + patch
stage: patch review
pull_requests: + pull_request23243
2021-02-03 19:41:22spenczarcreate