def _bytes_available(self):
    """Return how many bytes of the internal read buffer are still unread."""
    return len(self._read_buf) - self._read_pos


def prefetch(self, b, skip=0, minread=0):
    """Skip `skip` bytes, then read at least `minread` bytes into `b`.

    The read position is advanced by at most `skip + minread`, or less
    if the end of file was reached.  The total number of bytes written
    into `b` is returned; it can be more than `minread` if additional
    bytes could be prefetched (but never more than `len(b)`).  Bytes
    delivered beyond `minread` stay in the internal buffer and will be
    returned again by the next read.

    Raises ValueError if `skip` or `minread` is negative, or if
    `minread` is greater than `len(b)`.
    """
    if skip < 0:
        raise ValueError("skip must be positive or zero.")
    if minread < 0:
        raise ValueError("minread must be positive or zero.")
    if minread > len(b):
        raise ValueError("minread must not be greater than size of buffer")
    with self._read_lock:
        return self._prefetch_unlocked(b, skip, minread)


def _prefetch_unlocked(self, b, skip=0, minread=0):
    """Implementation of prefetch(); the caller must hold `_read_lock`.

    Works in two phases: first discard `skip` bytes (from the internal
    buffer, then by seeking or reading the raw stream), then copy data
    into `b` until `minread` bytes were delivered, `b` is full, or the
    raw stream returns no data.  Returns the number of bytes stored in
    `b`; `len(b)` is never changed.
    """
    limit = len(b)
    assert skip >= 0 and 0 <= minread <= limit

    # Phase 1: discard `skip` bytes.
    buffered = self._bytes_available()
    if skip <= buffered:
        self._read_pos += skip
    else:
        skip -= buffered
        self._reset_read_buf()
        if self.raw.seekable():
            self.raw.seek(skip, SEEK_CUR)
        else:
            # Emulate the seek by reading buffer_size chunks, as skip
            # may be large.
            while True:
                try:
                    data = self.raw.read(self.buffer_size)
                except IOError as e:
                    if e.errno != EINTR:
                        raise
                    continue
                if not data:
                    # No more data while skipping: nothing was written.
                    return 0
                if len(data) >= skip:
                    # Keep whatever follows the skipped region buffered.
                    self._read_buf = data
                    self._read_pos = skip
                    break
                skip -= len(data)

    # Phase 2: copy into `b`.  Even with minread == 0 we try to deliver
    # at least one byte, unless `b` has no room at all.
    target = max(minread, 1) if limit else 0
    written = 0
    while True:
        pos = self._read_pos
        take = min(self._bytes_available(), limit - written)
        if take:
            # Equal-length slice assignment: never resizes `b`.
            b[written:written + take] = self._read_buf[pos:pos + take]
        total = written + take
        if total >= target:
            # Consume only what is needed to reach `minread`; anything
            # copied beyond that stays buffered (it was only peeked).
            self._read_pos = pos + max(minread - written, 0)
            return total
        # total < target <= limit, so the internal buffer was drained
        # completely; drop it and refill from the raw stream.
        written = total
        self._reset_read_buf()
        try:
            data = self.raw.read(max(self.buffer_size, limit - written))
        except IOError as e:
            if e.errno != EINTR:
                raise
            continue
        if not data:
            # The raw stream returned nothing (b"" or None).
            return written
        self._read_buf = data
        self._read_pos = 0
