import asyncio
import inspect


def sync_wait_for(future_or_coroutine):
    """Run *future_or_coroutine* to completion and return its result.

    The thread's current event loop is used and left open afterwards, so the
    function can be called repeatedly.
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # get_event_loop() raises when the thread has no current loop (e.g.
        # non-main threads, or Python versions that no longer create one
        # implicitly). Create and install one so later calls reuse it.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return loop.run_until_complete(future_or_coroutine)


class Batch:
    """Collect individual key requests and resolve them together in batches.

    Subclasses override :meth:`resolve_futures`, which receives the list of
    keys gathered for that subclass and returns one result per key, in the
    same order. It may return the results directly or return an awaitable
    that produces them.
    """

    # Pending requests shared by all subclasses:
    # {Batch subclass: [(key, future), ...]}.
    batches = {}

    # Strong references to running resolve tasks. The event loop only keeps
    # weak references to tasks, so without this an in-flight task could be
    # garbage-collected before it finishes.
    _resolve_tasks = set()

    @classmethod
    def schedule(cls, key):
        """Queue *key* for the next batch of ``cls`` and return its future.

        The returned future is completed with the result for *key* (or with
        an exception) once the batch has been resolved.
        """
        loop = asyncio.get_event_loop()
        if not Batch.batches:
            # Nothing is pending, so no flush has been scheduled yet.
            # Deferring the flush (instead of resolving right away) gives
            # other coroutines that are ready to run a chance to add their
            # keys to the same batch.
            loop.call_later(0, Batch.schedule_batches)
        future = loop.create_future()
        Batch.batches.setdefault(cls, []).append((key, future))
        return future

    @staticmethod
    def schedule_batches():
        """Start one resolve task per pending batch and clear the queue."""
        loop = asyncio.get_event_loop()
        # Snapshot the keys because entries are popped while iterating.
        for batch in list(Batch.batches):
            pending = Batch.batches.pop(batch)
            task = loop.create_task(Batch.resolve_batch(batch, pending))
            Batch._resolve_tasks.add(task)
            task.add_done_callback(Batch._resolve_tasks.discard)

    @staticmethod
    async def resolve_batch(batch, futures):
        """Resolve every ``(key, future)`` pair in *futures* using *batch*.

        ``batch.resolve_futures`` is called once with all keys. Its results
        are matched to the futures by position. If it raises, the exception
        is forwarded to every waiting future so that callers do not wait
        forever. Futures that are already done (e.g. cancelled) are skipped.
        """
        batch_keys = [key for key, _ in futures]
        try:
            results = batch.resolve_futures(batch_keys)
            if inspect.isawaitable(results):
                results = await results
            # Materialize so the results can be matched by index, and so a
            # non-iterable return value is reported to the waiters as well.
            results = list(results)
        except Exception as exc:
            for _, future in futures:
                if not future.done():
                    future.set_exception(exc)
            return

        for index, (_, future) in enumerate(futures):
            if future.done():
                continue
            if index < len(results):
                future.set_result(results[index])
            else:
                # Too few results: fail the request instead of leaving the
                # caller waiting on a future that would never complete.
                future.set_exception(RuntimeError(
                    "resolve_futures returned %d results for %d keys"
                    % (len(results), len(futures))
                ))

    @staticmethod
    def resolve_futures(batch):
        """Return one result per key in *batch*; subclasses must override."""
        raise NotImplementedError

    @classmethod
    async def gen(cls, key):
        """Return the batched result for a single *key*."""
        return await cls.schedule(key)

    @classmethod
    async def genv(cls, keys):
        """Return the batched results for all *keys*, in order, as a list."""
        return await asyncio.gather(*[cls.gen(key) for key in keys])


class DoubleBatch(Batch):
    """Batch that doubles each key."""

    @staticmethod
    def resolve_futures(batch):
        return [x + x for x in batch]


class SquareBatch(Batch):
    """Batch that squares each key."""

    @staticmethod
    def resolve_futures(batch):
        return [x * x for x in batch]


async def double_square(x):
    """Return ``(x + x) ** 2`` computed through the batches."""
    double = await DoubleBatch.gen(x)
    square = await SquareBatch.gen(double)
    return square


async def square_double(x):
    """Return ``2 * x ** 2`` computed through the batches."""
    square = await SquareBatch.gen(x)
    double = await DoubleBatch.gen(square)
    return double


async def triple_double(x):
    """Return *x* doubled three times through ``DoubleBatch``."""
    d1 = await DoubleBatch.gen(x)
    d2 = await DoubleBatch.gen(d1)
    d3 = await DoubleBatch.gen(d2)
    return d3


async def double_square_square_double(x):
    """Apply ``double_square`` and then ``square_double`` to *x*."""
    ds = await double_square(x)
    sd = await square_double(ds)
    return sd


async def big_batch(x):
    """Run a mix of batched computations derived from *x* concurrently.

    Returns a list of six result lists, one per group below.
    """
    return await asyncio.gather(
        asyncio.gather(
            square_double(x + 0),
            square_double(x + 1),
            square_double(x + 2),
        ),
        asyncio.gather(
            double_square(x * 1),
            double_square(x * 2),
            double_square(x * 3),
        ),
        DoubleBatch.genv(range(x, x + 5, +1)),
        SquareBatch.genv(range(x, x - 5, -1)),
        asyncio.gather(
            triple_double(x - 1),
            triple_double(x - 2),
            triple_double(x - 3),
        ),
        asyncio.gather(
            double_square_square_double(x * 5),
            double_square_square_double(x * 7),
            double_square_square_double(x * 9),
        ),
    )


async def root(count=5000):
    """Run ``big_batch(x)`` for every ``x`` in ``range(count)`` concurrently."""
    return await asyncio.gather(*[big_batch(x) for x in range(0, count)])


if __name__ == '__main__':
    sync_wait_for(root())