This issue tracker has been migrated to GitHub, and is currently read-only.
For more information, see the GitHub FAQs in the Python Developer's Guide.

classification
Title: Provide an async iterator version of as_completed
Type: enhancement Stage: patch review
Components: asyncio Versions: Python 3.10
process
Status: open Resolution:
Dependencies: Superseder:
Assigned To: Nosy List: JustinTArthur, Roman.Evstifeev, asvetlov, carlodri, hniksic, mivade, yselivanov
Priority: normal Keywords: patch

Created on 2018-05-16 06:48 by hniksic, last changed 2022-04-11 14:59 by admin.

Pull Requests
URL Status Linked
PR 10251 open mivade, 2018-10-31 02:17
PR 22491 open JustinTArthur, 2020-10-02 04:23
Messages (11)
msg316773 - Author: Hrvoje Nikšić (hniksic) * Date: 2018-05-16 06:48
Judging by questions on the StackOverflow python-asyncio tag[1][2], it seems that users find it hard to understand how to use as_completed correctly. I have identified three issues:

* It's somewhat sparingly documented.

A StackOverflow user ([2]) didn't find it obvious that it runs the futures in parallel. Unless one is already aware of the meaning, the term "as completed" could suggest that they are executed and completed sequentially.

* Unlike its concurrent.futures counterpart, it's non-blocking.

This sounds like a good idea because it's usable from synchronous code, but it means that the futures it yields aren't completed; you have to await them first. This is confusing for a function with "completed" in the name, and it is not the case with concurrent.futures.as_completed, nor with the other waiting functions in asyncio (gather, wait, wait_for).

* It yields futures other than those that were passed in.

This prevents some common patterns from working, e.g. associating the results with context data, as the Python docs themselves do for concurrent.futures.as_completed in https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor-example . See SO question [1] for a similar request in asyncio.
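The minimal sketch below shows the second and third points with the current API. `job` is a hypothetical stand-in for real work, and the delays are arbitrary. Iterating over `asyncio.as_completed` produces fresh awaitables in completion order. Each result therefore still has to be awaited, and none of the yielded objects is one of the tasks that were passed in.

```python
import asyncio


async def job(delay, label):
    # Hypothetical stand-in for real work that finishes after `delay` seconds.
    await asyncio.sleep(delay)
    return label


async def main():
    tasks = [asyncio.ensure_future(job(delay, label))
             for label, delay in (("slow", 0.03), ("fast", 0.01), ("medium", 0.02))]
    results = []
    for aw in asyncio.as_completed(tasks):
        # `aw` is a new awaitable, not one of `tasks`, so it can't be used
        # to look up context associated with the original task...
        assert all(aw is not t for t in tasks)
        # ...and it must be awaited before its result is available.
        results.append(await aw)
    return results


results = asyncio.run(main())  # ['fast', 'medium', 'slow']
```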


Here is my proposal to address the issues.

I believe the usage problems stem from as_completed predating the concept of async iterators. If we had async iterators back then, as_completed would have been an obvious candidate to be one. In that case it could be both "blocking" (but not for the event loop) and return the original futures. For example:

async def as_completed2(fs):
    pending = set(map(asyncio.ensure_future, fs))
    while pending:
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        yield from done

(It is straightforward to add support for a timeout argument.)

I propose to deprecate asyncio.as_completed and to advertise an async-iterator version like the one presented here, under a nicer name such as as_done() or as_completed_async().



[1] https://stackoverflow.com/questions/50028465/python-get-reference-to-original-task-after-ordering-tasks-by-completion
[2] https://stackoverflow.com/questions/50355944/yield-from-async-generator-in-python-asyncio
msg316774 - Author: Hrvoje Nikšić (hniksic) * Date: 2018-05-16 06:49
Of course, `yield from done` would actually have to be `for future in done: yield future`, since async generators don't support yield from.
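The following is a runnable sketch of the proposed async generator. It applies `ensure_future` to each element of `fs`, yields from `done` one future at a time, and adds the optional timeout mentioned in the description. The timeout semantics are an assumption: an overall deadline that raises `asyncio.TimeoutError` and leaves unfinished tasks running, loosely mirroring `asyncio.as_completed`. `job`, `demo` and `timeout_demo` are hypothetical helpers for illustration.

```python
import asyncio


async def as_completed2(fs, *, timeout=None):
    """Yield the original futures/tasks in completion order (sketch)."""
    loop = asyncio.get_running_loop()
    # Coroutines are wrapped in new tasks; futures and tasks pass through as-is.
    pending = {asyncio.ensure_future(f) for f in fs}
    deadline = None if timeout is None else loop.time() + timeout
    while pending:
        remaining = None if deadline is None else max(0, deadline - loop.time())
        done, pending = await asyncio.wait(
            pending, timeout=remaining, return_when=asyncio.FIRST_COMPLETED)
        if not done:
            # Deadline reached; the still-pending tasks are left running.
            raise asyncio.TimeoutError
        # Async generators can't use `yield from`, so yield one at a time.
        for fut in done:
            yield fut


async def job(delay, label):
    await asyncio.sleep(delay)
    return label


async def demo():
    tasks = [asyncio.ensure_future(job(0.03, "slow")),
             asyncio.ensure_future(job(0.01, "fast"))]
    order = []
    async for fut in as_completed2(tasks):
        assert any(fut is t for t in tasks)  # the original task comes back
        order.append(fut.result())  # already done, no await needed
    return order


async def timeout_demo():
    slow = asyncio.ensure_future(asyncio.sleep(1))
    try:
        async for _ in as_completed2([slow], timeout=0.01):
            pass
    except asyncio.TimeoutError:
        return "timed out"
    finally:
        slow.cancel()
    return "finished"
```

Because the yielded objects are the original tasks, `fut.result()` can be read directly, and the task can also serve as a dictionary key for context lookups.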
msg316963 - Author: Yury Selivanov (yselivanov) * (Python committer) Date: 2018-05-17 16:43
I like the idea. Let's revisit it after Python 3.7 is released.
msg317247 - Author: Hrvoje Nikšić (hniksic) * Date: 2018-05-21 18:34
Another option occurred to me: as_completed could return an object that implements both the synchronous and the asynchronous iteration protocols:


class as_completed:
    def __init__(self, fs, *, loop=None, timeout=None):
        self.__fs = fs
        self.__loop = loop
        self.__timeout = timeout
        self._iter = None  # created lazily by __next__

    def __iter__(self):
        # current implementation here
        ...

    async def __aiter__(self):
        # new async implementation here; the body must contain `yield`,
        # making this an async generator that yields the original futures
        ...

    def __next__(self):
        # defined for backward compatibility with code that expects
        # as_completed() to return an iterator rather than an iterable
        if self._iter is None:
            self._iter = iter(self)
        return next(self._iter)

With that design there wouldn't need to be a new function under a different name; instead, as_completed could just be documented as an asynchronous iterable, with the old synchronous iteration supported for backward compatibility.
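Here is a runnable sketch of the dual-protocol design. The class is given the hypothetical name `dual_as_completed` so that it doesn't shadow `asyncio.as_completed`. Synchronous iteration simply delegates to the existing `asyncio.as_completed`, so it keeps yielding awaitables. `__aiter__` is written as an async generator that yields the original futures. A plain `async def __aiter__` without `yield` would return a coroutine, which `async for` rejects. Timeout handling in the async path is omitted for brevity.

```python
import asyncio


class dual_as_completed:  # hypothetical name, not an asyncio API
    """Iterable both synchronously (old style) and asynchronously (new style)."""

    def __init__(self, fs, *, timeout=None):
        self._fs = list(fs)
        self._timeout = timeout
        self._iter = None  # created lazily by __next__

    def __iter__(self):
        # Old behaviour: awaitables that must still be awaited.
        return iter(asyncio.as_completed(self._fs, timeout=self._timeout))

    def __next__(self):
        # Backward compatibility for code that calls next() on the result.
        if self._iter is None:
            self._iter = iter(self)
        return next(self._iter)

    async def __aiter__(self):
        # New behaviour: the original futures, in completion order.
        pending = {asyncio.ensure_future(f) for f in self._fs}
        while pending:
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED)
            for fut in done:
                yield fut


async def job(delay, label):
    await asyncio.sleep(delay)
    return label


def make_tasks():
    return [asyncio.ensure_future(job(0.03, "slow")),
            asyncio.ensure_future(job(0.01, "fast"))]


async def demo():
    sync_results = [await aw for aw in dual_as_completed(make_tasks())]
    tasks = make_tasks()
    async_results = []
    async for fut in dual_as_completed(tasks):
        assert any(fut is t for t in tasks)
        async_results.append(fut.result())
    return sync_results, async_results


async def demo_next():
    it = dual_as_completed([asyncio.ensure_future(job(0.01, "only"))])
    return await next(it)
```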
msg326748 - Author: Hrvoje Nikšić (hniksic) * Date: 2018-09-30 21:13
If there is interest in this, I'd like to attempt a PR for a sync/async variant of as_completed.

Note that the new docs are *much* clearer, so the first (documentation) problem from the description is now fixed. Although the documentation is still brief, it now contains the key pieces of information: 1) that the futures are actually run in parallel, and 2) that each yielded future produces the next result that becomes available. Neither was actually stated in the old docs (!), so this is a marked improvement.
msg328454 - Author: Michael DePalatis (mivade) * Date: 2018-10-25 15:50
Is there any progress on this? I was thinking the exact same thing regarding the backwards-compatible approach and would like to work on it if no one else is.
msg328527 - Author: Hrvoje Nikšić (hniksic) * Date: 2018-10-26 08:57
I didn't start working on the PR, so please go ahead if you're interested.

One small suggestion: If you're implementing this, please note that the proof-of-concept implementation shown in the description is not very efficient, because each call to `wait` has to iterate over all the futures (of which there can be many) to set up and tear down the done callbacks on each one. A more efficient implementation would set up the callbacks only once; see https://stackoverflow.com/a/51403277/1600898 for an example.
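The sketch below shows one way to register each callback only once. It is not necessarily the code in the linked answer. Each future gets a single done callback that pushes it onto an `asyncio.Queue`, and the generator then pops futures in the order they finished. `as_completed_once`, `job` and `demo` are hypothetical names.

```python
import asyncio


async def as_completed_once(fs):
    """Yield the original futures in completion order, one callback each."""
    futs = {asyncio.ensure_future(f) for f in fs}
    finished = asyncio.Queue()
    on_done = finished.put_nowait  # receives the future once it completes
    for fut in futs:
        fut.add_done_callback(on_done)  # registered once, never re-added
    try:
        for _ in range(len(futs)):
            fut = await finished.get()
            yield fut
    finally:
        # If the consumer stops early, don't leave callbacks behind.
        for fut in futs:
            fut.remove_done_callback(on_done)


async def job(delay, label):
    await asyncio.sleep(delay)
    return label


async def demo():
    tasks = [asyncio.ensure_future(job(delay, label))
             for label, delay in (("slow", 0.03), ("fast", 0.01), ("medium", 0.02))]
    seen = []
    async for fut in as_completed_once(tasks):
        assert any(fut is t for t in tasks)
        seen.append(fut.result())
    return seen
```

The per-future setup cost is paid once up front, rather than on every `wait` call as in the proof of concept.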
msg377797 - Author: Justin Arthur (JustinTArthur) * Date: 2020-10-02 04:36
I've added a new PR, PR 22491. This one has as_completed returning an iterator and includes tests for both the old and new style.

I see a trivial amount of added latency from the extra call-stack depth compared to Guido's original implementation. Should we decide to deprecate the plain-iterator-of-awaitables form, we could possibly regain that latency by refactoring this as an asynchronous generator.
msg377803 - Author: Hrvoje Nikšić (hniksic) * Date: 2020-10-02 08:14
Hi, thanks for providing a PR. One thing I noticed is that the implementation in the PR yields results of the futures from the generator. This issue proposes a generator that instead yields the futures passed to as_completed. This is needed not just for consistency with concurrent.futures.as_completed, but also to allow associating results with the requests that produced them, which is an important use case for as_completed.

An example of how this is useful is the snippet at https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor-example which you cannot implement with an as_completed that doesn't yield the original futures. The StackOverflow question at [1] inquires about that issue, and would be easily resolved by yielding the futures as concurrent.futures.as_completed does.
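Without such an API, one way to reproduce the dictionary-based pattern from the ThreadPoolExecutor example in asyncio is to loop over `asyncio.wait(..., return_when=FIRST_COMPLETED)` directly, because `asyncio.wait` returns the original futures. In this sketch, `load_url` is a hypothetical coroutine that sleeps instead of downloading anything, and the URLs are placeholders.

```python
import asyncio

URLS = ["https://example.com/", "https://example.org/long/path/"]


async def load_url(url):
    # Hypothetical stand-in for a download; longer URLs "take" longer.
    await asyncio.sleep(0.001 * len(url))
    return len(url)


async def fetch_all():
    # As in the ThreadPoolExecutor example: map each future to its context.
    future_to_url = {asyncio.ensure_future(load_url(url)): url for url in URLS}
    report = []
    pending = set(future_to_url)
    while pending:
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED)
        for fut in done:
            # The lookup works only because `fut` is an original future.
            report.append((future_to_url[fut], fut.result()))
    return report
```

With the current `asyncio.as_completed`, the `future_to_url[fut]` lookup would fail, because the yielded awaitables are not the dictionary's keys.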

As far as I can tell, the needed change is simple: just yield `f` instead of `f.result()` from `_wait_for_one` when invoked from __anext__. (It must still return f.result() when called from __next__ for backward compatibility.)

[1] https://stackoverflow.com/questions/50028465/python-get-reference-to-original-task-after-ordering-tasks-by-completion
msg377840 - Author: Justin Arthur (JustinTArthur) * Date: 2020-10-02 23:28
Thanks, Hrvoje. I've updated the patch to match this bug's suggested format and have updated the documentation and What's New.

The one quirk that comes with the benefit of getting the same futures back is that we still allow both coroutines and futures to be passed in. Coroutines are not yielded back in their original form; instead, the new task wrapping each one is yielded.
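The quirk follows from how a coroutine becomes scheduled work: it has to be wrapped in a Task. The sketch below illustrates this with `asyncio.ensure_future`. It assumes, without confirming, that the patch wraps its inputs the same way the proof of concept in the description does. Callers that need identity-based lookups can create the tasks themselves before passing them in.

```python
import asyncio


async def work():
    return 42


async def demo():
    coro = work()
    task = asyncio.ensure_future(coro)
    # A coroutine is wrapped in a brand-new Task object...
    wrapped_is_new = task is not coro
    # ...whereas a Task (or Future) is handed back unchanged.
    passthrough = asyncio.ensure_future(task) is task
    return wrapped_is_new, passthrough, await task
```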
msg377927 - Author: Hrvoje Nikšić (hniksic) * Date: 2020-10-04 09:00
Justin, thanks for updating the PR! I'll take another look at the code.
History
Date                 User              Action  Args
2022-04-11 14:59:00  admin             set     github: 77714
2020-10-16 06:44:21  carlodri          set     nosy: + carlodri
2020-10-08 16:25:30  JustinTArthur     set     versions: + Python 3.10, - Python 3.8
2020-10-04 09:00:37  hniksic           set     messages: + msg377927
2020-10-03 00:17:11  JustinTArthur     set     title: Provide an async-generator version of as_completed -> Provide an async iterator version of as_completed
2020-10-02 23:28:10  JustinTArthur     set     messages: + msg377840
2020-10-02 08:14:04  hniksic           set     messages: + msg377803
2020-10-02 04:36:27  JustinTArthur     set     messages: + msg377797
2020-10-02 04:23:54  JustinTArthur     set     nosy: + JustinTArthur
                                               pull_requests: + pull_request21508
2019-04-05 13:11:51  Roman.Evstifeev   set     nosy: + Roman.Evstifeev
2018-10-31 02:17:01  mivade            set     keywords: + patch
                                               stage: patch review
                                               pull_requests: + pull_request9565
2018-10-26 08:57:34  hniksic           set     messages: + msg328527
2018-10-25 15:50:20  mivade            set     nosy: + mivade
                                               messages: + msg328454
2018-09-30 21:13:35  hniksic           set     messages: + msg326748
2018-05-21 18:34:42  hniksic           set     messages: + msg317247
2018-05-17 16:43:34  yselivanov        set     messages: + msg316963
2018-05-16 19:26:33  hniksic           set     type: enhancement
2018-05-16 06:49:56  hniksic           set     messages: + msg316774
2018-05-16 06:48:52  hniksic           create