"""Reads up to n bytes, with at most one read() system call.""" # Returns up to n bytes. If at least one byte is buffered, we @@ -1195,6 +1301,9 @@ def readinto(self, b): return self.reader.readinto(b) + def prefetch(self, b, skip=0, minread=0): + return self.reader.prefetch(b, skip, minread) + def write(self, b): return self.writer.write(b) @@ -1281,6 +1390,10 @@ self.flush() return BufferedReader.readinto(self, b) + def prefetch(self, b, skip=0, minread=0): + self.flush() + return BufferedReader.prefetch(self, b, skip, minread) + def peek(self, n=0): self.flush() return BufferedReader.peek(self, n) diff -r 90a06fbb1f85 Lib/test/test_io.py --- a/Lib/test/test_io.py Sat Oct 01 19:22:30 2011 +0200 +++ b/Lib/test/test_io.py Mon Oct 03 20:51:03 2011 -0400 @@ -829,6 +829,85 @@ self.assertEqual(bufio.readinto(b), 1) self.assertEqual(b, b"cb") + def test_prefetch(self): + rawio = self.MockRawIO((b'abc', b'd', b'ef', None)) + bufio = self.tp(rawio) + b = bytearray(2) + # Invalid args + self.assertRaises(TypeError, bufio.prefetch, None, 0, 0) + self.assertRaises(TypeError, bufio.prefetch, 0, None, 0) + self.assertRaises(TypeError, bufio.prefetch, 0, 0, None) + self.assertRaises(ValueError, bufio.prefetch, b, 0, 4) + self.assertRaises(ValueError, bufio.prefetch, b, -1, 0) + self.assertRaises(ValueError, bufio.prefetch, b, 0, -1) + + self.assertEqual(bufio.prefetch(b), 2) + self.assertEqual(b, b'ab') + self.assertEqual(bufio.prefetch(b, 0, 0), 2) + self.assertEqual(b, b'ab') + self.assertEqual(bufio.prefetch(b, 0, 2), 2) + self.assertEqual(b, b'ab') + self.assertEqual(rawio._reads, 1) + self.assertEqual(bufio.prefetch(b, 1, 2), 2) + self.assertEqual(b, b'de') + self.assertEqual(rawio._reads, 3) + self.assertEqual(bufio.prefetch(b, 0, 2), 1) + self.assertEqual(b, b'fe') + self.assertEqual(rawio._reads, 4) + self.assertEqual(bufio.prefetch(b, 0, 1), 0) + self.assertEqual(b, b'fe') + self.assertEqual(rawio._reads, 5) + # EOF during skip + b = bytearray(2) + bufio.seek(0) + 
def test_unseekable_prefetch(self):
    """Exercise prefetch() on a raw stream that cannot seek."""
    reader = self.tp(self.MockUnseekableIO(b'abc' * 10))
    target = bytearray(3)
    # (skip, minread) -> expected return value and buffer contents
    cases = (
        ((0, 0), 3, b'abc'),
        ((1, 2), 3, b'bca'),
        ((24, 0), 3, b'abc'),
        ((2, 3), 1, b'cbc'),
    )
    for call_args, expected_count, expected_bytes in cases:
        count = reader.prefetch(target, *call_args)
        self.assertEqual(count, expected_count)
        self.assertEqual(target, expected_bytes)

    # large skips > bufsize
    chunk = 6
    payload = b'abcxyz' * chunk * 10
    reader = self.tp(self.MockUnseekableIO(payload), chunk)
    target = bytearray(chunk)
    far = len(payload) - len(target)
    self.assertEqual(reader.prefetch(target, far, 1), len(target))
    self.assertEqual(target, b'abcxyz')
    self.assertEqual(reader.prefetch(target, 2, 6), 3)
    self.assertEqual(target, b'xyzxyz')

    # EOF during skip
    target = bytearray(2)
    reader = self.tp(self.MockUnseekableIO(b'abc'))
    self.assertEqual(reader.prefetch(target, 8, 2), 0)
    self.assertEqual(target, b'\x00\x00')
