import sys import os class BufReader: def __init__(self, stream): self.stream = stream self.buf = "" self.eof = False def _pop(self, size=-1, eof=None): if size < 0: buf = self.buf self.buf = "" else: buf = self.buf[:size] self.buf = self.buf[size:] if eof is not None: self.eof = eof return buf def _read_chunks(self, size=-1, chunk_func=None): buflen = len(self.buf) while (size < 0) or (buflen < size): if self.eof: return "" try: chunk = self.stream.next() except StopIteration: return self._pop(eof=True) self.buf += chunk buflen += len(chunk) if chunk_func: chunk_result = chunk_func(chunk=chunk, buflen=buflen) if chunk_result: return chunk_result return self._pop(size=size) def read(self, size=-1): return self._read_chunks(size=size) def close(self): self.stream.close() class TextReader(BufReader): def __init__(self, stream): BufReader.__init__(self, stream) self.linesep = os.linesep def readline(self, size=-1): def find_eol(chunk, buflen): # Precondition: (size < 0) or ((buflen - size) <= len(chunk)) chunklen = len(chunk) chunk_lookup_chars = chunklen if (size >= 0) and (buflen > size): chunk_lookup_chars -= (buflen - size) chunk_eol = chunk.find(self.linesep, 0, chunk_lookup_chars) if chunk_eol >= 0: bufstart = buflen - chunklen return self._pop(size=(bufstart + chunk_eol + len(self.linesep))) return False buf_eol = find_eol(chunk=self.buf, buflen=len(self.buf)) if buf_eol: return buf_eol return self._read_chunks(size=size, chunk_func=find_eol) def readlines(self, sizehint=-1): totalread = 0 lines = [] while (sizehint < 0) or (totalread < sizehint): line = self.readline(sizehint) if not line: break lines.append(line) totalread += len(line) return lines def __iter__(self): return self def next(self): s = self.readline() if s: return s else: raise StopIteration if __name__ == '__main__': m = ["abc", "de" + os.linesep + "f", "gh"] ms = "".join(m) cr = iter(m) br = BufReader(cr) assert br.read() == ms cr = iter(m) br = BufReader(cr) assert br.read(4) == ms[:4] assert br.read(1) == ms[4:5] assert br.read() == ms[5:] cr = iter(m) tr = TextReader(cr) sl = len(os.linesep) s = tr.readline(4) print [hex(ord(c)) for c in s] assert s == ms[:4] s = tr.readline() print [hex(ord(c)) for c in s] assert s == ms[4:5 + sl] s = tr.readline() print [hex(ord(c)) for c in s] assert s == ms[5 + sl:] cr = iter(m) tr = TextReader(cr) ls = [s for s in tr] assert ls == [ms[:5 + sl], ms[5 + sl:]] print "OK"