Classification
Title: Allow limiting the number of concurrent tasks in asyncio.as_completed
Type: enhancement Stage:
Components: asyncio Versions: Python 3.8
Process
Status: open Resolution:
Dependencies: Superseder:
Assigned To: Nosy List: andybalaam, asvetlov, yselivanov
Priority: normal Keywords:

Created on 2017-06-27 01:05 by andybalaam, last changed 2018-01-11 08:14 by asvetlov.

Pull Requests
URL Status Linked
PR 2424 open andybalaam, 2017-06-27 01:10
Messages (6)
msg296982 - (view) Author: Andy Balaam (andybalaam) * Date: 2017-06-27 01:05
asyncio.as_completed allows us to provide lots of coroutines (or Futures) to schedule, and then deal with each result in a loop as soon as it is available, i.e. in a streaming style.

I propose to allow as_completed to work on very large numbers of coroutines, provided through a generator (rather than a list).  In order to make this practical, we need to limit the number of coroutines that are scheduled simultaneously to a reasonable number.

For tasks that open files or sockets, a reasonable number might be 1000 or fewer.  For other tasks, a much larger number might be reasonable, but we would still like some limit to prevent us running out of memory.

I suggest adding a "limit" argument to as_completed that limits the number of coroutines that it schedules simultaneously.

For me, the key advantage of as_completed (in the proposed modified form) is that it enables a streaming style that looks quite like synchronous code, but is efficient in terms of memory usage (as you'd expect from a streaming style):


#!/usr/bin/env python3

import asyncio
import sys

limit = int(sys.argv[1])

async def double(x):
    await asyncio.sleep(1)
    return x * 2

async def print_doubles():
    coros = (double(x) for x in range(1000000))
    # "limit" is the proposed new argument (prototype implementation only)
    for res in asyncio.as_completed(coros, limit=limit):
        r = await res
        if r % 100000 == 0:
            print(r)

loop = asyncio.get_event_loop()
loop.run_until_complete(print_doubles())
loop.close()


Using my prototype implementation, this runs faster and uses much less memory on my machine when run with a limit of 100K instead of scheduling all 1 million tasks concurrently:

$ /usr/bin/time --format "Memory usage: %MKB\tTime: %e seconds" ./example 1000000
Memory usage: 2234552KB	Time: 97.52 seconds

$ /usr/bin/time --format "Memory usage: %MKB\tTime: %e seconds" ./example 100000
Memory usage: 252732KB	Time: 94.13 seconds

I have been working on an implementation and there is some discussion in my blog posts: http://www.artificialworlds.net/blog/2017/06/12/making-100-million-requests-with-python-aiohttp/ and http://www.artificialworlds.net/blog/2017/06/27/adding-a-concurrency-limit-to-pythons-asyncio-as_completed/

Possibly the most controversial part of this proposal is that it requires allowing a generator to be passed to as_completed instead of enforcing that it be a list. This is fundamental to the style outlined above, but it's possible we can do better than the blanket acceptance of all generators that I implemented.
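[Editor's note: to make the proposal concrete, here is a hedged sketch of such a helper built only on public asyncio APIs. The name `limited_as_completed` and its exact semantics are illustrative, not the actual code from PR 2424. It drains an iterable of coroutines while keeping at most `limit` tasks scheduled at once:]

```python
import asyncio

async def limited_as_completed(coros, limit):
    """Yield results of `coros` in completion order, scheduling
    at most `limit` of them at any one time (illustrative sketch)."""
    coros = iter(coros)
    pending = set()

    def top_up():
        # Schedule more coroutines until the pool is full or the
        # input iterable is exhausted.
        for coro in coros:
            pending.add(asyncio.ensure_future(coro))
            if len(pending) >= limit:
                break

    top_up()
    while pending:
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            yield task.result()
        top_up()
```

Note one difference from asyncio.as_completed: this sketch is an async generator that yields results directly (`async for r in limited_as_completed(coros, 1000): ...`), whereas as_completed yields awaitables that the caller awaits.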
msg300401 - (view) Author: Andy Balaam (andybalaam) * Date: 2017-08-17 07:57
bump
msg308776 - (view) Author: Andrew Svetlov (asvetlov) * (Python committer) Date: 2017-12-20 19:07
as_completed() is a low-level API.
Let's not overload it with extra parameters.

In any case, `as_completed()` uses only asyncio.Future and its public API, like `add_done_callback()` etc.

You can build everything you need without modifying asyncio.
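[Editor's note: for example, a caller can cap concurrency today by wrapping each coroutine with a semaphore before handing it to as_completed. This is an illustrative sketch, not code from this issue, and note that it still creates every wrapper object up front, so it does not address the memory concern from the original report:]

```python
import asyncio

async def bounded(semaphore, coro):
    # At most `limit` wrapped coroutines run past this point at once.
    async with semaphore:
        return await coro

async def main():
    semaphore = asyncio.Semaphore(100)  # hypothetical limit of 100
    coros = [bounded(semaphore, asyncio.sleep(0, result=n * 2))
             for n in range(1000)]
    total = 0
    for fut in asyncio.as_completed(coros):
        total += await fut
    print(total)  # prints 999000

asyncio.run(main())
```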

Let's close the issue with a "won't fix" resolution.
msg308779 - (view) Author: Yury Selivanov (yselivanov) * (Python committer) Date: 2017-12-20 19:11
I agree, let's keep as_completed() simple for now.  Handling generators+async correctly is hard, so we definitely don't have time for this in 3.7.
msg309694 - (view) Author: Andy Balaam (andybalaam) * Date: 2018-01-09 09:58
I would argue that this makes as_completed a lot more useful than it is now, so it would be worth adding (maybe after 3.7).

But, if this does not go into asyncio, is there another library where it would belong?  Or should this be a completely new library?
msg309794 - (view) Author: Andrew Svetlov (asvetlov) * (Python committer) Date: 2018-01-11 08:14
Third-party libraries are not the responsibility of the Python core devs.

I am not aware of an existing library with such functionality.
History
Date User Action Args
2018-01-11 08:14:43  asvetlov    set  messages: + msg309794
2018-01-09 09:58:59  andybalaam  set  messages: + msg309694
2017-12-20 19:11:32  yselivanov  set  messages: + msg308779
                                      versions: + Python 3.8, - Python 3.7
2017-12-20 19:07:29  asvetlov    set  nosy: + asvetlov
                                      messages: + msg308776
2017-08-17 07:57:44  andybalaam  set  messages: + msg300401
2017-06-27 01:10:51  andybalaam  set  pull_requests: + pull_request2475
2017-06-27 01:05:22  andybalaam  create