diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -1496,6 +1496,32 @@ class BufferedRandomTest(BufferedReaderT self.assertEqual(s, b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size)) + def test_write_rewind_write(self): + # Various combinations of reading / writing / seeking backwards / writing again + def mutate(bufio, pos1, pos2): + assert pos2 >= pos1 + # Fill the buffer + bufio.seek(pos1) + bufio.read(pos2 - pos1) + bufio.write(b'\x02') + # This writes earlier than the previous write, but still inside + # the buffer. + bufio.seek(pos1) + bufio.write(b'\x01') + + b = b"\x80\x81\x82\x83\x84" + for i in range(0, len(b)): + for j in range(i, len(b)): + raw = self.BytesIO(b) + bufio = self.tp(raw, 100) + mutate(bufio, i, j) + bufio.flush() + expected = bytearray(b) + expected[j] = 2 + expected[i] = 1 + self.assertEqual(raw.getvalue(), expected, + "failed result for i=%d, j=%d" % (i, j)) + def test_truncate_after_read_or_write(self): raw = self.BytesIO(b"A" * 10) bufio = self.tp(raw, 100) diff --git a/Modules/_io/bufferedio.c b/Modules/_io/bufferedio.c --- a/Modules/_io/bufferedio.c +++ b/Modules/_io/bufferedio.c @@ -1894,7 +1894,7 @@ bufferedwriter_write(buffered *self, PyO avail = Py_SAFE_DOWNCAST(self->buffer_size - self->pos, Py_off_t, Py_ssize_t); if (buf.len <= avail) { memcpy(self->buffer + self->pos, buf.buf, buf.len); - if (!VALID_WRITE_BUFFER(self)) { + if (!VALID_WRITE_BUFFER(self) || self->write_pos > self->pos) { self->write_pos = self->pos; } ADJUST_POSITION(self, self->pos + buf.len);