test_unseekable = None + test_unseekable_prefetch = None class CBufferedRandomTest(BufferedRandomTest): tp = io.BufferedRandom diff -r 90a06fbb1f85 Modules/_io/bufferedio.c --- a/Modules/_io/bufferedio.c Sat Oct 01 19:22:30 2011 +0200 +++ b/Modules/_io/bufferedio.c Mon Oct 03 20:51:03 2011 -0400 @@ -137,9 +137,25 @@ return bufferediobase_unsupported("write"); } +PyDoc_STRVAR(bufferediobase_prefetch_doc, + "Skip `skip` bytes from the stream. \n" + "Try to read at least `minread` bytes and write them into buffer.\n" + "The read position is advanced by at most `skip + minread`,\n" + "or less if the end of file was reached.\n" + "\n" + "The total number of bytes written in `b` is returned, which can be more\n" + "than `minread` if additional bytes could be prefetched (but, of course,\n" + "cannot be more than `len(b)`).\n"); + +static PyObject * +bufferediobase_prefetch(PyObject *self, PyObject *args) +{ + return bufferediobase_unsupported("prefetch"); +} static PyMethodDef bufferediobase_methods[] = { {"detach", (PyCFunction)bufferediobase_detach, METH_NOARGS, bufferediobase_detach_doc}, + {"prefetch", bufferediobase_prefetch, METH_VARARGS, bufferediobase_prefetch_doc}, {"read", bufferediobase_read, METH_VARARGS, bufferediobase_read_doc}, {"read1", bufferediobase_read1, METH_VARARGS, bufferediobase_read1_doc}, {"readinto", bufferediobase_readinto, METH_VARARGS, NULL}, @@ -598,7 +614,9 @@ _bufferedreader_read_generic(buffered *self, Py_ssize_t); static Py_ssize_t _bufferedreader_raw_read(buffered *self, char *start, Py_ssize_t len); - +static Py_ssize_t +_bufferedreader_prefetch(buffered *self, char *buf, Py_ssize_t limit, + Py_off_t skip, Py_ssize_t minread); /* * Helpers */ @@ -818,6 +836,57 @@ } static PyObject * +buffered_prefetch(buffered *self, PyObject *args) +{ + PyObject *skipobj = NULL; + Py_buffer buf; + Py_off_t skip = 0; + Py_ssize_t minread = 0, n; + + CHECK_INITIALIZED(self); + CHECK_CLOSED(self, "prefetch of closed file"); + + if 
(!PyArg_ParseTuple(args, "|w*On:prefetch", &buf, &skipobj, &minread)) { + return NULL; + } + if (buf.len == 0) { + PyErr_SetString(PyExc_ValueError, + "buffer length must be greater than zero."); + goto end; + } + if (minread < 0) { + PyErr_SetString(PyExc_ValueError, + "minread must be positive or zero."); + goto end; + } + if (minread > buf.len) { + PyErr_SetString(PyExc_ValueError, + "minread must not be greater than buffer size."); + goto end; + } + if (skipobj != NULL) { + skip = PyNumber_AsOff_t(skipobj, PyExc_ValueError); + if (skip < 0) { + if (!PyErr_Occurred()) { + PyErr_SetString(PyExc_ValueError, + "skip must be greater than or equal to 0."); + } + goto end; + } + } + n = _bufferedreader_prefetch(self, buf.buf, buf.len, skip, minread); + if (n == -2) + Py_RETURN_NONE; + if (n < 0) + return NULL; + + return PyLong_FromSsize_t(n); +end: + PyBuffer_Release(&buf); + return NULL; +} + +static PyObject * buffered_read(buffered *self, PyObject *args) { Py_ssize_t n = -1; @@ -922,11 +991,152 @@ return res; } +/* helper for prefetch() to support unseekable streams */ +static Py_off_t +_prefetch_safe_seek_unlocked(buffered *self, Py_off_t skip) +{ + Py_off_t res; + Py_ssize_t n; + + assert(Py_SAFE_DOWNCAST(READAHEAD(self), Py_off_t, Py_ssize_t) == 0); + + res = _buffered_raw_seek(self, skip, 1); + if (!(res == -1 && PyErr_Occurred() && + PyErr_ExceptionMatches(IO_STATE->unsupported_operation))) + return res; + + PyErr_Clear(); + /* Catch unsupported seek operation then just throw away the reads + * + * XXX: it would be nice if we didnt waste time under the hood with + * the memory copies. Is there a better way to do this? 
/*
 * Core of prefetch() and readinto().
 *
 * Skips `skip` bytes, then copies up to `limit` bytes into `buf`, trying to
 * deliver at least `minread` of them.  self->pos is advanced past the
 * skipped bytes and past at most `minread` of the copied bytes.
 *
 * A negative `skip` is treated as 0; a negative `minread` means "fill the
 * whole buffer" (it is replaced by `limit`; buffered_readinto() passes -1).
 *
 * Returns the number of bytes copied into `buf`.  Returns -1 if
 * ENTER_BUFFERED() or the flush of a writable object fails, and otherwise
 * passes through a negative result of the fill/raw-read helpers (callers in
 * this file map -2 to None).  If the skip helper returns 0 or -2, the
 * result is 0.
 */
static Py_ssize_t
_bufferedreader_prefetch(buffered *self,
                         char *buf, Py_ssize_t limit,
                         Py_off_t skip, Py_ssize_t minread)
{
    Py_ssize_t written, n;

    assert(limit > 0);
    assert(minread <= limit);

    if (skip < 0)
        skip = 0;

    if (minread < 0)
        minread = limit;

    /* fast-path: the buffered read-ahead covers skip + minread, so the
     * request is served from self->buffer before ENTER_BUFFERED() is called.
     * NOTE(review): skip + minread is not checked for overflow here --
     * confirm callers cannot pass a skip close to the Py_off_t maximum. */
    n = Py_SAFE_DOWNCAST(READAHEAD(self), Py_off_t, Py_ssize_t);
    if (n > skip + minread) {
        written = Py_MIN(n - skip, limit);
        self->pos += skip;
        memcpy(buf, self->buffer + self->pos, written);
        /* only `minread` bytes are consumed; the rest was merely peeked */
        self->pos += minread;
        return written;
    }

    if (!ENTER_BUFFERED(self))
        return -1;

    if (self->writable) {
        PyObject *res = buffered_flush_and_rewind_unlocked(self);
        if (res == NULL) {
            LEAVE_BUFFERED(self);
            return -1;
        }
        Py_CLEAR(res);
    }

    written = 0;
    /* skip == n means the skip consumes exactly the buffered data: nothing
     * is left to drain, so fall through to the buffer reset below. */
    if (skip != n) {
        if (skip < n) {
            /* buffer satisfies skip */
            self->pos += skip;
            n -= skip;
        }
        else {
            /* skip is more than current buffer, throw away buffer
             * and seek the raw stream */
            _bufferedreader_reset_buf(self);
            self->pos = 0;
            skip = _prefetch_safe_seek_unlocked(self, skip - n);
            if (skip <= 0) {
                /* 0 and -2 from the seek helper become "0 bytes copied";
                 * any other negative value is returned unchanged */
                LEAVE_BUFFERED(self);
                return (skip == -2) ? 0 : skip;
            }
            n = Py_SAFE_DOWNCAST(READAHEAD(self), Py_off_t, Py_ssize_t);
        }
        /* drain buffer after skip and/or when minread is large */
        if (n > 0) {
            written = Py_MIN(n, limit);
            memcpy(buf, self->buffer + self->pos, written);
            if (written >= minread) {
                self->pos += minread;
                LEAVE_BUFFERED(self);
                return written;
            }
            self->pos += written;
            limit -= written;
            /* should be covered by the minread <= limit check */
            assert(limit > 0);
        }
    }

    _bufferedreader_reset_buf(self);
    self->pos = 0;
    /* try to copy in full passes; keep going until `minread` bytes were
     * delivered, and make at least one attempt when nothing was copied yet.
     * Note that `continue` still runs the increment expression below. */
    for ( ; written < minread || written == 0;
         written += n, limit -= n) {
        /* If the number of remaining bytes is at least the internal buffer
         * size, read directly into the caller's buffer. Care is taken to
         * support unseekable streams by only raw reading as far as we
         * planned on seeking anyway, i.e. bounded by `minread`.
         */
        Py_ssize_t to_seek = Py_MAX(0, minread - written);
        if (to_seek >= self->buffer_size) {
            n = _bufferedreader_raw_read(self, (char *) buf + written, to_seek);
        }
        else {
            n = _bufferedreader_fill_buffer(self);
            if (n > 0) {
                if (n > limit)
                    n = limit;
                memcpy((char *) buf + written, self->buffer + self->pos, n);
                /* consume only the part that counts towards minread */
                self->pos += Py_MIN(n, to_seek);
                continue; /* short circuit */
            }
        }
        /* n == 0, or -2 after some data was already copied: stop and
         * return what we have */
        if (n == 0 || (n == -2 && written > 0))
            break;
        if (n < 0) {
            LEAVE_BUFFERED(self);
            return n;
        }
    }
    LEAVE_BUFFERED(self);
    return written;
}
memcpy(buf.buf, self->buffer + self->pos, buf.len); + self->pos += buf.len; + res = PyLong_FromSsize_t(buf.len); + goto end; } - if (!ENTER_BUFFERED(self)) - goto end_unlocked; - - if (self->writable) { - res = buffered_flush_and_rewind_unlocked(self); - if (res == NULL) - goto end; - Py_CLEAR(res); + n = _bufferedreader_prefetch(self, buf.buf, buf.len, 0, -1); + if (n == -2) { + Py_INCREF(Py_None); + res = Py_None; } - - _bufferedreader_reset_buf(self); - self->pos = 0; - - for (remaining = buf.len - written; - remaining > 0; - written += n, remaining -= n) { - /* If remaining bytes is larger than internal buffer size, copy - * directly into caller's buffer. */ - if (remaining > self->buffer_size) { - n = _bufferedreader_raw_read(self, (char *) buf.buf + written, - remaining); - } - else { - n = _bufferedreader_fill_buffer(self); - if (n > 0) { - if (n > remaining) - n = remaining; - memcpy((char *) buf.buf + written, - self->buffer + self->pos, n); - self->pos += n; - continue; /* short circuit */ - } - } - if (n == 0 || (n == -2 && written > 0)) - break; - if (n < 0) { - if (n == -2) { - Py_INCREF(Py_None); - res = Py_None; - } - goto end; - } - } - res = PyLong_FromSsize_t(written); + else if (n >= 0) + res = PyLong_FromSsize_t(n); end: - LEAVE_BUFFERED(self); -end_unlocked: PyBuffer_Release(&buf); return res; } @@ -1665,6 +1831,7 @@ {"read", (PyCFunction)buffered_read, METH_VARARGS}, {"peek", (PyCFunction)buffered_peek, METH_VARARGS}, + {"prefetch", (PyCFunction)buffered_prefetch, METH_VARARGS}, {"read1", (PyCFunction)buffered_read1, METH_VARARGS}, {"readinto", (PyCFunction)buffered_readinto, METH_VARARGS}, {"readline", (PyCFunction)buffered_readline, METH_VARARGS}, @@ -2241,6 +2408,12 @@ } static PyObject * +bufferedrwpair_prefetch(rwpair *self, PyObject *args) +{ + return _forward_call(self->reader, "prefetch", args); +} + +static PyObject * bufferedrwpair_read1(rwpair *self, PyObject *args) { return _forward_call(self->reader, "read1", args); @@ -2310,6 
+2483,7 @@ static PyMethodDef bufferedrwpair_methods[] = { {"read", (PyCFunction)bufferedrwpair_read, METH_VARARGS}, {"peek", (PyCFunction)bufferedrwpair_peek, METH_VARARGS}, + {"prefetch", (PyCFunction)bufferedrwpair_prefetch, METH_VARARGS}, {"read1", (PyCFunction)bufferedrwpair_read1, METH_VARARGS}, {"readinto", (PyCFunction)bufferedrwpair_readinto, METH_VARARGS}, @@ -2456,6 +2630,7 @@ {"readinto", (PyCFunction)buffered_readinto, METH_VARARGS}, {"readline", (PyCFunction)buffered_readline, METH_VARARGS}, {"peek", (PyCFunction)buffered_peek, METH_VARARGS}, + {"prefetch", (PyCFunction)buffered_prefetch, METH_VARARGS}, {"write", (PyCFunction)bufferedwriter_write, METH_VARARGS}, {NULL, NULL} };