import asyncio from asyncio import ensure_future, locks # cpython/Lib/asyncio/tasks.py async def materialize(aws, *, max_concurrency=None): current_concurrency = 0 notifiers = {} results = {} def _done_callback(fut, idx): results[idx] = fut.result() notifiers[idx].set() aw_idx = -1 yield_idx = 0 for aw_idx, aw in enumerate(aws): if current_concurrency == max_concurrency: await notifiers[yield_idx].wait() yield results.pop(yield_idx) notifiers.pop(yield_idx) current_concurrency -= 1 yield_idx += 1 fut = ensure_future(aw) fut.add_done_callback(lambda fut, idx=aw_idx: _done_callback(fut, idx)) current_concurrency += 1 notifiers[aw_idx] = locks.Event() for yield_idx in range(yield_idx, aw_idx + 1): await notifiers[yield_idx].wait() yield results.pop(yield_idx) async def do(n: int) -> None: print('running', n) await asyncio.sleep(1) print('returning', n) return n async def main(): result = [] async for x in materialize(map(do, range(5)), max_concurrency=2): print('got', x) result.append(x) print(result) asyncio.run(main())