import io
import time
import array
import unittest


class _IOBase(object):
    """Mixin providing the capability flags and ``readinto`` for a raw stream.

    Subclasses must implement ``open()`` and ``read(size)``; ``__init__``
    calls ``open()`` so that a freshly constructed object is ready to use.
    """

    def __init__(self):
        self._closed = False
        self.open()

    def readable(self):
        """Return True: the stream supports reading."""
        return True

    def writable(self):
        """Return True: the stream supports writing."""
        return True

    def seekable(self):
        """Return False: the stream has no notion of position."""
        return False

    def readinto(self, buf):
        """Read up to ``len(buf)`` bytes into ``buf``; return the count read.

        ``buf`` is any writable sequence supporting slice assignment
        (the io buffering layer passes a memoryview).
        """
        data = self.read(len(buf))
        n = len(data)
        try:
            buf[:n] = data
        except TypeError:
            # Only array.array targets get a second chance: retry with the
            # data wrapped in an array.  Any other TypeError is re-raised
            # unchanged (bare ``raise`` keeps the original traceback).
            if not isinstance(buf, array.array):
                raise
            buf[:n] = array.array('b', data)
        return n


def to_bytes(seq):
    """Return ``seq`` (an iterable of integers in 0..255) as ``bytes``.

    Raises TypeError if ``seq`` is not an iterable of integers.
    """
    b = bytearray()
    # extend() (rather than bytearray(seq)) so that a plain integer argument
    # is rejected instead of producing that many zero bytes.
    b.extend(seq)
    return bytes(b)


class _IOSerial(_IOBase):
    """In-memory loopback stream: bytes written are returned by later reads."""

    # Seconds close() pauses the first time an open stream is closed.
    # Override on a subclass or instance to change or disable the delay.
    close_delay = 0.3

    def open(self):
        """(Re)open the stream with an empty buffer."""
        self._buf = bytearray()
        self._closed = False

    def close(self):
        """Mark the stream closed, pausing ``close_delay`` seconds.

        Has no effect when the stream is already closed, so the repeated
        close() calls issued by the io wrappers and by finalizers do not
        each pay the delay.
        """
        if self._closed:
            return
        self._closed = True
        time.sleep(self.close_delay)

    def flush(self):
        """No-op: writes land in the buffer immediately."""
        pass

    def read(self, size=1):
        """Remove and return up to ``size`` bytes from the front of the buffer.

        Returns fewer bytes (possibly ``b''``) when the buffer holds less
        than ``size``.  ``None`` or a negative ``size`` returns everything
        currently buffered.
        """
        if size is not None and size < 0:
            # A negative slice bound would drop bytes from the end instead
            # of meaning "read all".
            size = None
        block = to_bytes(self._buf[:size])
        del self._buf[:size]
        return block

    def write(self, data):
        """Append ``data`` to the buffer and return ``len(data)``."""
        self._buf += bytes(data)
        return len(data)


class _IO(_IOSerial, io.RawIOBase):
    """The loopback stream as an ``io.RawIOBase`` usable by the io wrappers."""
    pass


class TestBufIO(unittest.TestCase):
    """Round-trip text through TextIOWrapper/BufferedRWPair over ``_IO``."""

    def setUp(self):
        self.s = _IO()
        self.io = io.TextIOWrapper(io.BufferedRWPair(self.s, self.s))

    def tearDown(self):
        self.s.close()

    def test_bufio(self):
        """A line written and flushed is returned intact by readline()."""
        # u'' literals instead of unicode(): the builtin does not exist on
        # Python 3, while the literal form is text on both 2 and 3.
        self.io.write(u'hello\n')
        self.io.flush()
        hello = self.io.readline()
        self.assertEqual(hello, u'hello\n')


if __name__ == '__main__':
    unittest.main()