"""Experimental asyncio-friendly buffered reader for regular files (Linux).

Reads are first attempted with ``preadv(RWF_NOWAIT)`` so that data already in
the page cache is returned without blocking the event loop; anything else is
delegated to the default thread pool executor.
"""

import asyncio
import errno
# The pure-Python io implementation is used on purpose: its BufferedReader
# pulls data through raw.read(), which is the hook FileIOSandwich provides.
import _pyio as io
import os
import sys


# True when the platform exposes the primitives needed for the fast path.
_HAVE_PREADV_NOWAIT = hasattr(os, "preadv") and hasattr(os, "RWF_NOWAIT")

# errno values meaning "RWF_NOWAIT cannot be used for this file"; the read is
# then served by the thread pool instead of failing.
_NOWAIT_UNSUPPORTED = frozenset(
    getattr(errno, name)
    for name in ("EOPNOTSUPP", "ENOTSUP", "ENOSYS")
    if hasattr(errno, name)
)


class AsyncFileIO:
    """Raw, read-only file object whose ``read()`` is a coroutine.

    Linux implementation using ``preadv(RWF_NOWAIT)`` for cached reads and a
    thread pool for uncached reads.
    """

    def __init__(self, filename):
        """Open *filename* read-only; raises OSError if it cannot be opened."""
        self._fd = os.open(filename, os.O_RDONLY)

    def readable(self):
        """Return True: the file is always opened for reading."""
        return True

    def closed(self):
        """Return True once close() has been called."""
        return self._fd is None

    def _check_closed(self):
        """Raise ValueError if the file has been closed."""
        if self.closed():
            raise ValueError("I/O on closed file")

    async def read(self, n: int):
        """Read up to *n* bytes from the current file position.

        May return fewer than *n* bytes; ``b""`` means end of file.

        Raises:
            ValueError: if the file is closed.
            OSError: if the underlying read fails.
        """
        self._check_closed()
        if _HAVE_PREADV_NOWAIT:
            try:
                return self._read_nonblocking(n)
            except BlockingIOError:
                # Data is not immediately available: use the thread pool.
                pass
            except OSError as exc:
                # RWF_NOWAIT is rejected by some filesystems/kernels; that is
                # not a read error, so fall back instead of failing.
                if exc.errno not in _NOWAIT_UNSUPPORTED:
                    raise
        return await self._read_async(n)

    def _read_nonblocking(self, n):
        """Read up to *n* bytes without waiting for the backing storage.

        Raises BlockingIOError when no data can be returned immediately.
        """
        buffer = bytearray(n)
        # preadv() takes an explicit offset and does not move the file
        # position, so the position is read first and advanced afterwards.
        pos = os.lseek(self._fd, 0, os.SEEK_CUR)
        nread = os.preadv(self._fd, [buffer], pos, os.RWF_NOWAIT)
        os.lseek(self._fd, nread, os.SEEK_CUR)
        # Return bytes, like the thread-pool path does.
        return bytes(memoryview(buffer)[:nread])

    def _read_blocking(self, n):
        """Blocking read of up to *n* bytes (run in a worker thread)."""
        return os.read(self._fd, n)

    async def _read_async(self, n):
        """Run the blocking read in the loop's default executor."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._read_blocking, n)

    def close(self):
        """Close the file descriptor; calling it again is a no-op."""
        if self._fd is None:
            return
        fd, self._fd = self._fd, None
        os.close(fd)


class FileIOSandwich:
    """Synchronous raw-like facade over an asynchronous raw file.

    ``read()`` never blocks: it serves bytes from an in-memory staging buffer
    or returns None after recording the requested size, which
    ``_prepare_read()`` then fetches asynchronously.

    Only sequential reads are supported so far.
    """

    def __init__(self, raw):
        """Wrap *raw*, an object with an ``async read(n)`` method."""
        self._raw = raw
        self._storage = io.BytesIO()
        self._reset_read()

    def _reset_read(self):
        """Forget any pending request and leave "short read allowed" mode."""
        self._latest_read = None
        self._partial = False

    def readable(self):
        """Delegate to the wrapped raw object."""
        return self._raw.readable()

    def _get_storage_size(self):
        """Return the total number of bytes held by the staging buffer."""
        with self._storage.getbuffer() as buf:
            return len(buf)

    def _can_read_buffer(self, n):
        """Return True if *n* unread bytes are already staged."""
        unread = self._get_storage_size() - self._storage.tell()
        return n <= unread

    async def _prepare_read(self):
        """Fetch the bytes missing for the last refused ``read()``.

        Afterwards ``read()`` is allowed to return short (or empty) data until
        ``_reset_read()`` is called.

        Raises:
            ValueError: if no ``read()`` was refused beforehand.
        """
        n = self._latest_read
        if n is None:
            raise ValueError("read() was not called previously")
        if not self._can_read_buffer(n):
            pos = self._storage.tell()
            buffered = self._get_storage_size() - pos
            chunk = await self._raw.read(n - buffered)
            if buffered == 0 and pos:
                # Everything staged so far has been consumed: drop it so the
                # buffer does not grow to the size of the whole file.
                self._storage.seek(0)
                self._storage.truncate()
                pos = 0
            self._storage.seek(0, os.SEEK_END)
            self._storage.write(chunk)
            self._storage.seek(pos)
        self._partial = True

    def read(self, n):
        """Return up to *n* staged bytes, or None if they must be fetched.

        When None is returned, *n* is remembered for ``_prepare_read()``.
        """
        if self._partial or self._can_read_buffer(n):
            chunk = self._storage.read(n)
            self._latest_read = None
            return chunk
        self._latest_read = n
        return None

    def close(self):
        """Close the wrapped raw object."""
        self._raw.close()

    def closed(self):
        """Return True if the wrapped raw object is closed."""
        return self._raw.closed()


class _BufferedRawAdapter(FileIOSandwich):
    """FileIOSandwich variant usable as the raw object of io.BufferedReader.

    The buffered layer reads ``raw.closed`` as an attribute and calls
    ``raw.flush()`` when closing.  With ``closed`` being a method, the bound
    method is always truthy, so ``BufferedReader.close()`` would consider the
    stream already closed and never release the file descriptor.
    """

    @property
    def closed(self):
        return FileIOSandwich.closed(self)

    def flush(self):
        """Nothing to flush: the stream is read-only."""


class AsyncBufferedReader:
    """
    Glue between asynchronous AsyncFileIO and blocking io.BufferedReader

    * call _reset_read()
    * blocking read() returns None (the raw read would block)
    * call asynchronous _prepare_read()
    * new attempt to blocking read() will succeed
    """

    def __init__(self, raw):
        """Wrap *raw*, an AsyncFileIO-like object."""
        self._raw = _BufferedRawAdapter(raw)
        self._blocking = io.BufferedReader(self._raw)
        # note: no public raw object

    async def read(self, n: int):
        """Read up to *n* bytes; may return fewer, ``b""`` at end of file."""
        self._raw._reset_read()
        data = self._blocking.read(n)
        if data is None:
            await self._raw._prepare_read()
            data = self._blocking.read(n)
            # After _prepare_read() the raw layer never answers None.
            assert data is not None
        return data

    async def readline(self):
        """Read one line, including its trailing newline when present.

        Returns ``b""`` at end of file; the last line of a file may lack the
        trailing newline.
        """
        sandwich = self._raw
        line = bytearray()
        while not line.endswith(b"\n"):
            sandwich._reset_read()
            # The pure-Python readline() reports "would block" as a short
            # (possibly empty) result, not None, so the pending request
            # recorded by the raw layer is what tells the two cases apart.
            line += self._blocking.readline()
            if line.endswith(b"\n") or sandwich._latest_read is None:
                break
            await sandwich._prepare_read()
            more = self._blocking.readline()
            if not more:
                # The prefetch returned nothing: end of file.
                break
            line += more
        return bytes(line)

    def close(self):
        """Close the reader and the underlying file; safe to call twice."""
        self._blocking.close()


async def main():
    """Print the first line of the file named on the command line."""
    if len(sys.argv) < 2:
        sys.exit("usage: %s FILENAME" % sys.argv[0])
    filename = sys.argv[1]
    raw = AsyncFileIO(filename)
    f = AsyncBufferedReader(raw)
    try:
        data = await f.readline()
    finally:
        f.close()
    print("data: %a" % data)


if __name__ == "__main__":
    asyncio.run(main())