# HG changeset patch # Parent 7185a35fb293ef947a27da2e5b114ebcfa97e5ba Use max_length in LZMAFile decompression * Split out _DecompressReader and wrap it with BufferedReader to provide the read mode APIs of LZMAFile * The specification of the peek() method is vague * read() now accepts size=None, because BufferedReader does * BufferedReader.seek() raises a different exception for invalid “whence” * Work around different signature for BufferedReader.read1() diff -r 7185a35fb293 Lib/lzma.py --- a/Lib/lzma.py Thu Feb 26 10:39:31 2015 +0100 +++ b/Lib/lzma.py Thu Feb 26 11:02:13 2015 +0000 @@ -29,13 +29,33 @@ _MODE_CLOSED = 0 _MODE_READ = 1 -_MODE_READ_EOF = 2 -_MODE_WRITE = 3 +_MODE_WRITE = 2 -_BUFFER_SIZE = 8192 +_BUFFER_SIZE = 8192 # Compressed data read chunk size -class LZMAFile(io.BufferedIOBase): +class _BaseStream(io.IOBase): + """Mode-checking helper functions.""" + + def _check_not_closed(self): + if self.closed: + raise ValueError("I/O operation on closed file") + + def _check_can_read(self): + if not self.readable(): + raise io.UnsupportedOperation("File not open for reading") + + def _check_can_write(self): + if not self.writable(): + raise io.UnsupportedOperation("File not open for writing") + + def _check_can_seek(self): + if not self.seekable(): + raise io.UnsupportedOperation("The underlying file object " + "does not support seeking") + + +class LZMAFile(_BaseStream, io.BufferedIOBase): """A file object providing transparent LZMA (de)compression. @@ -92,8 +112,6 @@ self._fp = None self._closefp = False self._mode = _MODE_CLOSED - self._pos = 0 - self._size = -1 if mode in ("r", "rb"): if check != -1: @@ -105,19 +123,13 @@ if format is None: format = FORMAT_AUTO mode_code = _MODE_READ - # Save the args to pass to the LZMADecompressor initializer. - # If the file contains multiple compressed streams, each - # stream will need a separate decompressor object. 
- self._init_args = {"format":format, "filters":filters} - self._decompressor = LZMADecompressor(**self._init_args) - self._buffer = b"" - self._buffer_offset = 0 elif mode in ("w", "wb", "a", "ab", "x", "xb"): if format is None: format = FORMAT_XZ mode_code = _MODE_WRITE self._compressor = LZMACompressor(format=format, check=check, preset=preset, filters=filters) + self._pos = 0 else: raise ValueError("Invalid mode: {!r}".format(mode)) @@ -133,6 +145,10 @@ else: raise TypeError("filename must be a str or bytes object, or a file") + if self._mode == _MODE_READ: + raw = _DecompressReader(self._fp, format=format, filters=filters) + self._buffer = io.BufferedReader(raw) + def close(self): """Flush and close the file. @@ -142,9 +158,8 @@ if self._mode == _MODE_CLOSED: return try: - if self._mode in (_MODE_READ, _MODE_READ_EOF): - self._decompressor = None - self._buffer = b"" + if self._mode == _MODE_READ: + self._buffer.close() elif self._mode == _MODE_WRITE: self._fp.write(self._compressor.flush()) self._compressor = None @@ -169,123 +184,18 @@ def seekable(self): """Return whether the file supports seeking.""" - return self.readable() and self._fp.seekable() + return self.readable() and self._buffer.seekable() def readable(self): """Return whether the file was opened for reading.""" self._check_not_closed() - return self._mode in (_MODE_READ, _MODE_READ_EOF) + return self._mode == _MODE_READ def writable(self): """Return whether the file was opened for writing.""" self._check_not_closed() return self._mode == _MODE_WRITE - # Mode-checking helper functions. 
- - def _check_not_closed(self): - if self.closed: - raise ValueError("I/O operation on closed file") - - def _check_can_read(self): - if self._mode not in (_MODE_READ, _MODE_READ_EOF): - self._check_not_closed() - raise io.UnsupportedOperation("File not open for reading") - - def _check_can_write(self): - if self._mode != _MODE_WRITE: - self._check_not_closed() - raise io.UnsupportedOperation("File not open for writing") - - def _check_can_seek(self): - if self._mode not in (_MODE_READ, _MODE_READ_EOF): - self._check_not_closed() - raise io.UnsupportedOperation("Seeking is only supported " - "on files open for reading") - if not self._fp.seekable(): - raise io.UnsupportedOperation("The underlying file object " - "does not support seeking") - - # Fill the readahead buffer if it is empty. Returns False on EOF. - def _fill_buffer(self): - if self._mode == _MODE_READ_EOF: - return False - # Depending on the input data, our call to the decompressor may not - # return any data. In this case, try again after reading another block. - while self._buffer_offset == len(self._buffer): - rawblock = (self._decompressor.unused_data or - self._fp.read(_BUFFER_SIZE)) - - if not rawblock: - if self._decompressor.eof: - self._mode = _MODE_READ_EOF - self._size = self._pos - return False - else: - raise EOFError("Compressed file ended before the " - "end-of-stream marker was reached") - - if self._decompressor.eof: - # Continue to next stream. - self._decompressor = LZMADecompressor(**self._init_args) - try: - self._buffer = self._decompressor.decompress(rawblock) - except LZMAError: - # Trailing data isn't a valid compressed stream; ignore it. - self._mode = _MODE_READ_EOF - self._size = self._pos - return False - else: - self._buffer = self._decompressor.decompress(rawblock) - self._buffer_offset = 0 - return True - - # Read data until EOF. - # If return_data is false, consume the data without returning it. 
- def _read_all(self, return_data=True): - # The loop assumes that _buffer_offset is 0. Ensure that this is true. - self._buffer = self._buffer[self._buffer_offset:] - self._buffer_offset = 0 - - blocks = [] - while self._fill_buffer(): - if return_data: - blocks.append(self._buffer) - self._pos += len(self._buffer) - self._buffer = b"" - if return_data: - return b"".join(blocks) - - # Read a block of up to n bytes. - # If return_data is false, consume the data without returning it. - def _read_block(self, n, return_data=True): - # If we have enough data buffered, return immediately. - end = self._buffer_offset + n - if end <= len(self._buffer): - data = self._buffer[self._buffer_offset : end] - self._buffer_offset = end - self._pos += len(data) - return data if return_data else None - - # The loop assumes that _buffer_offset is 0. Ensure that this is true. - self._buffer = self._buffer[self._buffer_offset:] - self._buffer_offset = 0 - - blocks = [] - while n > 0 and self._fill_buffer(): - if n < len(self._buffer): - data = self._buffer[:n] - self._buffer_offset = n - else: - data = self._buffer - self._buffer = b"" - if return_data: - blocks.append(data) - self._pos += len(data) - n -= len(data) - if return_data: - return b"".join(blocks) - def peek(self, size=-1): """Return buffered data without advancing the file position. @@ -293,9 +203,7 @@ The exact number of bytes returned is unspecified. """ self._check_can_read() - if not self._fill_buffer(): - return b"" - return self._buffer[self._buffer_offset:] + return self._buffer.peek(size) def read(self, size=-1): """Read up to size uncompressed bytes from the file. @@ -304,38 +212,19 @@ Returns b"" if the file is already at EOF. 
""" self._check_can_read() - if size == 0: - return b"" - elif size < 0: - return self._read_all() - else: - return self._read_block(size) + return self._buffer.read(size) def read1(self, size=-1): """Read up to size uncompressed bytes, while trying to avoid making multiple reads from the underlying stream. + Reads a buffer's worth of data if size is negative. Returns b"" if the file is at EOF. """ - # Usually, read1() calls _fp.read() at most once. However, sometimes - # this does not give enough data for the decompressor to make progress. - # In this case we make multiple reads, to avoid returning b"". self._check_can_read() - if (size == 0 or - # Only call _fill_buffer() if the buffer is actually empty. - # This gives a significant speedup if *size* is small. - (self._buffer_offset == len(self._buffer) and not self._fill_buffer())): - return b"" - if size > 0: - data = self._buffer[self._buffer_offset : - self._buffer_offset + size] - self._buffer_offset += len(data) - else: - data = self._buffer[self._buffer_offset:] - self._buffer = b"" - self._buffer_offset = 0 - self._pos += len(data) - return data + if size < 0: + size = io.DEFAULT_BUFFER_SIZE + return self._buffer.read1(size) def readline(self, size=-1): """Read a line of uncompressed bytes from the file. @@ -345,15 +234,7 @@ case the line may be incomplete). Returns b'' if already at EOF. """ self._check_can_read() - # Shortcut for the common case - the whole line is in the buffer. - if size < 0: - end = self._buffer.find(b"\n", self._buffer_offset) + 1 - if end > 0: - line = self._buffer[self._buffer_offset : end] - self._buffer_offset = end - self._pos += len(line) - return line - return io.BufferedIOBase.readline(self, size) + return self._buffer.readline(size) def write(self, data): """Write a bytes object to the file. @@ -368,15 +249,6 @@ self._pos += len(data) return len(data) - # Rewind the file to the beginning of the data stream. 
- def _rewind(self): - self._fp.seek(0, 0) - self._mode = _MODE_READ - self._pos = 0 - self._decompressor = LZMADecompressor(**self._init_args) - self._buffer = b"" - self._buffer_offset = 0 - def seek(self, offset, whence=0): """Change the file position. @@ -389,20 +261,110 @@ Returns the new file position. - Note that seeking is emulated, sp depending on the parameters, + Note that seeking is emulated, so depending on the parameters, this operation may be extremely slow. """ + if self._mode != _MODE_READ: + self._check_not_closed() + raise io.UnsupportedOperation("Seeking is only supported " + "on files open for reading") + return self._buffer.seek(offset, whence) + + def tell(self): + """Return the current file position.""" + if self._mode == _MODE_READ: + return self._buffer.tell() + self._check_not_closed() + return self._pos + + +class _DecompressReader(_BaseStream, io.RawIOBase): + def readable(self): + return True + + def __init__(self, fp, **init_args): + self._fp = fp + self._eof = False + self._pos = 0 + self._size = -1 + # Save the args to pass to the LZMADecompressor initializer. + # If the file contains multiple compressed streams, each + # stream will need a separate decompressor object. + self._init_args = init_args + self._decompressor = LZMADecompressor(**self._init_args) + + def close(self): + self._decompressor = None + return super().close() + + def seekable(self): + return self._fp.seekable() + + def readinto(self, b): + data = self.read(len(b)) + with memoryview(b) as view: + view.cast("B")[:len(data)] = data + return len(data) + + def read(self, size=-1): + if size < 0: + return self.readall() + + if not size or self._eof: + return b"" + # Depending on the input data, our call to the decompressor may not + # return any data. In this case, try again after reading another block. 
+ while True: + if self._decompressor.eof: + rawblock = (self._decompressor.unused_data or + self._fp.read(_BUFFER_SIZE)) + if not rawblock: + self._eof = True + self._size = self._pos + return b"" + # Continue to next stream. + self._decompressor = LZMADecompressor(**self._init_args) + try: + data = self._decompressor.decompress(rawblock, size) + except LZMAError: + # Trailing data isn't a valid compressed stream; ignore it. + self._eof = True + self._size = self._pos + return b"" + else: + if self._decompressor.needs_input: + rawblock = self._fp.read(_BUFFER_SIZE) + if not rawblock: + raise EOFError("Compressed file ended before the " + "end-of-stream marker was reached") + else: + rawblock = bytes() + data = self._decompressor.decompress(rawblock, size) + if data: + break + self._pos += len(data) + return data + + # Rewind the file to the beginning of the data stream. + def _rewind(self): + self._fp.seek(0) + self._eof = False + self._pos = 0 + self._decompressor = LZMADecompressor(**self._init_args) + + def seek(self, offset, whence=io.SEEK_SET): self._check_can_seek() # Recalculate offset as an absolute file position. - if whence == 0: + if whence == io.SEEK_SET: pass - elif whence == 1: + elif whence == io.SEEK_CUR: offset = self._pos + offset - elif whence == 2: + elif whence == io.SEEK_END: # Seeking relative to EOF - we need to know the file's size. if self._size < 0: - self._read_all(return_data=False) + while self.read(io.DEFAULT_BUFFER_SIZE): + pass offset = self._size + offset else: raise ValueError("Invalid value for whence: {}".format(whence)) @@ -414,12 +376,15 @@ offset -= self._pos # Read and discard data until we reach the desired position. 
- self._read_block(offset, return_data=False) + while offset > 0: + data = self.read(min(io.DEFAULT_BUFFER_SIZE, offset)) + if not data: + break + offset -= len(data) return self._pos def tell(self): - """Return the current file position.""" self._check_not_closed() return self._pos diff -r 7185a35fb293 Lib/test/test_lzma.py --- a/Lib/test/test_lzma.py Thu Feb 26 10:39:31 2015 +0100 +++ b/Lib/test/test_lzma.py Thu Feb 26 11:02:13 2015 +0000 @@ -829,7 +829,7 @@ with LZMAFile(BytesIO(), "w") as f: self.assertRaises(ValueError, f.read) with LZMAFile(BytesIO(COMPRESSED_XZ)) as f: - self.assertRaises(TypeError, f.read, None) + self.assertRaises(TypeError, f.read, float()) def test_read_bad_data(self): with LZMAFile(BytesIO(COMPRESSED_BOGUS)) as f: @@ -925,6 +925,17 @@ with LZMAFile(BytesIO(COMPRESSED_XZ)) as f: self.assertListEqual(f.readlines(), lines) + def test_decompress_limited(self): + """Read a small portion of a highly compressed stream""" + + bomb = lzma.compress(bytes(int(2e6)), preset=6) + self.assertLess(len(bomb), lzma._BUFFER_SIZE) + + decomp = LZMAFile(BytesIO(bomb)) + self.assertEqual(bytes(1), decomp.read(1)) + self.assertLess(decomp._buffer.raw.tell(), 1e6, + "Excessive amount of data was decompressed") + def test_write(self): with BytesIO() as dst: with LZMAFile(dst, "w") as f: @@ -1090,7 +1101,7 @@ self.assertRaises(ValueError, f.seek, 0) with LZMAFile(BytesIO(COMPRESSED_XZ)) as f: self.assertRaises(ValueError, f.seek, 0, 3) - self.assertRaises(ValueError, f.seek, 9, ()) + self.assertRaises(Exception, f.seek, 9, ()) self.assertRaises(TypeError, f.seek, None) self.assertRaises(TypeError, f.seek, b"derp")