#!/usr/bin/env python
"""Hammer a one-slot asyncio queue with many concurrent writers.

A producer task keeps spawning ``Stream.write`` tasks, each of which stamps
its item with an increasing sequence number before queueing it, while the
main coroutine reads items back and checks that the sequence numbers arrive
strictly in order.

NOTE(review): this looks like a reproducer for out-of-order delivery when
several ``put()`` callers contend for a full ``asyncio.Queue`` -- confirm the
intent with the author before "fixing" the ordering.
"""
import asyncio


class Stream:
    """One-slot queue of items tagged with a monotonically increasing number."""

    def __init__(self):
        # Capacity 1: any writer that finds the slot taken blocks in put().
        self._queue = asyncio.Queue(1)
        # Sequence number handed to the most recent write() call.
        self._num = 0
        # Sequence number of the most recent item returned by read().
        self._last_read_num = 0

    async def write(self, item):
        """Tag *item* with the next sequence number and queue it.

        The number is taken *before* awaiting ``put()``, so it reflects the
        order in which write() calls started, not the order in which the
        items actually entered the queue.
        """
        self._num += 1
        await self._queue.put((self._num, item))

    async def read(self):
        """Return the next item, checking that it arrived in sequence.

        Prints the received and previously read sequence numbers, then
        yields to the event loop once before returning.

        Raises:
            AssertionError: if the item's sequence number is not exactly one
                greater than that of the previously read item.
        """
        num, item = await self._queue.get()
        print(f'num={num}, last_read_num={self._last_read_num}')
        expected = self._last_read_num + 1
        # Raised explicitly (rather than via ``assert``) so the check still
        # runs under ``python -O``.
        if num != expected:
            raise AssertionError(
                f'out-of-order item: got num={num}, expected num={expected}'
            )
        self._last_read_num = num
        # Zero-length sleep: give other tasks a turn before returning.
        await asyncio.sleep(0.0)
        return item


async def produce(stream):
    """Spawn ``stream.write('abc')`` tasks forever, yielding every 32 tasks.

    The write tasks are never awaited here, so tasks blocked on the full
    queue can pile up for as long as this coroutine runs.
    """
    loop = asyncio.get_running_loop()
    # The event loop keeps only weak references to tasks; hold strong ones
    # until each task finishes so none can be garbage-collected mid-flight.
    pending = set()
    cnt = 0
    while True:
        task = loop.create_task(stream.write('abc'))
        pending.add(task)
        task.add_done_callback(pending.discard)
        cnt += 1
        if cnt == 32:
            # Let the reader and the queued writers run before the next batch.
            await asyncio.sleep(0.0)
            cnt = 0


async def main():
    """Start the producer in the background and read from the stream forever."""
    stream = Stream()
    # Bound to a local so the background task stays strongly referenced.
    producer = asyncio.create_task(produce(stream))  # noqa: F841
    while True:
        await stream.read()


if __name__ == '__main__':
    asyncio.run(main())