Message375818
Yeah, definitely, it must be `workers`.
I've experimented a lot with this and finally landed on something with an interface
similar to `asyncio.as_completed`:
- You control concurrency with the `workers` parameter
- You upper-bound memory usage with the `worker_greediness` parameter
- Results are yielded back in the same order as the input
- Results are yielded! So working over an unknown-sized iterable of awaitables, like `map(func, things_to_do)` or a generator, is no problem
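To make that contract concrete, here is a stdlib-only sketch (a hypothetical `resolve_in_order`, not the library's implementation) of the ordered-yield, bounded-concurrency behavior. It is deliberately simplified: it materializes every task up front, so unlike the real `resolve` it neither streams an unknown-sized input nor bounds memory.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Iterable, TypeVar

T = TypeVar("T")


async def resolve_in_order(
    awaitables: Iterable[Awaitable[T]], *, workers: int = 8
) -> AsyncIterator[T]:
    # Hypothetical simplified sketch: a semaphore bounds how many jobs run
    # at once, and results are yielded in input order regardless of which
    # job finishes first.
    semaphore = asyncio.Semaphore(workers)

    async def run(awaitable: Awaitable[T]) -> T:
        async with semaphore:
            return await awaitable

    # Materializes the whole input -- the real `resolve` avoids this.
    tasks = [asyncio.ensure_future(run(a)) for a in awaitables]
    for task in tasks:
        yield await task


async def main() -> None:
    async def job(n: int) -> int:
        await asyncio.sleep(0.01 * (n % 3))  # jobs finish out of order
        return n * n

    async for result in resolve_in_order((job(n) for n in range(5)), workers=2):
        print(result)  # 0, 1, 4, 9, 16 -- input order preserved


asyncio.run(main())
```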
The implementation may not be the cleanest, as it uses some Events and N Queues, but
the tests (keep reading to the end) show that the overhead is negligible:
import asyncio
from itertools import tee
from typing import Awaitable, Dict, Iterable, TypeVar, cast

from aioextensions import force_loop_cycle, schedule  # helpers from this same library

T = TypeVar('T')


def resolve(
    awaitables: Iterable[Awaitable[T]],
    *,
    workers: int = 1024,
    worker_greediness: int = 0,
) -> Iterable[Awaitable[T]]:
    """Resolve concurrently the iterable of awaitables using many workers."""
    if workers < 1:
        raise ValueError('workers must be >= 1')
    if worker_greediness < 0:
        raise ValueError('worker_greediness must be >= 0')

    if hasattr(awaitables, '__len__'):
        # A small optimization can be done if we know the length
        workers = min(workers, len(awaitables))

    loop = asyncio.get_event_loop()
    store: Dict[int, asyncio.Queue] = {}
    stream, stream_copy = tee(enumerate(awaitables))
    stream_finished = asyncio.Event()
    workers_up = asyncio.Event()
    workers_tasks: Dict[int, asyncio.Task] = {}

    async def worker() -> None:
        done: asyncio.Queue = asyncio.Queue(worker_greediness)
        for index, awaitable in stream:
            store[index] = done
            future = loop.create_future()
            future.set_result(await schedule(awaitable, loop=loop))
            await done.put(future)
            workers_up.set()
        workers_up.set()
        stream_finished.set()

    async def start_workers() -> None:
        for index in range(workers):
            if stream_finished.is_set():
                break
            workers_tasks[index] = asyncio.create_task(worker())
            await force_loop_cycle()
        await workers_up.wait()

    async def get_one(index: int) -> Awaitable[T]:
        if not workers_tasks:
            await start_workers()

        awaitable = await store.pop(index).get()
        result: Awaitable[T] = (await awaitable).result()
        return result

    for index, _ in stream_copy:
        yield cast(Awaitable[T], get_one(index))
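The `worker_greediness` bound works because each worker buffers its results in a bounded `asyncio.Queue`: once the queue is full, `put()` suspends that worker until the consumer drains an item. Here is a minimal stdlib demonstration of that backpressure mechanism (illustrative only, not library code):

```python
import asyncio


async def demo() -> list:
    # A bounded queue: with maxsize=1, put() suspends as soon as one
    # unconsumed item is buffered, so the producer cannot run far ahead
    # of the consumer -- this is what upper-bounds memory per worker.
    done: asyncio.Queue = asyncio.Queue(maxsize=1)
    events: list = []

    async def producer() -> None:
        for i in range(3):
            await done.put(i)
            events.append(f"put {i}")

    async def consumer() -> None:
        for _ in range(3):
            await asyncio.sleep(0.01)  # a deliberately slow consumer
            events.append(f"got {await done.get()}")

    await asyncio.gather(producer(), consumer())
    return events


# Puts and gets interleave: the producer stays at most one item ahead.
print(asyncio.run(demo()))
```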
Some notes on usage and example outputs are part of this library's docs:
https://kamadorueda.github.io/aioextensions/#aioextensions.resolve
Here are some tests that demonstrate properties of the implementation:
- Concurrency is bounded:
https://github.com/kamadorueda/aioextensions/blob/4a38cb343ceb0f931b655634195f311745e2db32/test/test___init__.py#L138
- Workers are always busy even if one of them is processing a long-running job:
https://github.com/kamadorueda/aioextensions/blob/4a38cb343ceb0f931b655634195f311745e2db32/test/test___init__.py#L131
- Many workers do not add overhead:
https://github.com/kamadorueda/aioextensions/blob/4a38cb343ceb0f931b655634195f311745e2db32/test/test___init__.py#L156
- Errors can be caught on retrieval:
https://github.com/kamadorueda/aioextensions/blob/4a38cb343ceb0f931b655634195f311745e2db32/test/test___init__.py#L128
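The "errors on retrieval" behavior falls out of storing each job's outcome in a Future: a Future absorbs the exception when the job fails and re-raises it only when `.result()` is called, which in `resolve` happens inside `get_one`. A minimal stdlib illustration of that pattern (not library code):

```python
import asyncio


async def main() -> str:
    loop = asyncio.get_running_loop()
    future: asyncio.Future = loop.create_future()

    # The job "failed", but nothing is raised at this point:
    future.set_exception(ValueError("boom"))

    # The exception surfaces only when the caller retrieves the result:
    try:
        future.result()
    except ValueError as exc:
        return f"caught on retrieval: {exc}"
    return "unreachable"


print(asyncio.run(main()))
```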
| Date | User | Action | Args |
|------|------|--------|------|
| 2020-08-23 15:53:03 | kamado2 | set | recipients: + kamado2, asvetlov, cjrh, yselivanov, kamadorueda |
| 2020-08-23 15:53:03 | kamado2 | set | messageid: <1598197983.25.0.556912192699.issue41505@roundup.psfhosted.org> |
| 2020-08-23 15:53:03 | kamado2 | link | issue41505 messages |
| 2020-08-23 15:53:02 | kamado2 | create | |