Issue 30782
Created on 2017-06-27 01:05 by andybalaam, last changed 2022-04-11 14:58 by admin.
Pull Requests

URL | Status | Linked
---|---|---
PR 2424 | closed | andybalaam, 2017-06-27 01:10
Messages (17)

msg296982 - Author: Andy Balaam (andybalaam) | Date: 2017-06-27 01:05

asyncio.as_completed() lets us schedule many coroutines (or Futures) and then deal with each result as soon as it is available, in a loop, in a streaming style. I propose allowing as_completed() to work on very large numbers of coroutines, provided through a generator (rather than a list). To make this practical, we need to limit the number of coroutines that are scheduled simultaneously to a reasonable number. For tasks that open files or sockets, a reasonable number might be 1000 or fewer; for other tasks, a much larger number might be reasonable, but we would still like some limit to prevent us running out of memory. I suggest adding a "limit" argument to as_completed() that caps the number of coroutines it schedules simultaneously.

For me, the key advantage of as_completed() (in the proposed modified form) is that it enables a streaming style that looks quite like synchronous code, but is efficient in terms of memory usage (as you'd expect from a streaming style):

```python
#!/usr/bin/env python3
import asyncio
import sys

limit = int(sys.argv[1])

async def double(x):
    await asyncio.sleep(1)
    return x * 2

async def print_doubles():
    coros = (double(x) for x in range(1000000))
    for res in asyncio.as_completed(coros, limit=limit):
        r = await res
        if r % 100000 == 0:
            print(r)

loop = asyncio.get_event_loop()
loop.run_until_complete(print_doubles())
loop.close()
```

Using my prototype implementation, this runs faster and uses much less memory on my machine when you run it with a limit of 100K instead of 1 million concurrent tasks:

```
$ /usr/bin/time --format "Memory usage: %MKB\tTime: %e seconds" ./example 1000000
Memory usage: 2234552KB    Time: 97.52 seconds
$ /usr/bin/time --format "Memory usage: %MKB\tTime: %e seconds" ./example 100000
Memory usage: 252732KB    Time: 94.13 seconds
```

I have been working on an implementation, and there is some discussion in my blog posts:
http://www.artificialworlds.net/blog/2017/06/12/making-100-million-requests-with-python-aiohttp/ and http://www.artificialworlds.net/blog/2017/06/27/adding-a-concurrency-limit-to-pythons-asyncio-as_completed/

Possibly the most controversial part of this proposal is that we need to allow passing a generator to as_completed() instead of enforcing that it be a list. This is fundamental to allowing the style outlined above, but it's possible that we can do better than the blanket allowing of all generators that I did.
msg300401 - Author: Andy Balaam (andybalaam) | Date: 2017-08-17 07:57

bump
msg308776 - Author: Andrew Svetlov (asvetlov) | Date: 2017-12-20 19:07

as_completed() is a low-level API; let's not overload it with extra parameters. In any case, as_completed() uses only asyncio.Future and its public API, like add_done_callback() etc., so you can build everything you need without modifying asyncio. Let's close the issue with a "won't fix" resolution.
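To illustrate the point that a limit can be built entirely from public asyncio API, here is a minimal sketch (the name `bounded_gather` is made up for this example): a semaphore caps how many coroutines execute at once. Note that, unlike the proposal above, gather() still creates one Task per coroutine upfront, so this bounds concurrency but not memory.

```python
import asyncio

async def bounded_gather(coros, limit):
    # A semaphore caps how many coroutines execute concurrently;
    # each wrapper waits for a free slot before awaiting its coroutine.
    sem = asyncio.Semaphore(limit)

    async def bounded(coro):
        async with sem:
            return await coro

    # Note: gather() still creates one Task per coroutine upfront,
    # so this caps concurrency but not memory usage.
    return await asyncio.gather(*(bounded(c) for c in coros))

async def double(x):
    await asyncio.sleep(0)
    return x * 2

results = asyncio.run(bounded_gather((double(x) for x in range(10)), limit=3))
```

This is the standard semaphore idiom rather than anything specific to as_completed(); it returns results in submission order, which is one of the behaviors debated later in this thread.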
msg308779 - Author: Yury Selivanov (yselivanov) | Date: 2017-12-20 19:11

I agree; let's keep as_completed() simple for now. Handling generators plus async correctly is hard, so we definitely don't have time for this in 3.7.
msg309694 - Author: Andy Balaam (andybalaam) | Date: 2018-01-09 09:58

I would argue that this makes as_completed() a lot more useful than it is now, so it would be worth adding (maybe after 3.7). But if this does not go into asyncio, is there another library where it would belong, or should this be a completely new library?
msg309794 - Author: Andrew Svetlov (asvetlov) | Date: 2018-01-11 08:14

Third-party libraries are not the responsibility of Python core devs. I'm not aware of an existing library with such functionality.
msg335090 - Author: (glin) | Date: 2019-02-08 15:51

@Andrew Svetlov: I was surprised when you wrote "as_completed() is low-level API", so I wondered what the high-level API is. The first search result was the official docs, https://docs.python.org/3/library/asyncio-api-index.html, where as_completed() is listed as high-level API. IMHO, without a limit all of these functions (as_completed, wait, gather, ...) are quite impractical. I have simple to moderately complex scripts and I'm running into problems with them: API servers limiting the number of requests per minute, /tmp (a 4 GB ramdisk) running out of space, memory issues... Please reconsider adding a limit to these functions, since they are supposed to be high-level API, not low-level. Thanks.
msg336192 - Author: Andrey Paramonov (aparamon) | Date: 2019-02-21 09:00

as_completed() might be considered a low-level API, but as of Python 3.7 there are seemingly no ready alternatives that achieve the proposed behavior. asyncio.gather(), asyncio.wait(), and asyncio.as_completed() all expect an awaitables list of limited size; doing something like https://www.artificialworlds.net/blog/2017/06/12/making-100-million-requests-with-python-aiohttp is not straightforward. A function that takes an iterator/async iterator of tasks and is itself a generator/async generator is very much wanted, something in the spirit of (but more efficient than?):

```python
async def igather(tasks, limit=None):
    pending = set()
    while True:
        for task in islice(tasks, limit - len(pending) if limit else None):
            pending.add(task)
        if pending:
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                yield task
        else:
            break
```

It is an open question whether such a function should yield results in task submission order. Albeit useful, that is a bit harder to implement and (most importantly) has terrible worst-case memory behavior.

See also:
https://bugs.python.org/issue33533
https://github.com/h2non/paco/issues/38
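For reference, a self-contained variant of this sketch that runs on current Python (asyncio.wait() no longer accepts bare coroutines, so each one is wrapped into a Task first; this version also yields results rather than task objects). It is an illustration of the idea, not a proposed implementation:

```python
import asyncio
from itertools import islice

async def igather(coros, limit=None):
    # Keep at most `limit` tasks scheduled at a time; refill from the
    # iterator as tasks complete, yielding each result in completion order.
    coros = iter(coros)
    pending = set()
    while True:
        n = None if limit is None else limit - len(pending)
        for coro in islice(coros, n):
            # Wrap each coroutine so asyncio.wait() gets Futures, not
            # bare coroutines (the latter is rejected since Python 3.11).
            pending.add(asyncio.ensure_future(coro))
        if not pending:
            break
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            yield task.result()

async def double(x):
    await asyncio.sleep(0)
    return x * 2

async def main():
    # Only `limit` tasks exist at any moment, even for a huge generator.
    return sorted([r async for r in igather(
        (double(x) for x in range(100)), limit=5)])

results = asyncio.run(main())
```

Because results arrive in completion order, the consumer sorts them here; preserving submission order would need the extra buffering discussed above.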
msg336237 - Author: twisteroid ambassador (twisteroid ambassador) | Date: 2019-02-21 16:53

I feel like once you lay out all the requirements: taking futures from an (async) generator, limiting the number of concurrent tasks, getting completed tasks to one consumer "as completed", and an implicit requirement that back pressure from the consumer should be handled (i.e. if whoever's iterating through "async for fut in as_completed(...)" is too slow, then the tasks should pause until it catches up), there are too many moving parts, and this should really be implemented using several tasks. So a straightforward implementation may look like this:

```python
async def better_as_completed(futs, limit):
    MAX_DONE_FUTS_HELD = 10  # or some small number
    sem = asyncio.Semaphore(limit)
    done_q = asyncio.Queue(MAX_DONE_FUTS_HELD)

    async def run_futs():
        async for fut in futs:
            await sem.acquire()
            asyncio.create_task(run_one_fut(fut))
        async with sem:
            await done_q.put(None)

    async def run_one_fut(fut):
        try:
            fut = asyncio.ensure_future(fut)
            await asyncio.wait((fut,))
            await done_q.put(fut)
        finally:
            sem.release()

    asyncio.create_task(run_futs())

    while True:
        next_fut = await done_q.get()
        if next_fut is None:
            return
        yield next_fut
```

Add proper handling for cancellation and exceptions and whatnot, and it may become a usable implementation. And no, I do not feel like this should be added to asyncio.as_completed.
msg336526 - Author: Andrey Paramonov (aparamon) | Date: 2019-02-25 14:55

> an implicit requirement that back pressure from the consumer should be handled (i.e. if whoever's iterating through "async for fut in as_completed(...)" is too slow, then the tasks should pause until it catches up)

No, I don't think that is required or desired. My initial sketch was imprecise: it's better to asynchronously yield task results, not "completed task" futures. That way, no additional buffer is needed, all error handling can be consolidated inside igather(), and it's actually more compatible with asyncio.gather(). I.e., instead of

```python
yield next_fut
```

use

```python
yield await next_fut
```
msg340318 - Author: Andrey Paramonov (aparamon) | Date: 2019-04-16 07:34

Hello! Below is an updated implementation containing more consistent error handling. The main rough edges encountered:

1. asyncio.Queue alone proved insufficient for precise control of the limit, as asyncio.create_task() schedules the created Task immediately, and it may start executing before being added to the queue (thus leading to limit+1 tasks running). An additional semaphore is used to tackle that.
2. On exception, other running tasks have to be cancel()ed and then await'ed to ensure all tasks have finished by the time igather exits. Just cancel()ing proved insufficient.
3. On exception, unscheduled coroutines have to be wrapped with asyncio.create_task(coro).cancel() to avoid the RuntimeWarning "coroutine was never awaited". But maybe there is a more elegant way to suppress this warning for a coroutine?

In my client code I haven't so far encountered "an implicit requirement that back pressure from the consumer should be handled", but it should be possible to implement separately and quite straightforwardly, with the help of asyncio.Queue.

```python
async def igather(coros, limit=None):
    coros = iter(coros)
    buf = asyncio.Queue()
    sem = asyncio.Semaphore(limit or math.inf)

    async def submit(coros, buf):
        while True:
            await sem.acquire()
            try:
                # TODO: additionally support async iterators
                coro = next(coros)
            except StopIteration:
                break
            task = asyncio.create_task(coro)
            buf.put_nowait(task)
        await buf.put(None)

    async def consume(buf):
        while True:
            task = await buf.get()
            if task:
                v = await asyncio.wait_for(task, None)
                sem.release()
                yield v
            else:
                break

    submit_task = asyncio.create_task(submit(coros, buf))
    try:
        async for result in consume(buf):
            yield result
    except:
        submit_task.cancel()
        # cancel scheduled
        while not buf.empty():
            task = buf.get_nowait()
            if task:
                task.cancel()
                try:
                    await task
                except:
                    pass
        # cancel pending
        for coro in coros:
            asyncio.create_task(coro).cancel()
        raise
```

Shall I go ahead and prepare a PR with docs and tests?
msg351481 - Author: Lisa Roach (lisroach) | Date: 2019-09-09 14:35

I would like to see this implemented. I run into memory and speed issues when running with 1000+ tasks quite frequently, and have to write my own rate limiters around it. It doesn't look to me like it adds a large amount of complexity to as_completed. This was closed in 2017; perhaps it is worth revisiting?
msg351482 - Author: Yury Selivanov (yselivanov) | Date: 2019-09-09 14:41

We want to add TaskGroups to asyncio (a concept similar to Trio's nurseries). TaskGroups use the `async with` statement to clearly define where Tasks are created and at which point they are expected to be completed or destroyed. asyncio.gather(), asyncio.as_completed(), and a few others will be considered legacy APIs after we implement TaskGroups. Implementing rate limiting on top of TaskGroups is easier and more reliable. I'd really prefer to keep as_completed(), and especially gather(), as is, since I consider them a bit broken already.
msg351484 - Author: Lisa Roach (lisroach) | Date: 2019-09-09 14:43

Sounds good, thanks for the explanation Yury. I look forward to TaskGroups!
msg351486 - Author: Yury Selivanov (yselivanov) | Date: 2019-09-09 14:46

FWIW I've been using TaskGroups extensively in the EdgeDB codebase: https://github.com/edgedb/edgedb/blob/master/edb/common/taskgroup.py (you can use the code; it's Apache 2). The only thing that prevented us from merging them in 3.8 is that we need to formally define and implement ExceptionGroup (or MultiError) in CPython. I'm going to work on an initial PEP for that this week.
msg351490 - Author: Lisa Roach (lisroach) | Date: 2019-09-09 14:53

Oh nice, I remember talking to you about MultiError before; it will help simplify some Mock things. Happy to help out if you want more eyes on it.
msg351491 - Author: Yury Selivanov (yselivanov) | Date: 2019-09-09 14:54

> it will help simplify some Mock things.

Yeah, we'll need to chat about that so I can use Mock requirements in the PEP. :)
History

Date | User | Action | Args
---|---|---|---
2022-04-11 14:58:48 | admin | set | github: 74965
2021-10-13 22:44:14 | iforapsy | set | nosy: + iforapsy
2019-09-09 14:54:30 | yselivanov | set | messages: + msg351491
2019-09-09 14:53:07 | lisroach | set | messages: + msg351490
2019-09-09 14:46:09 | yselivanov | set | messages: + msg351486
2019-09-09 14:43:53 | lisroach | set | messages: + msg351484
2019-09-09 14:41:34 | yselivanov | set | messages: + msg351482
2019-09-09 14:35:21 | lisroach | set | nosy: + lisroach; messages: + msg351481
2019-04-16 07:34:20 | aparamon | set | messages: + msg340318
2019-02-25 14:55:58 | aparamon | set | messages: + msg336526
2019-02-21 16:53:02 | twisteroid ambassador | set | nosy: + twisteroid ambassador; messages: + msg336237
2019-02-21 09:00:15 | aparamon | set | nosy: + aparamon; messages: + msg336192
2019-02-08 15:51:43 | glin | set | nosy: + glin; messages: + msg335090
2018-01-11 08:14:43 | asvetlov | set | messages: + msg309794
2018-01-09 09:58:59 | andybalaam | set | messages: + msg309694
2017-12-20 19:11:32 | yselivanov | set | messages: + msg308779; versions: + Python 3.8, - Python 3.7
2017-12-20 19:07:29 | asvetlov | set | nosy: + asvetlov; messages: + msg308776
2017-08-17 07:57:44 | andybalaam | set | messages: + msg300401
2017-06-27 01:10:51 | andybalaam | set | pull_requests: + pull_request2475
2017-06-27 01:05:22 | andybalaam | create |