import asyncio import tty import pty import io async def get_pipes(limit=None, loop=None): if not loop: loop = asyncio.get_event_loop() if not limit: limit = asyncio.streams._DEFAULT_LIMIT master, slave = pty.openpty() tty.setraw(master) tty.setraw(slave) writer_pipe = io.open(master, 'wb', 0) reader_pipe = io.open(slave, 'rb', 0) reader = asyncio.StreamReader(loop=loop, limit=limit) reader_protocol = asyncio.StreamReaderProtocol(reader, loop=loop) reader_transport, _ = await loop.connect_read_pipe(lambda: reader_protocol, reader_pipe) writer_transport, writer_protocol = await loop.connect_write_pipe(asyncio.streams.FlowControlMixin, writer_pipe) writer = asyncio.StreamWriter(writer_transport, writer_protocol, reader, loop) return reader, writer async def test(msg): reader, writer = await get_pipes() bmsg = msg.encode("utf-8") writer.write(bmsg) await writer.drain() result = await reader.read() assert result == bmsg print(result) def run_test(): loop = asyncio.get_event_loop() loop.run_until_complete(test("Hello World!")) if __name__ == "__main__": run_test()