Message336237
I feel like once you lay out all the requirements, there are too many moving parts, and this should really be implemented using several tasks:
- taking futures from an (async) generator;
- limiting the number of concurrent tasks;
- delivering completed tasks to one consumer "as completed";
- and, implicitly, handling back pressure from the consumer (i.e. if whoever is iterating through "async for fut in as_completed(...)" is too slow, the tasks should pause until it catches up).
So a straightforward implementation may look like this:
import asyncio

async def better_as_completed(futs, limit):
    MAX_DONE_FUTS_HELD = 10  # or some small number
    sem = asyncio.Semaphore(limit)
    done_q = asyncio.Queue(MAX_DONE_FUTS_HELD)

    async def run_futs():
        async for fut in futs:
            await sem.acquire()
            asyncio.create_task(run_one_fut(fut))
        # Drain all the permits so we only signal end-of-stream
        # after every in-flight task has finished and released.
        for _ in range(limit):
            await sem.acquire()
        await done_q.put(None)

    async def run_one_fut(fut):
        try:
            fut = asyncio.ensure_future(fut)
            await asyncio.wait((fut,))
            await done_q.put(fut)
        finally:
            sem.release()

    asyncio.create_task(run_futs())

    while True:
        next_fut = await done_q.get()
        if next_fut is None:
            return
        yield next_fut
Add proper handling for cancellation and exceptions and whatnot, and it may become a usable implementation.
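For illustration, here is a hedged, self-contained sketch of how such a generator could be driven. The producer and job names (`produce`, `job`) are made up for the demo, and the copy of `better_as_completed` below signals end-of-stream by draining all `limit` semaphore permits, so the `None` sentinel is only queued after every task has finished. The counters verify that no more than `limit` jobs ever run at once.

```python
import asyncio

async def better_as_completed(futs, limit):
    MAX_DONE_FUTS_HELD = 10
    sem = asyncio.Semaphore(limit)
    done_q = asyncio.Queue(MAX_DONE_FUTS_HELD)

    async def run_one_fut(fut):
        try:
            fut = asyncio.ensure_future(fut)
            await asyncio.wait((fut,))
            await done_q.put(fut)
        finally:
            sem.release()

    async def run_futs():
        async for fut in futs:
            await sem.acquire()
            asyncio.create_task(run_one_fut(fut))
        # wait for all in-flight tasks by draining every permit,
        # then tell the consumer no more futures are coming
        for _ in range(limit):
            await sem.acquire()
        await done_q.put(None)

    asyncio.create_task(run_futs())
    while True:
        next_fut = await done_q.get()
        if next_fut is None:
            return
        yield next_fut

async def main():
    running = 0   # jobs currently executing
    peak = 0      # highest observed concurrency

    async def job(i):
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)
        running -= 1
        return i

    async def produce():
        # async generator feeding coroutines to better_as_completed
        for i in range(6):
            yield job(i)

    results = []
    async for fut in better_as_completed(produce(), limit=2):
        results.append(fut.result())
    return results, peak

results, peak = asyncio.run(main())
print(sorted(results), peak)
```

Note that the back pressure falls out naturally: `run_one_fut` holds its semaphore permit until its result has been accepted by the bounded queue, so a slow consumer stalls both the queue and the creation of new tasks.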
And no, I do not feel like this should be added to asyncio.as_completed.
Date | User | Action | Args
2019-02-21 16:53:02 | twisteroid ambassador | set | recipients: + twisteroid ambassador, andybalaam, asvetlov, yselivanov, glin, aparamon
2019-02-21 16:53:02 | twisteroid ambassador | set | messageid: <1550767982.66.0.149266310717.issue30782@roundup.psfhosted.org>
2019-02-21 16:53:02 | twisteroid ambassador | link | issue30782 messages
2019-02-21 16:53:02 | twisteroid ambassador | create |