if 0: import os, logging os.environ['PYTHONASYNCIODEBUG'] = '1' logging.basicConfig(level=logging.DEBUG) import asyncio import signal import faulthandler import os # Only a server on a public IP will raise the AssertionError SERVER_HOST = '127.0.0.1' SERVER_PORT = 8888 # Number of messages sent concurrently. You may need to increase this number. FLOOD = 100 DATA_SIZE = (1024 * 1024) async def flooding(writer): # data large enough to pause the stream writer.write(b'x' * (DATA_SIZE - 1) + b'\n') await writer.drain() async def reading(reader): while True: data = await reader.read(100) if not data: break def stop(writer): print("stop! close writer") writer.close() async def run(loop): reader, writer = await asyncio.open_connection( SERVER_HOST, SERVER_PORT, loop=loop , limit=1024 ) print("connected") transport = writer._transport print("transport writer limits: low=%.1f kB, high=%.1f kB" % (transport._low_water / 1024, transport._high_water / 1024)) if 0: for signame in ('SIGINT', 'SIGTERM'): signum = getattr(signal, signame) loop.add_signal_handler(signum, lambda: stop(writer)) faulthandler.register(signal.SIGUSR1) print("kill -USR1 %s" % os.getpid()) print("read") reading_fut = loop.create_task(reading(reader)) print("write in parallel") futs = [loop.create_task(flooding(writer)) for _ in range(FLOOD)] print("wait everything") await asyncio.wait(futs, loop=loop) writer.close() print("wait for read") await reading_fut print("done") if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(run(loop)) loop.close()