
classification
Title: asyncio.gather of large streams with limited resources
Type: enhancement
Stage:
Components: asyncio
Versions: Python 3.10
process
Status: open
Resolution:
Dependencies:
Superseder:
Assigned To:
Nosy List: asvetlov, cjrh, kamado2, kamadorueda, yselivanov
Priority: normal
Keywords:

Created on 2020-08-08 01:24 by kamadorueda, last changed 2022-04-11 14:59 by admin.

Messages (3)
msg375028 - Author: Kevin Amado (kamadorueda) Date: 2020-08-08 01:24
Sometimes, when dealing with high-concurrency systems, developers need to run a large number of tasks concurrently while staying within a finite pool of resources.

Just to mention some examples:
- reading a lot of files asynchronously without exceeding the maximum number of open files allowed by the operating system
- making millions of requests to a website in batches small enough to avoid being banned by the site's firewall or hitting API limits
- making a lot of DNS lookups without exceeding the maximum number of open sockets allowed by the operating system
- and many more

What these examples have in common is a hard limit on the maximum concurrency that can be used to solve the problem.

A naive approach is to split the long list of tasks into small batches and use asyncio.gather on each batch. The downside shows up when one task takes longer than the others: at some point only that task is running, the next batch cannot start until it finishes, and both throughput and total execution time suffer.
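To make the downside concrete, here is a minimal sketch of the batching approach. The names `job`, `gather_in_batches` and `demo` are hypothetical and only for illustration. With the delays below and a batch size of 2, each batch waits for its slow member, so the run takes about 0.4 s even though two always-busy slots could finish in about 0.21 s.

```python
import asyncio
import time
from itertools import islice


async def job(delay: float) -> float:
    # Stand-in for real work: sleep for `delay` seconds and return it.
    await asyncio.sleep(delay)
    return delay


async def gather_in_batches(coros, batch_size):
    """Run coroutines batch by batch; a batch ends when its slowest member ends."""
    it = iter(coros)
    results = []
    while batch := list(islice(it, batch_size)):
        results.extend(await asyncio.gather(*batch))
    return results


async def demo():
    delays = [0.2, 0.01, 0.2, 0.01]
    start = time.perf_counter()
    results = await gather_in_batches(map(job, delays), batch_size=2)
    return results, time.perf_counter() - start


if __name__ == "__main__":
    print(asyncio.run(demo()))
```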

Another approach is to use asyncio.wait on a subset of tasks, collecting the finished tasks and adding more from the remaining input until all tasks have been executed. This alternative is good but still far from optimal, as a lot of boilerplate code is needed.
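For reference, the boilerplate looks roughly like this. It is a sketch only: `run_bounded`, `job` and the `stats` dictionary are hypothetical names, and results come back in completion order rather than input order.

```python
import asyncio
from itertools import islice

stats = {"active": 0, "peak": 0}  # bookkeeping for the demo only


async def job(n: int) -> int:
    stats["active"] += 1
    stats["peak"] = max(stats["peak"], stats["active"])
    await asyncio.sleep(0.01 * (n % 3))
    stats["active"] -= 1
    return n


async def run_bounded(coros, limit):
    """Keep up to `limit` tasks in flight, refilling as tasks finish."""
    it = iter(coros)
    pending = {asyncio.create_task(c) for c in islice(it, limit)}
    results = []
    while pending:
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        results.extend(task.result() for task in done)
        # Refill the free slots from the remaining input.
        for c in islice(it, limit - len(pending)):
            pending.add(asyncio.create_task(c))
    return results


async def demo():
    return await run_bounded(map(job, range(10)), limit=3)


if __name__ == "__main__":
    print(asyncio.run(demo()), stats["peak"])
```

Because the input is consumed lazily with `islice`, no more than `limit` coroutines exist at any time, but the caller has to restore the input order if it matters.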

The ideal approach is to operate on the possibly infinite stream of tasks with a constant number of them running concurrently at all times. As soon as one running task finishes, another is fetched from the input stream and added to the running set. This keeps resource usage within the finite pool (sockets, open files, HTTP API limits, etc.) while minimizing the total execution time.

What I'm attaching is a proof of concept of a new function to add to the asyncio.tasks module that implements the ideal approach.

The proposed signature for such function is:

  async def materialize(aws, *, max_concurrency=None)

It works like this:

```
async def do(n: int) -> int:
    print('running', n)
    await asyncio.sleep(1)
    print('returning', n)
    return n

async def main():
    result = []
    async for x in materialize(map(do, range(5)), max_concurrency=2):
        print('got', x)
        result.append(x)

    print(result)
```

Whose output is:

running 0
running 1
returning 0
returning 1
got 0
got 1
running 2
running 3
returning 2
returning 3
got 2
got 3
running 4
returning 4
got 4
[0, 1, 2, 3, 4]

As you can see, tasks are resolved concurrently without ever exceeding the maximum concurrency allowed, while always running as many tasks as the limit permits. Results are yielded as soon as they are available, the memory footprint stays small (proportional to the maximum concurrency), and results come back in the same order as the input stream (unlike asyncio.as_completed).

Since it's an asynchronous generator it can deal with infinite input streams, which is nice!
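Since the attached file is no longer available, here is one possible way to get an ordered, bounded asynchronous generator. This is a sketch under my own assumptions, not the attached proof of concept: `ordered_window` and `job` are hypothetical names. It keeps a sliding window of at most `max_concurrency` tasks and always awaits the oldest one, so a slow task at the head of the window can leave other slots idle until it is consumed. That is the price this version pays for ordered results with bounded memory.

```python
import asyncio
from collections import deque


async def ordered_window(aws, *, max_concurrency):
    """Yield results in input order with at most `max_concurrency` tasks alive."""
    it = iter(aws)
    window = deque()
    try:
        while True:
            # Top up the window lazily from the (possibly infinite) input.
            while len(window) < max_concurrency:
                try:
                    aw = next(it)
                except StopIteration:
                    break
                window.append(asyncio.ensure_future(aw))
            if not window:
                return
            # Await the oldest task so results keep the input order.
            yield await window.popleft()
    finally:
        # Cancel whatever is still running if the consumer stops early.
        for task in window:
            task.cancel()


async def job(n: int) -> int:
    # Later jobs finish sooner, to show that order is still preserved.
    await asyncio.sleep(0.05 - 0.01 * n)
    return n


async def demo():
    return [x async for x in ordered_window(map(job, range(5)), max_concurrency=2)]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```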

I'm willing to work further on a PR
msg375807 - Author: Caleb Hattingh (cjrh) Date: 2020-08-23 06:16
The traditional way this is done is with a finite number of workers pulling work off a queue. This is straightforward to set up with the standard library:


from uuid import uuid4
import asyncio, random


async def worker(q: asyncio.Queue):
    while job := await q.get():
        print(f"working on job {job}")
        await asyncio.sleep(random.random() * 5)
        print(f"Completed job {job}")
        q.task_done()


async def scheduler(q, max_concurrency=5):
    workers = []
    for i in range(max_concurrency):
        w = asyncio.create_task(worker(q))
        workers.append(w)

    try:
        await asyncio.gather(*workers)
    except asyncio.CancelledError:
        pass


async def main():
    jobs = [uuid4().hex for i in range(1_000)]
    q = asyncio.Queue()
    for job in jobs:
        await q.put(job)

    t = asyncio.create_task(scheduler(q))
    await q.join()
    t.cancel()
    await t


if __name__ == "__main__":
    asyncio.run(main())


A neater API would be something like our Executor API in concurrent.futures, but we don't yet have one of those for asyncio. I started playing with some ideas for this a while ago here: https://github.com/cjrh/coroexecutor

Alas, I did not yet add a "max_workers" parameter so that isn't available in my lib yet. I discuss options for implementing that in an issue: https://github.com/cjrh/coroexecutor/issues/2

I believe that the core devs are working on a feature that might also help for this, called "task groups", but I haven't been following closely so I don't know where that's at currently.
msg375818 - Author: Kevin Amado (kamado2) Date: 2020-08-23 15:53
Yeah, it definitely has to be workers.

I've experimented a lot with it and finally found something with an interface
similar to asyncio.as_completed:

- You control concurrency with the `workers` parameter
- You upper-bound memory usage with the `worker_greediness` parameter
- Results are yielded back in the same order as the input
- Results are yielded, so working over an iterable of `awaitables` of unknown size, like map(func, things_to_do) or a generator, is no problem
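As a side note on the memory bound: in the code posted in this message, `worker_greediness` is passed as the maximum size of an `asyncio.Queue`. A bounded queue makes `put()` wait while the queue is full, which is what stops a worker from running ahead of the consumer (a maxsize of 0 means the queue is unbounded). A minimal sketch of that back-pressure, where `producer` and `demo` are hypothetical names:

```python
import asyncio


async def producer(q: asyncio.Queue, n: int) -> None:
    for i in range(n):
        await q.put(i)  # suspends here whenever the queue is full


async def demo():
    q = asyncio.Queue(maxsize=2)
    task = asyncio.create_task(producer(q, 5))
    await asyncio.sleep(0.01)  # let the producer run until it blocks
    buffered = q.qsize()       # items produced before anyone consumed
    items = [await q.get() for _ in range(5)]
    await task
    return buffered, items


if __name__ == "__main__":
    print(asyncio.run(demo()))
```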

The implementation may not be the cleanest, as it uses some Events and N Queues, but
it's proven in tests (keep reading to the end) that the overhead is negligible

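    import asyncio
    from itertools import tee
    from typing import Awaitable, Dict, Iterable, TypeVar, cast

    # `schedule` and `force_loop_cycle` are helpers that are not shown in this message.

    T = TypeVar('T')

    def resolve(
        awaitables: Iterable[Awaitable[T]],
        *,
        workers: int = 1024,
        worker_greediness: int = 0,
    ) -> Iterable[Awaitable[T]]:
        """Resolve `awaitables` concurrently, yielding awaitables of the results in input order."""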
        if workers < 1:
            raise ValueError('workers must be >= 1')
        if worker_greediness < 0:
            raise ValueError('worker_greediness must be >= 0')

        if hasattr(awaitables, '__len__'):
            # A small optimization can be done if we know the length
            workers = min(workers, len(awaitables))

        loop = asyncio.get_event_loop()
        store: Dict[int, asyncio.Queue] = {}
        stream, stream_copy = tee(enumerate(awaitables))
        stream_finished = asyncio.Event()
        workers_up = asyncio.Event()
        workers_tasks: Dict[int, asyncio.Task] = {}

        async def worker() -> None:
            done: asyncio.Queue = asyncio.Queue(worker_greediness)
            for index, awaitable in stream:
                store[index] = done
                future = loop.create_future()
                future.set_result(await schedule(awaitable, loop=loop))
                await done.put(future)
                workers_up.set()
            workers_up.set()
            stream_finished.set()

        async def start_workers() -> None:
            for index in range(workers):
                if stream_finished.is_set():
                    break
                workers_tasks[index] = asyncio.create_task(worker())
                await force_loop_cycle()
            await workers_up.wait()

        async def get_one(index: int) -> Awaitable[T]:
            if not workers_tasks:
                await start_workers()

            awaitable = await store.pop(index).get()
            result: Awaitable[T] = (await awaitable).result()
            return result

        for index, _ in stream_copy:
            yield cast(Awaitable[T], get_one(index))
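The core trick in `worker()` above is that all workers iterate over the same shared stream, so each item is taken by exactly one worker. Stripped of the queues and events, that idea can be sketched as follows. This is a simplified illustration, not the library code: `run_with_workers` and `job` are hypothetical names, and it collects everything into a list instead of yielding lazily.

```python
import asyncio


async def run_with_workers(coros, workers):
    """Run `coros` with `workers` concurrent workers; return results in input order."""
    stream = enumerate(coros)  # shared by all workers
    results = {}

    async def worker():
        # Taking the next item involves no await, so two workers
        # can never receive the same item from the shared stream.
        for index, coro in stream:
            results[index] = await coro

    await asyncio.gather(*(worker() for _ in range(workers)))
    return [results[i] for i in range(len(results))]


async def job(n: int) -> int:
    await asyncio.sleep(0.01 * (n % 2))
    return n * n


async def demo():
    return await run_with_workers(map(job, range(6)), workers=2)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A worker that finishes a short job immediately pulls the next item, so a long-running job only occupies its own worker.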


Some notes on the usage and outputs are part of the docs of this library:

https://kamadorueda.github.io/aioextensions/#aioextensions.resolve

Here are some proofs about the implementation:
- Concurrency is bounded:
  https://github.com/kamadorueda/aioextensions/blob/4a38cb343ceb0f931b655634195f311745e2db32/test/test___init__.py#L138
- Workers are always busy even if one of them is processing a long-running job:
  https://github.com/kamadorueda/aioextensions/blob/4a38cb343ceb0f931b655634195f311745e2db32/test/test___init__.py#L131
- Many workers do not add overhead:
  https://github.com/kamadorueda/aioextensions/blob/4a38cb343ceb0f931b655634195f311745e2db32/test/test___init__.py#L156
- Errors can be caught on retrieval:
  https://github.com/kamadorueda/aioextensions/blob/4a38cb343ceb0f931b655634195f311745e2db32/test/test___init__.py#L128
History
Date                 User         Action  Args
2022-04-11 14:59:34  admin        set     github: 85677
2020-08-23 15:53:03  kamado2      set     nosy: + kamado2; messages: + msg375818
2020-08-23 06:16:59  cjrh         set     nosy: + cjrh; messages: + msg375807
2020-08-08 03:28:41  kamadorueda  set     files: - materialize-implementation.py
2020-08-08 01:24:55  kamadorueda  create