classification
Title: Allow limiting the number of concurrent tasks in asyncio.as_completed
Type: enhancement Stage:
Components: asyncio Versions: Python 3.8
process
Status: open Resolution:
Dependencies: Superseder:
Assigned To: Nosy List: andybalaam, aparamon, asvetlov, glin, lisroach, twisteroid ambassador, yselivanov
Priority: normal Keywords:

Created on 2017-06-27 01:05 by andybalaam, last changed 2019-09-09 14:54 by yselivanov.

Pull Requests
URL Status Linked Edit
PR 2424 closed andybalaam, 2017-06-27 01:10
Messages (17)
msg296982 - (view) Author: Andy Balaam (andybalaam) * Date: 2017-06-27 01:05
asyncio.as_completed allows us to provide lots of coroutines (or Futures) to schedule, and then deal with the results as soon as they are available, in a loop, or a streaming style.

I propose to allow as_completed to work on very large numbers of coroutines, provided through a generator (rather than a list).  In order to make this practical, we need to limit the number of coroutines that are scheduled simultaneously to a reasonable number.

For tasks that open files or sockets, a reasonable number might be 1000 or fewer.  For other tasks, a much larger number might be reasonable, but we would still like some limit to prevent us running out of memory.

I suggest adding a "limit" argument to as_completed that limits the number of coroutines that it schedules simultaneously.

For me, the key advantage of as_completed (in the proposed modified form) is that it enables a streaming style that looks quite like synchronous code, but is efficient in terms of memory usage (as you'd expect from a streaming style):


#!/usr/bin/env python3

import asyncio
import sys

limit = int(sys.argv[1])

async def double(x):
    await asyncio.sleep(1)
    return x * 2

async def print_doubles():
    coros = (double(x) for x in range(1000000))
    for res in asyncio.as_completed(coros, limit=limit):
        r = await res
        if r % 100000 == 0:
            print(r)

loop = asyncio.get_event_loop()
loop.run_until_complete(print_doubles())
loop.close()


Using my prototype implementation, this runs faster and uses much less memory on my machine when you run it with a limit of 100K instead of 1 million concurrent tasks:

$ /usr/bin/time --format "Memory usage: %MKB\tTime: %e seconds" ./example 1000000
Memory usage: 2234552KB	Time: 97.52 seconds

$ /usr/bin/time --format "Memory usage: %MKB\tTime: %e seconds" ./example 100000
Memory usage: 252732KB	Time: 94.13 seconds

I have been working on an implementation and there is some discussion in my blog posts: http://www.artificialworlds.net/blog/2017/06/12/making-100-million-requests-with-python-aiohttp/ and http://www.artificialworlds.net/blog/2017/06/27/adding-a-concurrency-limit-to-pythons-asyncio-as_completed/

Possibly the most controversial thing about this proposal is the fact that we need to allow passing a generator to as_completed instead of enforcing that it be a list.  This is fundamental to allowing the style I outlined above, but it's possible that we can do better than the blanket allowing of all generators that I did.
msg300401 - (view) Author: Andy Balaam (andybalaam) * Date: 2017-08-17 07:57
bump
msg308776 - (view) Author: Andrew Svetlov (asvetlov) * (Python committer) Date: 2017-12-20 19:07
as_completed() is low-level API.
Let's not overload it with different parameters.

Anyway, `as_completed()` uses only asyncio.Future and its public API, like `add_done_callback()` etc.

You can build everything you need without modifying asyncio.
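
To make that concrete, here is a minimal sketch of a limited "as completed" helper built only on `asyncio.ensure_future()`, `add_done_callback()` and `asyncio.Queue`. The name `as_completed_limited` and its parameters are hypothetical and not part of asyncio; cancellation and early exit of the consumer are not handled.

```python
import asyncio


async def as_completed_limited(coros, limit):
    """Yield results as they finish, keeping at most `limit` tasks alive.

    Hypothetical helper: neither the name nor the signature exists in asyncio.
    """
    coros = iter(coros)        # pull coroutines lazily, one at a time
    done_q = asyncio.Queue()   # finished tasks land here via the callback
    running = 0

    def start_next():
        nonlocal running
        coro = next(coros, None)
        if coro is not None:
            task = asyncio.ensure_future(coro)
            # The callback receives the finished task as its only argument.
            task.add_done_callback(done_q.put_nowait)
            running += 1

    for _ in range(limit):
        start_next()
    while running:
        task = await done_q.get()
        running -= 1
        start_next()           # refill the slot that was just freed
        yield task.result()    # re-raises here if the task failed
```

Because the input is consumed with `next()`, a generator expression such as `(double(x) for x in range(1000000))` only ever has about `limit` coroutine objects alive at a time.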

Let's close the issue with a "won't fix" resolution.
msg308779 - (view) Author: Yury Selivanov (yselivanov) * (Python committer) Date: 2017-12-20 19:11
I agree, let's keep as_completed() simple for now.  Handling generators+async correctly is hard, so we definitely don't have time for this in 3.7.
msg309694 - (view) Author: Andy Balaam (andybalaam) * Date: 2018-01-09 09:58
I would argue that this makes as_completed a lot more useful than it is now, so would be worth adding (maybe after 3.7).

But, if this does not go into asyncio, is there another library where it would belong?  Or should this be a completely new library?
msg309794 - (view) Author: Andrew Svetlov (asvetlov) * (Python committer) Date: 2018-01-11 08:14
Third-party libraries are not the Python core devs' responsibility.

I'm not aware of an existing library with such functionality.
msg335090 - (view) Author: (glin) Date: 2019-02-08 15:51
@Andrew Svetlov: I was surprised when you wrote "as_completed() is low-level API.", so I wondered what the high-level API is. I googled it, and the first result was the official docs:

https://docs.python.org/3/library/asyncio-api-index.html

There it is indeed listed as a high-level API.

IMHO, without a limit, all of these functions (as_completed, wait, gather, ...) are quite impractical. I have simple to moderately complex scripts and I keep running into problems with them: API servers limiting the number of requests per minute, /tmp/ (a 4GB ramdisk) running out of space, memory issues...

Please reconsider adding a limit to these functions, as they are supposed to be a high-level API, not a low-level one.

Thanks
msg336192 - (view) Author: Andrey Paramonov (aparamon) Date: 2019-02-21 09:00
as_completed() might be considered a low-level API, but as of Python 3.7 there are seemingly no ready alternatives that achieve the proposed behavior. asyncio.gather(), asyncio.wait(), and asyncio.as_completed() all expect a list of awaitables of limited size; doing something like https://www.artificialworlds.net/blog/2017/06/12/making-100-million-requests-with-python-aiohttp is not straightforward.

A function that takes an iterator/async iterator of tasks and is itself a generator/async generator is very much wanted, something in the spirit of (but more efficient?)
----
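import asyncio
from itertools import islice

async def igather(tasks, limit=None):
    tasks = iter(tasks)  # one shared iterator, so islice() resumes where it stopped
    pending = set()
    while True:
        # top up the pending set to at most `limit` tasks
        for task in islice(tasks, limit - len(pending) if limit else None):
            pending.add(asyncio.ensure_future(task))  # asyncio.wait() needs futures
        if pending:
            done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                yield task
        else:
            break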
----

It is an open question whether such a function should yield results in task submission order. Although useful, that is a bit harder to implement and (most importantly) has terrible worst-case memory behavior.
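
The memory problem comes from buffering every result that finishes ahead of a slow earlier task. One possible way around it, sketched below under the assumption that reduced throughput is acceptable, is a sliding window: only the oldest task is awaited, so nothing beyond `limit` tasks is ever held, but a slow task at the head stalls the submission of new ones. The name `igather_ordered` is hypothetical.

```python
import asyncio
from collections import deque
from itertools import islice


async def igather_ordered(coros, limit):
    """Yield results in submission order with at most `limit` tasks alive."""
    coros = iter(coros)
    # Start the first `limit` coroutines; the deque preserves submission order.
    window = deque(asyncio.ensure_future(c) for c in islice(coros, limit))
    while window:
        result = await window.popleft()   # wait for the oldest task only
        for c in islice(coros, 1):        # top up with at most one new task
            window.append(asyncio.ensure_future(c))
        yield result
```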

See also:
https://bugs.python.org/issue33533
https://github.com/h2non/paco/issues/38
msg336237 - (view) Author: twisteroid ambassador (twisteroid ambassador) * Date: 2019-02-21 16:53
I feel like once you lay out all the requirements: taking futures from an (async) generator, limiting the number of concurrent tasks, getting completed tasks to one consumer "as completed", and an implicit requirement that back pressure from the consumer should be handled (i.e. if whoever's iterating through "async for fut in as_completed(...)" is too slow, then the tasks should pause until it catches up), there are too many moving parts, and this should really be implemented using several tasks.

So a straightforward implementation may look like this:

    async def better_as_completed(futs, limit):
        MAX_DONE_FUTS_HELD = 10  # or some small number
        
        sem = asyncio.Semaphore(limit)
        done_q = asyncio.Queue(MAX_DONE_FUTS_HELD)
        
        async def run_futs():
            async for fut in futs:
                await sem.acquire()
                asyncio.create_task(run_one_fut(fut))
            
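            # Take every slot, so all started tasks have finished and queued their results.
            for _ in range(limit):
                await sem.acquire()
            await done_q.put(None)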
        
        async def run_one_fut(fut):
            try:
                fut = asyncio.ensure_future(fut)
                await asyncio.wait((fut,))
                await done_q.put(fut)
            finally:
                sem.release()
        
        asyncio.create_task(run_futs())
        
        while True:
            next_fut = await done_q.get()
            if next_fut is None:
                return
            yield next_fut


Add proper handling for cancellation and exceptions and whatnot, and it may become a usable implementation.

And no, I do not feel like this should be added to asyncio.as_completed.
msg336526 - (view) Author: Andrey Paramonov (aparamon) Date: 2019-02-25 14:55
> an implicit requirement that back pressure from the consumer should be handled (i.e. if whoever's iterating through "async for fut in as_completed(...)" is too slow, then the tasks should pause until it catches up)

No, I don't think it is required or desired to be handled.

My initial sketch was imprecise: it's better to asynchronously yield task results, not "completed task" futures. This way, no additional buffer is needed, all error handling can be consolidated inside `igather()`, and it's actually more compatible with `asyncio.gather()`.

I.e., instead of
----
yield next_fut
----
use
----
yield await next_fut
----
msg340318 - (view) Author: Andrey Paramonov (aparamon) Date: 2019-04-16 07:34
Hello!

Below is an updated implementation with more consistent error handling.
The main rough edges encountered:

1. asyncio.Queue alone proved insufficient for precise control of the limit, as asyncio.create_task() schedules the created Task immediately, and it may start executing before being added to the queue (thus leading to limit+1 tasks running). An additional semaphore is used to tackle that.

2. When an exception occurs, the other running tasks have to be cancel()ed and then awaited to ensure all tasks have finished by the time igather exits. Just cancel()ing proved insufficient.

3. When an exception occurs, unscheduled coroutines have to be wrapped with asyncio.create_task(coro).cancel() to avoid the RuntimeWarning "coroutine was never awaited". But maybe there is a more elegant way to suppress this warning for a coroutine?
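
On the third point, one option that avoids creating a task at all is the coroutine object's own `close()` method: closing a coroutine that was never started marks it as finished, so the "never awaited" warning is not emitted. A minimal sketch follows; the helper name `discard_unscheduled` and the `double` coroutine are only illustrative.

```python
import asyncio


async def double(x):
    await asyncio.sleep(0)
    return x * 2


def discard_unscheduled(coros):
    """Close coroutine objects that will never be scheduled."""
    for coro in coros:
        coro.close()   # the body never runs; the coroutine is marked closed


# Coroutines that were created but will not be run after an error.
leftovers = (double(x) for x in range(3))
discard_unscheduled(leftovers)
```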

In my client code I haven't so far encountered "an implicit requirement that back pressure from the consumer should be handled", but it should be possible to implement that separately and quite straightforwardly with the help of asyncio.Queue.
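
A rough sketch of what such back pressure could look like, assuming a worker-pool design rather than the one below: `limit` workers pull from a shared iterator and push into a bounded `asyncio.Queue`, so a slow consumer makes `put()` block and no new coroutine is started. The names `igather_buffered` and `max_buffered` are hypothetical.

```python
import asyncio


async def igather_buffered(coros, limit, max_buffered=10):
    """Yield results as completed; workers pause while the buffer is full."""
    coros = iter(coros)
    results = asyncio.Queue(maxsize=max_buffered)

    async def worker():
        # All workers share one iterator, so each coroutine runs exactly once.
        for coro in coros:
            try:
                item = (True, await coro)
            except Exception as exc:
                item = (False, exc)
            await results.put(item)   # blocks while the buffer is full
        await results.put(None)       # signals that this worker is finished

    workers = [asyncio.ensure_future(worker()) for _ in range(limit)]
    active = limit
    try:
        while active:
            item = await results.get()
            if item is None:
                active -= 1
                continue
            ok, value = item
            if not ok:
                raise value           # surface the first failure to the consumer
            yield value
    finally:
        for w in workers:
            w.cancel()                # no-op for workers that already finished
```

With this layout the number of results that are started but not yet consumed stays around `limit + max_buffered`, regardless of how slowly the consumer iterates.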

    import asyncio
    import math

    async def igather(coros, limit=None):
        coros = iter(coros)

        buf = asyncio.Queue()
        sem = asyncio.Semaphore(limit or math.inf)

        async def submit(coros, buf):
            while True:
                await sem.acquire()
                try:
                    # TODO: additionally support async iterators
                    coro = next(coros)
                except StopIteration:
                    break
                task = asyncio.create_task(coro)
                buf.put_nowait(task)
            await buf.put(None)

        async def consume(buf):
            while True:
                task = await buf.get()
                if task:
                    v = await asyncio.wait_for(task, None)
                    sem.release()
                    yield v
                else:
                    break

        submit_task = asyncio.create_task(submit(coros, buf))
        try:
            async for result in consume(buf):
                yield result
        except:
            submit_task.cancel()
            # cancel scheduled
            while not buf.empty():
                task = buf.get_nowait()
                if task:
                    task.cancel()
                    try:
                        await task
                    except:
                        pass
            # cancel pending
            for coro in coros:
                asyncio.create_task(coro).cancel()
            raise

Shall I go ahead and prepare a PR with docs and tests?
msg351481 - (view) Author: Lisa Roach (lisroach) * (Python committer) Date: 2019-09-09 14:35
I would like to see this implemented. I run into memory and speed issues when running with 1000+ tasks quite frequently, and have to write my own rate limiters around it.

It doesn't look to me that it is adding a large amount of complexity to as_completed. This was closed in 2017, perhaps it is worth revisiting?
msg351482 - (view) Author: Yury Selivanov (yselivanov) * (Python committer) Date: 2019-09-09 14:41
We want to add TaskGroups to asyncio (a similar concept to Trio's nurseries).  TaskGroups use the `async with` statement to clearly define where Tasks are created and at which point they are expected to be completed or destroyed.

asyncio.gather(), asyncio.as_completed(), and a few others will be considered legacy APIs after we implement TaskGroups.  Implementing rate limiting on top of TaskGroups is easier and more reliable.

I'd really prefer to keep as_completed() and especially gather() as is, as I consider them a bit broken already.
msg351484 - (view) Author: Lisa Roach (lisroach) * (Python committer) Date: 2019-09-09 14:43
Sounds good, thanks for the explanation Yury. I look forward to the TaskGroups!
msg351486 - (view) Author: Yury Selivanov (yselivanov) * (Python committer) Date: 2019-09-09 14:46
FWIW I've been using TaskGroups in EdgeDB codebase extensively: https://github.com/edgedb/edgedb/blob/master/edb/common/taskgroup.py (you can use the code, it's Apache 2)

The only thing that prevented us from merging them in 3.8 is that we need to formally define & implement ExceptionGroup (or MultiError) in CPython.  I'm going to work on an initial PEP for that this week.
msg351490 - (view) Author: Lisa Roach (lisroach) * (Python committer) Date: 2019-09-09 14:53
Oh nice, I remember talking to you about the MultiError before, it will help simplify some Mock things. Happy to help out if you want more eyes on it.
msg351491 - (view) Author: Yury Selivanov (yselivanov) * (Python committer) Date: 2019-09-09 14:54
> it will help simplify some Mock things.

Yeah, we'll need to chat about that so I can use Mock requirements in the PEP. :)
History
Date User Action Args
2019-09-09 14:54:30 yselivanov set messages: + msg351491
2019-09-09 14:53:07 lisroach set messages: + msg351490
2019-09-09 14:46:09 yselivanov set messages: + msg351486
2019-09-09 14:43:53 lisroach set messages: + msg351484
2019-09-09 14:41:34 yselivanov set messages: + msg351482
2019-09-09 14:35:21 lisroach set nosy: + lisroach
messages: + msg351481
2019-04-16 07:34:20 aparamon set messages: + msg340318
2019-02-25 14:55:58 aparamon set messages: + msg336526
2019-02-21 16:53:02 twisteroid ambassador set nosy: + twisteroid ambassador
messages: + msg336237
2019-02-21 09:00:15 aparamon set nosy: + aparamon
messages: + msg336192
2019-02-08 15:51:43 glin set nosy: + glin
messages: + msg335090
2018-01-11 08:14:43 asvetlov set messages: + msg309794
2018-01-09 09:58:59 andybalaam set messages: + msg309694
2017-12-20 19:11:32 yselivanov set messages: + msg308779
versions: + Python 3.8, - Python 3.7
2017-12-20 19:07:29 asvetlov set nosy: + asvetlov
messages: + msg308776
2017-08-17 07:57:44 andybalaam set messages: + msg300401
2017-06-27 01:10:51 andybalaam set pull_requests: + pull_request2475
2017-06-27 01:05:22 andybalaam create