# HG changeset patch # Parent 5a0839d412b9c4b653b7120938fe62621657e3d2 Fix for BufferedWriter.write() with non-blocking file (Issue 13322) This patch ensures that BlockingIOError is raised when the wrapped raw file is non-blocking and the write would block. The characters_written attribute of the exception is set appropriately. Previously IOError was raised with no indication of the number of bytes consumed from the argument to write(). This was because the code expected self.raw.write() to raise BlockingIOError if the write would block, when in fact RawIOBase.write() should return None instead. (self.raw is required to follow the RawIOBase interface.) diff -r 5a0839d412b9 Lib/_pyio.py --- a/Lib/_pyio.py Wed Nov 02 15:09:37 2011 -0500 +++ b/Lib/_pyio.py Mon Nov 07 17:45:17 2011 +0000 @@ -6,6 +6,7 @@ import abc import codecs import warnings +import errno # Import _thread instead of threading to reduce startup cost try: from _thread import allocate_lock as Lock @@ -717,8 +718,11 @@ def close(self): if self.raw is not None and not self.closed: - self.flush() - self.raw.close() + try: + # may raise BlockingIOError or BrokenPipeError etc + self.flush() + finally: + self.raw.close() def detach(self): if self.raw is None: @@ -1077,13 +1081,9 @@ # XXX we can implement some more tricks to try and avoid # partial writes if len(self._write_buf) > self.buffer_size: - # We're full, so let's pre-flush the buffer - try: - self._flush_unlocked() - except BlockingIOError as e: - # We can't accept anything else. - # XXX Why not just let the exception pass through? - raise BlockingIOError(e.errno, e.strerror, 0) + # We're full, so let's pre-flush the buffer. (This may + # raise BlockingIOError with characters_written == 0.) 
+ self._flush_unlocked() before = len(self._write_buf) self._write_buf.extend(b) written = len(self._write_buf) - before @@ -1114,22 +1114,21 @@ def _flush_unlocked(self): if self.closed: raise ValueError("flush of closed file") - written = 0 - try: - while self._write_buf: - try: - n = self.raw.write(self._write_buf) - except InterruptedError: - continue - if n > len(self._write_buf) or n < 0: - raise IOError("write() returned incorrect number of bytes") - del self._write_buf[:n] - written += n - except BlockingIOError as e: - n = e.characters_written + while self._write_buf: + try: + n = self.raw.write(self._write_buf) + except InterruptedError: + continue + except BlockingIOError: + raise RuntimeError("self.raw should implement RawIOBase: it " + "should not raise BlockingIOError") + if n is None: + raise BlockingIOError( + errno.EAGAIN, + "write could not complete without blocking", 0) + if n > len(self._write_buf) or n < 0: + raise IOError("write() returned incorrect number of bytes") del self._write_buf[:n] - written += n - raise BlockingIOError(e.errno, e.strerror, written) def tell(self): return _BufferedIOMixin.tell(self) + len(self._write_buf) diff -r 5a0839d412b9 Lib/test/test_io.py --- a/Lib/test/test_io.py Wed Nov 02 15:09:37 2011 -0500 +++ b/Lib/test/test_io.py Mon Nov 07 17:45:17 2011 +0000 @@ -42,7 +42,10 @@ import threading except ImportError: threading = None - +try: + import fcntl +except ImportError: + fcntl = None def _default_chunk_size(): """Get the default TextIOWrapper chunk size""" @@ -242,9 +245,14 @@ except ValueError: pass else: - self._blocker_char = None - self._write_stack.append(b[:n]) - raise self.BlockingIOError(0, "test blocking", n) + if n > 0: + # write data up to the first blocker + self._write_stack.append(b[:n]) + return n + else: + # cancel blocker and indicate would block + self._blocker_char = None + return None self._write_stack.append(b) return len(b) @@ -1233,6 +1241,66 @@ DeprecationWarning)): self.tp(self.MockRawIO(), 
8, 12) + @unittest.skipUnless(fcntl, 'fcntl required for this test') + def test_nonblock_pipe_write_bigbuf(self): + self._test_nonblock_pipe_write(16*1024) + + @unittest.skipUnless(fcntl, 'fcntl required for this test') + def test_nonblock_pipe_write_smallbuf(self): + self._test_nonblock_pipe_write(1024) + + def _set_non_blocking(self, fd): + flags = fcntl.fcntl(fd, fcntl.F_GETFL) + self.assertNotEqual(flags, -1) + res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) + self.assertEqual(res, 0) + + def _test_nonblock_pipe_write(self, bufsize): + sent = [] + received = [] + r, w = os.pipe() + self._set_non_blocking(r) + self._set_non_blocking(w) + + # To exercise all code paths in the C implementation we need + # to play with buffer sizes. For instance, if we choose a + # buffer size less than or equal to _PIPE_BUF (4096 on Linux) + # then we will never get a partial write of the buffer. + rf = self.open(r, mode='rb', closefd=True, buffering=bufsize) + wf = self.open(w, mode='wb', closefd=True, buffering=bufsize) + + with rf, wf: + for N in 9999, 73, 7574: + try: + i = 0 + while True: + msg = bytes([i % 26 + 97]) * N + sent.append(msg) + wf.write(msg) + i += 1 + + except self.BlockingIOError as e: + self.assertEqual(e.args[2], e.characters_written) + sent[-1] = sent[-1][:e.characters_written] + received.append(rf.read()) + msg = b'BLOCKED' + wf.write(msg) + sent.append(msg) + + while True: + try: + wf.flush() + break + except BlockingIOError: + received.append(rf.read()) + + received += iter(rf.read, None) + + sent, received = b''.join(sent), b''.join(received) + self.assertTrue(sent == received) + self.assertTrue(wf.closed) + self.assertTrue(rf.closed) + class CBufferedWriterTest(BufferedWriterTest): tp = io.BufferedWriter diff -r 5a0839d412b9 Modules/_io/bufferedio.c --- a/Modules/_io/bufferedio.c Wed Nov 02 15:09:37 2011 -0500 +++ b/Modules/_io/bufferedio.c Mon Nov 07 17:45:17 2011 +0000 @@ -618,6 +618,18 @@ * Helpers */ +/* Sets the current error to 
BlockingIOError */ +static void +_set_BlockingIOError(char *msg, Py_ssize_t written) +{ + PyObject *err; + err = PyObject_CallFunction(PyExc_BlockingIOError, "isn", + EAGAIN, msg, written); + if (err) + PyErr_SetObject(PyExc_BlockingIOError, err); + Py_XDECREF(err); +} + /* Returns the address of the `written` member if a BlockingIOError was raised, NULL otherwise. The error is always re-raised. */ static Py_ssize_t * @@ -1841,6 +1853,11 @@ Py_DECREF(memobj); if (res == NULL) return -1; + if (res == Py_None) { + /* Non-blocking stream would have blocked. Special return code! */ + Py_DECREF(res); + return -2; + } n = PyNumber_AsSsize_t(res, PyExc_ValueError); Py_DECREF(res); if (n < 0 || n > len) { @@ -1879,14 +1896,15 @@ Py_SAFE_DOWNCAST(self->write_end - self->write_pos, Py_off_t, Py_ssize_t)); if (n == -1) { - Py_ssize_t *w = _buffered_check_blocking_error(); - if (w == NULL) - goto error; - self->write_pos += *w; - self->raw_pos = self->write_pos; - written += *w; - *w = written; - /* Already re-raised */ + goto error; + } + else if (n == -2) { + /* _bufferedwriter_raw_write() failed with EAGAIN and + nothing written */ + /* XXX Do we need to adjust any attributes of self + or cleanup by restoring position or resetting buffer? */ + _set_BlockingIOError("write could not complete without blocking", + 0); goto error; } self->write_pos += n; @@ -1984,14 +2002,21 @@ PyErr_Clear(); memcpy(self->buffer + self->write_end, buf.buf, buf.len); self->write_end += buf.len; + self->pos += buf.len; + /* XXX Do we need to adjust self->raw_pos or anything else? */ written = buf.len; goto end; } /* Buffer as much as possible. */ memcpy(self->buffer + self->write_end, buf.buf, avail); self->write_end += avail; - /* Already re-raised */ - *w = avail; + self->pos += avail; + /* XXX Do we need to adjust self->raw_pos or anything else? */ + /* XXX Modifying the existing exception e using the pointer w + will change e.characters_written but not e.args[2]. 
+ Therefore we just replace with a new error. */ + _set_BlockingIOError("write could not complete without " + "blocking", avail); goto error; } Py_CLEAR(res); @@ -2016,11 +2041,10 @@ Py_ssize_t n = _bufferedwriter_raw_write( self, (char *) buf.buf + written, buf.len - written); if (n == -1) { - Py_ssize_t *w = _buffered_check_blocking_error(); - if (w == NULL) - goto error; - written += *w; - remaining -= *w; + goto error; + } else if (n == -2) { + /* _bufferedwriter_raw_write() failed with EAGAIN and + nothing written */ if (remaining > self->buffer_size) { /* Can't buffer everything, still buffer as much as possible */ memcpy(self->buffer, @@ -2028,8 +2052,10 @@ self->raw_pos = 0; ADJUST_POSITION(self, self->buffer_size); self->write_end = self->buffer_size; - *w = written + self->buffer_size; - /* Already re-raised */ + /* XXX Do we need to adjust any other attributes of self? */ + written += self->buffer_size; + _set_BlockingIOError("write could not complete without " + "blocking", written); goto error; } PyErr_Clear();