Index: Lib/io.py =================================================================== --- Lib/io.py (revision 69165) +++ Lib/io.py (working copy) @@ -1027,10 +1027,16 @@ buffer_size=DEFAULT_BUFFER_SIZE, max_buffer_size=None): raw._checkWritable() _BufferedIOMixin.__init__(self, raw) + if buffer_size < 0: + raise ValueError("negative buffer size %d" % buffer_size) self.buffer_size = buffer_size - self.max_buffer_size = (2*buffer_size - if max_buffer_size is None - else max_buffer_size) + if max_buffer_size is None: + self.max_buffer_size = 2*buffer_size + else: + if buffer_size > max_buffer_size: + raise ValueError("buffer_size %d > max_buffer_size %d" + % (buffer_size, max_buffer_size)) + self.max_buffer_size = max_buffer_size self._write_buf = bytearray() self._write_lock = Lock() @@ -1047,17 +1053,79 @@ self._flush_unlocked() except BlockingIOError as e: # We can't accept anything else. - # XXX Why not just let the exception pass through? + # Reraise this with 0 in the written field as none of the + # data passed to this call has been written. raise BlockingIOError(e.errno, e.strerror, 0) + before = len(self._write_buf) - self._write_buf.extend(b) - written = len(self._write_buf) - before + bytes_to_consume = self.max_buffer_size - before + + # Determine the size and shape of our data; use a memoryview to + # avoid extra data copies when possible (the common case). + data = b + try: + data = memoryview(data) + item_size = data.itemsize + data_length = len(data) * item_size + # Prefer to do IO in multiples of item_size bytes to avoid + # data copies regardless of the exact buffer size settings. + if (bytes_to_consume >= item_size and + self.max_buffer_size >= item_size): + items_to_consume = bytes_to_consume//item_size + items_per_max_buffer = self.max_buffer_size//item_size + else: + # This causes a full memory copy up front because the + # item size is larger than our buffer so we're forced to + # break items up into bytes. + data = memoryview(data.tostring()) + raise TypeError # Reuse code from the non-buffer case. + except TypeError: + # Input does not support the buffer API or had a large items. + data_length = len(data) + item_size = 1 + items_to_consume = bytes_to_consume + items_per_max_buffer = self.max_buffer_size + + if data_length > bytes_to_consume: + chunk = data[:items_to_consume] + # Loop over the data, flushing it to the underlying raw IO + # stream in items_per_max_buffer chunks. + items_written = 0 + self._write_buf.extend(chunk) + while chunk and len(self._write_buf) >= self.buffer_size: + try: + self._flush_unlocked() + except BlockingIOError as e: + remaining_bytes = (len(data)-items_written)*item_size + if (self.max_buffer_size - len(self._write_buf) >= + remaining_bytes): + # Enough room for the rest? buffer it. + self._write_buf.extend(data[items_written:]) + else: + # Exclude characters that were in the buffer before + # this call from being reported as written. + written = items_written * item_size + written += e.characters_written - before + if written < 0: + written = 0 + raise BlockingIOError(e.errno, e.strerror, written) + items_written += len(chunk) + before = 0 + assert not self._write_buf, "_write_buf should be empty" + chunk = data[items_written: + items_written + items_per_max_buffer] + self._write_buf.extend(chunk) + written = items_written*item_size + len(self._write_buf) + else: + # Simple case: writing less than the remaining buffer space. + self._write_buf.extend(data) + written = len(self._write_buf) - before if len(self._write_buf) > self.buffer_size: try: self._flush_unlocked() except BlockingIOError as e: if len(self._write_buf) > self.max_buffer_size: - # We've hit max_buffer_size. We have to accept a + # We have hit max_buffer_size. We must accept a # partial write and cut back our buffer. overage = len(self._write_buf) - self.max_buffer_size self._write_buf = self._write_buf[:self.max_buffer_size] Index: Lib/test/test_io.py =================================================================== --- Lib/test/test_io.py (revision 69165) +++ Lib/test/test_io.py (working copy) @@ -65,20 +65,29 @@ def __init__(self, blocking_script): self._blocking_script = list(blocking_script) - self._write_stack = [] + self._write_stack = [] # arg to write + self._written_stack = [] # data virtually written def write(self, b): - self._write_stack.append(b[:]) + self._write_stack.append(bytes(b[:])) n = self._blocking_script.pop(0) - if (n < 0): + if (n <= 0): + self._written_stack.append(bytes(b[:-n])) raise io.BlockingIOError(0, "test blocking", -n) else: + self._written_stack.append(bytes(b[:n])) return n def writable(self): return True + def __repr__(self): + return ("MockNonBlockWriterIO:\n" + " _write_stack = %r\n" + " _written_stack = %r\n" + % (self._write_stack, self._written_stack)) + class IOTest(unittest.TestCase): def setUp(self): @@ -490,6 +499,157 @@ self.assertEquals(b"abcdefghijkl", writer._write_stack[0]) + def testWriteMaxBufferSize(self): + writer = MockRawIO() + bufio = io.BufferedWriter(writer, buffer_size=8, max_buffer_size=13) + + self.assertEquals(10, bufio.write(b"abcdefghij")) + self.assertEquals(16, bufio.write(b"klmnopqrstuvwxyz")) + self.assertEquals(30, bufio.write(b"0123456789"*3)) + + expected_writes = [b"abcdefghij", b"klmnopqrstuvw", b"xyz0123456789", + b"0123456789012"] + self.assertEquals(expected_writes, writer._write_stack) + bufio.close() + self.assertEquals(b"3456789", writer._write_stack[4]) + + def testWriteMaxBufferSizeIsBufferSize(self): + writer = MockRawIO() + bufio = io.BufferedWriter(writer, buffer_size=8, max_buffer_size=8) + + self.assertEquals(10, bufio.write(b"abcdefghij")) + self.assertEquals(16, bufio.write(b"klmnopqrstuvwxyz")) + self.assertEquals(30, bufio.write(b"0123456789"*3)) + + expected_writes = [b"abcdefgh", b"ijklmnop", b"qrstuvwx", + b"yz012345", b"67890123", b"45678901", + b"23456789"] + self.assertEquals(expected_writes, writer._write_stack) + bufio.close() + self.assertEquals(b"23456789", writer._write_stack[6]) + + def testInvalidBufferSizes(self): + writer = MockRawIO() + self.assertRaises(ValueError, io.BufferedWriter, writer, buffer_size=-1) + self.assertRaises(ValueError, io.BufferedWriter, writer, buffer_size=2, + max_buffer_size=1) + + def testWriteArrays(self): + writer = MockRawIO() + buf_size = 8 + max_buf_size = 13 + bufio = io.BufferedWriter(writer, buffer_size=buf_size, + max_buffer_size=max_buf_size) + + small_array = array.array('i', [0xff]) + small_bytes = small_array.tostring() + buffer_array = array.array('i', (1,2)) + buffer_bytes = buffer_array.tostring() + large_array = array.array('i', range(3,10)) + large_bytes = large_array.tostring() + + self.assertEquals(len(buffer_bytes), bufio.write(buffer_array)) + self.assertEquals(len(small_bytes), bufio.write(small_array)) + self.assertEquals(len(small_bytes), bufio.write(small_array)) + self.assertEquals(len(buffer_bytes), bufio.write(buffer_array)) + self.assertEquals(len(large_bytes), bufio.write(large_array)) + + first_two_writes = [buffer_bytes + small_bytes, + small_bytes + buffer_bytes] + self.assertEquals(first_two_writes, writer._write_stack[:2]) + self.assert_(buf_size <= len(writer._write_stack[2]) <= max_buf_size) + self.assert_(buf_size <= len(writer._write_stack[3]) <= max_buf_size) + bufio.close() + self.assertEquals(large_bytes, b''.join(writer._write_stack[2:])) + + def testWriteNoLengthIterableFails(self): + writer = MockRawIO() + bufio = io.BufferedWriter(writer, buffer_size=8) + + def Generator(value): + for x in value: + yield x + iterable = Generator(b'ABCDEFGHIJKLMNOPQRSTUVWXYZ') + self.assertRaises(TypeError, len, iterable) + # write() does not accept things that do not support len() + self.assertRaises(TypeError, bufio.write, iterable) + + + # The testBlockingIOErrorCharactersWritten* tests check that all + # BlockingIOError exceptions that come out of the write() method report + # characters_written values properly for the arg passed in, and not for + # any characters already in the internal buffer that were written. + + def testBlockingIOErrorCharactersWrittenPrebufTwoBuf(self): + # Data in the buffer, asked to write enough for two buffers of data. + raw = MockNonBlockWriterIO((-7,)) + bufio = io.BufferedWriter(raw, 8, max_buffer_size=16) + bufio.write(b"abcd") + try: + bufio.write(b"abcdefghijklm") + except io.BlockingIOError as e: + # It should return only the number of characters from the supplied + # data that were written. + self.assertEquals(7-4, e.characters_written) + else: + self.fail("expected a BlockingIOError. %r" % raw) + + def testBlockingIOErrorCharactersWrittenPrebufManyBuf(self): + # Data in the buffer, asked to write enough for several buffers of data. + raw = MockNonBlockWriterIO((16,16,-3)) + bufio = io.BufferedWriter(raw, 8, max_buffer_size=16) + testdata = b"ZZZZ" + (b"abcdefghijklm"*4) + bufio.write(testdata[:4]) + try: + bufio.write(testdata[4:]) + except io.BlockingIOError as e: + expected_written = testdata[:e.characters_written+4] + self.assertEquals(expected_written, b''.join(raw._written_stack)) + else: + self.fail("expected a BlockingIOError. %r" % raw) + + def testBlockingIOErrorCharactersWrittenPrebufManyFlushZero(self): + # Data in the buffer, asked to write enough for many flushes. + raw = MockNonBlockWriterIO((0,0,0)) + bufio = io.BufferedWriter(raw, 8, max_buffer_size=16) + bufio.write(b"defgh") # buffered + bufio.write(b"ijklm") # buffered reluctantly + try: + bufio.write(b"ijklm") # rejected, already full + except io.BlockingIOError as e: + self.assertEquals(0, e.characters_written) + else: + self.fail("expected a BlockingIOError. %r" % raw) + + def testBlockingIOErrorCharactersWrittenLargeFlushFail(self): + # Data in the buffer, asked to write enough for many flushes. + raw = MockNonBlockWriterIO((16,0)) + bufio = io.BufferedWriter(raw, 8, max_buffer_size=16) + try: + bufio.write(b"abcdefghijklmnopqrstuvwxyz"*2) + except io.BlockingIOError as e: + # At least 16 were written, some more may or may not have been + # buffered but no more than max_buffer_size. + self.assert_(16 <= e.characters_written <= 16+16, + "16 <= chars written %d < 32" % e.characters_written) + else: + self.fail("expected a BlockingIOError. %r" % raw) + + def testBlockingIOErrorCharactersWrittenPrebufTwoBufZero(self): + # Data in the buffer, asked to write just more than max buffer data. + raw = MockNonBlockWriterIO((-2,)) + bufio = io.BufferedWriter(raw, 8, max_buffer_size=16) + bufio.write(b"Z"*4) + try: + bufio.write(b"x"*13) + except io.BlockingIOError as e: + # We only let 2 characters be written. 4 were in the buffer. + # None of the data from this call should be reported as written. + self.assertEquals(0, e.characters_written) + else: + self.fail("expected a BlockingIOError. %r" % raw) + + def testWriteNonBlocking(self): raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12)) bufio = io.BufferedWriter(raw, 8, 16)