
Author kamado2
Recipients asvetlov, cjrh, kamado2, kamadorueda, yselivanov
Date 2020-08-23.15:53:02
Message-id <1598197983.25.0.556912192699.issue41505@roundup.psfhosted.org>
Content
Yeah, definitely, it must be `workers`.

I've experimented a lot with this and finally arrived at something with an interface
similar to asyncio.as_completed:

- You control concurrency with the `workers` parameter
- You upper-bound memory usage with the `worker_greediness` parameter
- Results are yielded back in the same order as the input
- Results are yielded! so working over an unknown-sized iterable of awaitables, like `map(func, things_to_do)` or a generator, is no problem (see the usage sketch after this list)
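
For instance, consumption could look like this (a quick sketch; `fetch` and the numbers are made up, only `resolve` and its parameters come from the implementation below):

    import asyncio

    async def fetch(item: int) -> int:
        await asyncio.sleep(0.1)  # pretend this is real I/O
        return item * 2

    async def main() -> None:
        # The input can be an unsized generator; nothing is materialized up front
        things_to_do = (fetch(item) for item in range(10))
        for awaitable in resolve(things_to_do, workers=4, worker_greediness=2):
            print(await awaitable)  # results come back in input order

    asyncio.run(main())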

The implementation may not be the cleanest, as it uses some Events and N Queues, but
the tests (keep reading to the end) prove that the overhead is negligible:

    import asyncio
    from itertools import tee
    from typing import Awaitable, Dict, Iterable, TypeVar, cast

    T = TypeVar('T')

    # `schedule` and `force_loop_cycle` are helpers from the same library;
    # a sketch of them is shown after this block

    def resolve(
        awaitables: Iterable[Awaitable[T]],
        *,
        workers: int = 1024,
        worker_greediness: int = 0,
    ) -> Iterable[Awaitable[T]]:
        """Resolve the awaitables concurrently, yielding results in order."""
        if workers < 1:
            raise ValueError('workers must be >= 1')
        if worker_greediness < 0:
            raise ValueError('worker_greediness must be >= 0')

        if hasattr(awaitables, '__len__'):
            # A small optimization can be done if we know the length
            workers = min(workers, len(awaitables))

        loop = asyncio.get_event_loop()
        store: Dict[int, asyncio.Queue] = {}
        stream, stream_copy = tee(enumerate(awaitables))
        stream_finished = asyncio.Event()
        workers_up = asyncio.Event()
        workers_tasks: Dict[int, asyncio.Task] = {}

        async def worker() -> None:
            # Each worker pulls items from the shared stream; the bounded
            # queue caps how many finished results it can buffer
            done: asyncio.Queue = asyncio.Queue(worker_greediness)
            for index, awaitable in stream:
                store[index] = done
                future = loop.create_future()
                future.set_result(await schedule(awaitable, loop=loop))
                await done.put(future)
                workers_up.set()
            workers_up.set()
            stream_finished.set()

        async def start_workers() -> None:
            # Spawn up to `workers` tasks, stopping early if the stream
            # is exhausted before all of them are up
            for index in range(workers):
                if stream_finished.is_set():
                    break
                workers_tasks[index] = asyncio.create_task(worker())
                await force_loop_cycle()
            await workers_up.wait()

        async def get_one(index: int) -> Awaitable[T]:
            # Workers are started lazily, on the first retrieval
            if not workers_tasks:
                await start_workers()

            awaitable = await store.pop(index).get()
            result: Awaitable[T] = (await awaitable).result()
            return result

        # Yield, in input order, one retriever per input element
        for index, _ in stream_copy:
            yield cast(Awaitable[T], get_one(index))
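
`schedule` and `force_loop_cycle` above are small helpers from the same library. Roughly, they behave like this (a simplified sketch; see the library source for the exact versions):

    async def force_loop_cycle() -> None:
        # Yield control once so freshly created tasks get a chance to start
        await asyncio.sleep(0)

    async def schedule(awaitable, *, loop=None):
        # Run the awaitable as a task and hand back the *finished* task;
        # the caller picks the value (or the exception) up via .result()
        task = asyncio.ensure_future(awaitable, loop=loop)
        await asyncio.wait([task])
        return task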


Some notes on usage and outputs are in this library's docs:

https://kamadorueda.github.io/aioextensions/#aioextensions.resolve

Here are some proofs of the implementation's properties:
- Concurrency is bounded:
  https://github.com/kamadorueda/aioextensions/blob/4a38cb343ceb0f931b655634195f311745e2db32/test/test___init__.py#L138
- Workers are always busy even if one of them is processing a long-running job:
  https://github.com/kamadorueda/aioextensions/blob/4a38cb343ceb0f931b655634195f311745e2db32/test/test___init__.py#L131
- Many workers do not add overhead:
  https://github.com/kamadorueda/aioextensions/blob/4a38cb343ceb0f931b655634195f311745e2db32/test/test___init__.py#L156
- Errors can be caught on retrieval (see the sketch after this list):
  https://github.com/kamadorueda/aioextensions/blob/4a38cb343ceb0f931b655634195f311745e2db32/test/test___init__.py#L128
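
As a quick illustration of the last point (a sketch, not one of the linked tests):

    import asyncio

    async def boom() -> None:
        raise ValueError('oops')

    async def main() -> None:
        for awaitable in resolve([boom()]):
            try:
                await awaitable
            except ValueError as exc:
                print('caught on retrieval:', exc)

    asyncio.run(main())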