Author aparamon
Recipients andybalaam, aparamon, asvetlov, glin, twisteroid ambassador, yselivanov
Date 2019-04-16.07:34:20
SpamBayes Score -1.0
Marked as misclassified Yes
Message-id <1555400060.82.0.487105108133.issue30782@roundup.psfhosted.org>
In-reply-to
Content
Hello!

Below is updated implementation containing more consistent error handling.
The main rough edges encountered:

1. asyncio.Queue alone proved insufficient for precise control of limit, as asyncio.create_task() schedules created Task() immediately and it may start executing before being added to queue (thus leading to limit+1 tasks running). Additional semaphore is used to tackle that.

2. When exception, other running tasks have to be cancel()ed and then await'ed to ensure all tasks are successfully finished by the time igather exits. Just cancel()ing proved not sufficient.

3. When exception, unscheduled coroutines have to be wrapped with asyncio.create_task(coro).cancel() to avoid RuntimeWarning "coroutine was never awaited". But maybe there is a more elegant way to suppress this warning for a coroutine?

In my client code I didn't so far encounter "an implicit requirement that back pressure from the consumer should be handled", but it should be possible to implement separately and quite straightforwardly, with the help of asyncio.Queue.

    async def igather(coros, limit=None):
        coros = iter(coros)

        buf = asyncio.Queue()
        sem = asyncio.Semaphore(limit or math.inf)

        async def submit(coros, buf):
            while True:
                await sem.acquire()
                try:
                    # TODO: additionally support async iterators
                    coro = next(coros)
                except StopIteration:
                    break
                task = asyncio.create_task(coro)
                buf.put_nowait(task)
            await buf.put(None)

        async def consume(buf):
            while True:
                task = await buf.get()
                if task:
                    v = await asyncio.wait_for(task, None)
                    sem.release()
                    yield v
                else:
                    break

        submit_task = asyncio.create_task(submit(coros, buf))
        try:
            async for result in consume(buf):
                yield result
        except:
            submit_task.cancel()
            # cancel scheduled
            while not buf.empty():
                task = buf.get_nowait()
                if task:
                    task.cancel()
                    try:
                        await task
                    except:
                        pass
            # cancel pending
            for coro in coros:
                asyncio.create_task(coro).cancel()
            raise

Shall I go ahead and prepare a PR with docs and tests?
History
Date User Action Args
2019-04-16 07:34:20aparamonsetrecipients: + aparamon, andybalaam, asvetlov, yselivanov, glin, twisteroid ambassador
2019-04-16 07:34:20aparamonsetmessageid: <1555400060.82.0.487105108133.issue30782@roundup.psfhosted.org>
2019-04-16 07:34:20aparamonlinkissue30782 messages
2019-04-16 07:34:20aparamoncreate