Message 375807
The traditional way this is done is with a finite number of workers pulling work off a queue. This is straightforward to set up with builtins:
from uuid import uuid4
import asyncio
import random

async def worker(q: asyncio.Queue):
    while job := await q.get():
        print(f"working on job {job}")
        await asyncio.sleep(random.random() * 5)
        print(f"completed job {job}")
        q.task_done()

async def scheduler(q, max_concurrency=5):
    workers = []
    for i in range(max_concurrency):
        w = asyncio.create_task(worker(q))
        workers.append(w)
    try:
        await asyncio.gather(*workers)
    except asyncio.CancelledError:
        pass

async def main():
    jobs = [uuid4().hex for i in range(1_000)]
    q = asyncio.Queue()
    for job in jobs:
        await q.put(job)
    t = asyncio.create_task(scheduler(q))
    await q.join()
    t.cancel()
    await t

if __name__ == "__main__":
    asyncio.run(main())
A neater API would be something like our Executor API in concurrent.futures, but we don't yet have one of those for asyncio. I started playing with some ideas for this a while ago here: https://github.com/cjrh/coroexecutor
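For comparison, this is the synchronous Executor API being referred to; max_workers is exactly the knob that asyncio has no direct equivalent for:

```python
from concurrent.futures import ThreadPoolExecutor

def double(n):
    return n * 2

# max_workers bounds concurrency; map() drives the whole job list
# and returns results in submission order.
with ThreadPoolExecutor(max_workers=5) as ex:
    results = list(ex.map(double, range(10)))

print(results)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```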
Alas, I did not yet add a "max_workers" parameter so that isn't available in my lib yet. I discuss options for implementing that in an issue: https://github.com/cjrh/coroexecutor/issues/2
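One of the options discussed there is to bound concurrency with a semaphore rather than a fixed worker pool. A minimal sketch (the run_all helper name is mine, not part of coroexecutor):

```python
import asyncio

async def run_all(coros, max_workers=5):
    # A semaphore caps how many coroutines run at any moment,
    # giving gather() a max_workers-style bound.
    sem = asyncio.Semaphore(max_workers)

    async def guarded(coro):
        async with sem:
            return await coro

    return await asyncio.gather(*(guarded(c) for c in coros))

async def main():
    async def job(n):
        await asyncio.sleep(0.01)
        return n * 2

    return await run_all([job(i) for i in range(10)], max_workers=3)

results = asyncio.run(main())
print(results)  # gather() preserves order: [0, 2, 4, ..., 18]
```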
I believe that the core devs are working on a feature that might also help for this, called "task groups", but I haven't been following closely so I don't know where that's at currently.
Date | User | Action | Args
2020-08-23 06:16:59 | cjrh | set | recipients: + cjrh, asvetlov, yselivanov, kamadorueda
2020-08-23 06:16:59 | cjrh | set | messageid: <1598163419.13.0.107443353976.issue41505@roundup.psfhosted.org>
2020-08-23 06:16:59 | cjrh | link | issue41505 messages
2020-08-23 06:16:58 | cjrh | create |