"""Small asyncio client: connect over an AF_INET6 socket (AI_V4MAPPED) and dump the reply."""

import asyncio
import socket


class TestProtocol(asyncio.Protocol):
    """Protocol that sends one GET request and prints whatever comes back.

    Args:
        con_lost_fut: Future that is resolved with ``True`` once the
            connection has been lost, so the caller can await the end of
            the exchange.
    """

    def __init__(self, con_lost_fut):
        self.con_lost_fut = con_lost_fut

    def connection_made(self, transport):
        """Log the new transport and immediately send the request."""
        print(f"Connection made to {transport}.")
        transport.write(b"GET /index.html\r\n\r\n")

    def data_received(self, data: bytes):
        """Print each chunk of received bytes as-is."""
        print(f"Data received: {data}")

    def connection_lost(self, exc):
        """Resolve the completion future and log why the connection ended.

        Args:
            exc: Exception passed by the transport, or ``None``.
        """
        # The future may already be done or cancelled (e.g. the awaiting task
        # was cancelled); calling set_result() then raises InvalidStateError.
        if not self.con_lost_fut.done():
            self.con_lost_fut.set_result(True)
        print(f"Server closed the connection. Argument: {exc}")


async def main(host: str = 'neverssl.com', port: int = 80):
    """Connect to *host*:*port*, wait until the connection is lost, then close.

    Args:
        host: Host name to connect to.
        port: TCP port to connect to.
    """
    # Inside a coroutine the running loop is the right one to use, and
    # loop.create_future() is the recommended way to make a Future for it.
    loop = asyncio.get_running_loop()
    on_con_lost = loop.create_future()

    transport, _protocol = await loop.create_connection(
        lambda: TestProtocol(on_con_lost),
        family=socket.AF_INET6,
        flags=socket.AI_V4MAPPED,
        host=host,
        port=port,
    )

    try:
        await on_con_lost
    finally:
        # Always release the transport, even if the wait is cancelled.
        transport.close()


if __name__ == '__main__':
    asyncio.run(main())