#!/usr/bin/env python3
"""Echo-client harness that closes its transport right after every write.

The client connects to an echo server on 127.0.0.1:8888, sends a text one
line at a time, and checks that each line is echoed back unchanged.  After
each write it deliberately calls ``transport.close()`` to simulate the
connection being torn down out of order (e.g. ECONNRESET).
"""
import asyncio
import textwrap

RETRIES = 10  # NOTE(review): not referenced anywhere in this file.
verbose = print  # Logging hook used by the protocol callbacks.


class EchoClient(asyncio.Protocol):
    """Protocol that sends ``data`` line by line and verifies each echo.

    Attributes:
        data_iter: iterator over the lines of the text still to be sent.
        loop: the event loop this protocol was created for.
        done: future resolved in ``connection_lost`` -- with ``None`` on a
            clean disconnect, or with the exception reported by asyncio.
        transport: the connected transport (``None`` until connected).
        last_sent: the bytes of the most recently written line.
    """

    def __init__(self, loop, data):
        """Prepare to send ``data`` (a ``str``) over a connection on ``loop``."""
        self.data_iter = iter(data.splitlines())
        self.loop = loop
        # Bind the future to the loop we were given instead of relying on
        # the implicit "current" loop, which may not exist (or may be a
        # different loop) at construction time.
        self.done = loop.create_future()
        self.transport = None
        self.last_sent = None

    def connection_made(self, transport):
        """Remember the transport and send the first line."""
        self.transport = transport
        verbose('Connected to server')
        self._write_one()

    def connection_lost(self, exc):
        """Resolve ``done`` with the outcome of the connection."""
        verbose('Disconnected from server', exc or '')
        if exc:
            self.done.set_exception(exc)
            # Mark the exception as retrieved so asyncio does not log it
            # as "never retrieved" if nobody awaits ``done``.
            self.done.exception()
        else:
            self.done.set_result(None)

    def data_received(self, data):
        """Check that ``data`` echoes the last line sent, then send the next.

        Raises:
            AssertionError: if ``data`` differs from the last line written.
        """
        verbose('Recv:', data.decode())
        # Explicit raise rather than ``assert`` so the check still runs
        # under ``python -O``; the exception type is unchanged.
        if self.last_sent != data:
            raise AssertionError("Received unexpected data")
        self._write_one()

    def _write_one(self):
        """Write the next pending line, or send EOF when none remain."""
        chunk = next(self.data_iter, None)
        if chunk is None:
            self.transport.write_eof()
            return

        line = chunk.encode()
        self.transport.write(line)
        self.last_sent = line
        # Deliberate: simulate the transport getting closed out of order
        # due to ECONNRESET or somesuch.
        self.transport.close()


async def wait_until_done(client, loop, data):
    """Connect to 127.0.0.1:8888 and wait for the protocol to finish.

    Args:
        client: protocol class (or callable) invoked as ``client(loop, data)``.
        loop: event loop used to create the connection.
        data: text handed to the protocol.

    Raises:
        Whatever exception the protocol's ``done`` future was resolved with,
        or any error raised while establishing the connection.
    """
    def make_protocol():
        return client(loop, data)

    _transport, protocol = await loop.create_connection(
        make_protocol, '127.0.0.1', 8888
    )
    await protocol.done


def main(how_many_connections=1000):
    """Run a single echo session against the local server.

    Args:
        how_many_connections: accepted for interface compatibility; the
            current implementation opens exactly one connection and does
            not read this value.
    """
    data = textwrap.dedent("""
        Much to his dad and mum's dismay
        Hōråcé ate himself one day
        He didn't stop to say his grace
        He just sat down and ate his face""").strip()

    # A fresh loop per call: ``asyncio.get_event_loop()`` without a running
    # loop is deprecated, and because the loop is closed below it would
    # also hand back a closed loop on a second call to ``main()``.
    loop = asyncio.new_event_loop()
    load_test = loop.create_task(
        wait_until_done(EchoClient, loop, data)
    )
    try:
        loop.run_until_complete(load_test)
    except KeyboardInterrupt:
        pass
    finally:
        loop.close()


if __name__ == '__main__':
    main()