Message340318
Hello!
Below is an updated implementation with more consistent error handling.
The main rough edges encountered:
1. asyncio.Queue alone proved insufficient for precise control of the limit: asyncio.create_task() schedules the new Task immediately, so it may start executing before it has been added to the queue (leading to limit+1 tasks running). An additional semaphore is used to tackle that.
2. When an exception occurs, the other running tasks have to be cancel()ed and then awaited to ensure that all tasks have finished by the time igather exits. Just cancel()ing them proved insufficient.
3. When an exception occurs, unscheduled coroutines have to be wrapped with asyncio.create_task(coro).cancel() to avoid the RuntimeWarning "coroutine was never awaited". But maybe there is a more elegant way to suppress this warning for a coroutine?
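For points 2 and 3, the cleanup could be sketched roughly as follows. The helper names worker and cancel_and_drain are hypothetical and not part of igather. Running tasks are cancelled and then awaited through asyncio.gather(..., return_exceptions=True); coroutines that were never scheduled are closed with coro.close(), which should avoid the "never awaited" warning without creating a task at all.

```python
import asyncio


async def worker(n, log):
    """Hypothetical job: records its own cancellation, then re-raises."""
    try:
        await asyncio.sleep(10)
        return n
    except asyncio.CancelledError:
        log.append(n)
        raise


async def cancel_and_drain(tasks, pending_coros):
    """Cancel running tasks, wait for them, and close unscheduled coroutines."""
    for task in tasks:
        task.cancel()
    # cancel() only requests cancellation; awaiting is what guarantees the
    # tasks have actually finished. return_exceptions=True collects the
    # CancelledError instances instead of raising the first one.
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for coro in pending_coros:
        coro.close()  # a closed, never-started coroutine is not reported
    return results


async def main():
    log = []
    tasks = [asyncio.create_task(worker(i, log)) for i in range(2)]
    pending = [worker(i, log) for i in range(2, 4)]  # never scheduled
    await asyncio.sleep(0)  # give the two tasks a chance to start
    results = await cancel_and_drain(tasks, pending)
    return log, results, tasks
```

Calling asyncio.run(main()) runs the whole sequence; whether close() is acceptable in igather depends on whether callers expect their coroutines to be reusable after a failure, which this sketch does not address.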
In my client code I haven't so far encountered "an implicit requirement that back pressure from the consumer should be handled", but it should be possible to implement that separately and quite straightforwardly with the help of asyncio.Queue.
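A rough sketch of what such back pressure might look like, kept separate from igather: a bounded asyncio.Queue(maxsize=...) makes the producer wait in put() until the consumer has taken an item. The names produce and consume_slowly, the delay, and maxsize=2 are assumptions chosen for illustration only.

```python
import asyncio


async def produce(queue, n, log):
    """Put n items, then a None sentinel; put() waits while the queue is full."""
    for i in range(n):
        await queue.put(i)
        log.append(("put", i, queue.qsize()))
    await queue.put(None)


async def consume_slowly(queue, delay=0.01):
    """Hypothetical slow consumer: yields items until the sentinel arrives."""
    while True:
        item = await queue.get()
        if item is None:
            break
        await asyncio.sleep(delay)  # simulate slow processing
        yield item


async def main():
    queue = asyncio.Queue(maxsize=2)  # bound chosen arbitrarily
    log = []
    producer = asyncio.create_task(produce(queue, 6, log))
    items = [item async for item in consume_slowly(queue)]
    await producer
    return items, log
```

Here the producer can never get more than maxsize items ahead of the consumer, so a slow consumer throttles submission instead of letting results pile up.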
import asyncio
import math


async def igather(coros, limit=None):
    coros = iter(coros)

    buf = asyncio.Queue()
    # The semaphore caps the number of running tasks; no limit means unbounded.
    sem = asyncio.Semaphore(limit or math.inf)

    async def submit(coros, buf):
        while True:
            # Acquire a slot before creating the task, so that at most
            # `limit` tasks are running at any time.
            await sem.acquire()
            try:
                # TODO: additionally support async iterators
                coro = next(coros)
            except StopIteration:
                break
            task = asyncio.create_task(coro)
            buf.put_nowait(task)
        # Sentinel: tells the consumer that no more tasks will follow.
        await buf.put(None)

    async def consume(buf):
        while True:
            task = await buf.get()
            if task:
                v = await asyncio.wait_for(task, None)
                sem.release()
                yield v
            else:
                break

    submit_task = asyncio.create_task(submit(coros, buf))
    try:
        async for result in consume(buf):
            yield result
    except:
        submit_task.cancel()
        # cancel scheduled
        while not buf.empty():
            task = buf.get_nowait()
            if task:
                task.cancel()
                try:
                    # Await the task so it has really finished before we exit.
                    await task
                except:
                    pass
        # cancel pending
        for coro in coros:
            asyncio.create_task(coro).cancel()
        raise
Shall I go ahead and prepare a PR with docs and tests?
Date | User | Action | Args
2019-04-16 07:34:20 | aparamon | set | recipients: + aparamon, andybalaam, asvetlov, yselivanov, glin, twisteroid ambassador
2019-04-16 07:34:20 | aparamon | set | messageid: <1555400060.82.0.487105108133.issue30782@roundup.psfhosted.org>
2019-04-16 07:34:20 | aparamon | link | issue30782 messages
2019-04-16 07:34:20 | aparamon | create |