diff --git a/Doc/library/bz2.rst b/Doc/library/bz2.rst --- a/Doc/library/bz2.rst +++ b/Doc/library/bz2.rst @@ -162,15 +162,32 @@ you need to decompress a multi-stream input with :class:`BZ2Decompressor`, you must use a new decompressor for each stream. - .. method:: decompress(data) + .. method:: decompress(data, max_length=-1) - Provide data to the decompressor object. Returns a chunk of decompressed - data if possible, or an empty byte string otherwise. + Decompress *data* (a :term:`bytes-like object`), returning + uncompressed data as bytes. Some of *data* may be buffered + internally, for use in later calls to :meth:`decompress`. The + returned data should be concatenated with the output of any + previous calls to :meth:`decompress`. - Attempting to decompress data after the end of the current stream is - reached raises an :exc:`EOFError`. If any data is found after the end of - the stream, it is ignored and saved in the :attr:`unused_data` attribute. + If *max_length* is nonnegative, returns at most *max_length* + bytes of decompressed data. If this limit is reached and further + output can be produced, the :attr:`~.needs_input` attribute will + be set to ``False``. In this case, the next call to + :meth:`~.decompress` may provide *data* as ``b''`` to obtain + more of the output. + If all of the input data was decompressed and returned (either + because this was less than *max_length* bytes, or because + *max_length* was negative), the :attr:`~.needs_input` attribute + will be set to ``True``. + + Attempting to decompress data after the end of stream is reached + raises an `EOFError`. Any data found after the end of the + stream is ignored and saved in the :attr:`~.unused_data` attribute. + + .. versionchanged:: 3.5 + Added the *max_length* parameter. .. attribute:: eof @@ -186,6 +203,13 @@ If this attribute is accessed before the end of the stream has been reached, its value will be ``b''``. + .. attribute:: needs_input + + ``False`` if the :meth:`.decompress` method can provide more + decompressed data before requiring new uncompressed input. + + .. versionadded:: 3.5 + One-shot (de)compression ------------------------ diff --git a/Lib/test/test_bz2.py b/Lib/test/test_bz2.py --- a/Lib/test/test_bz2.py +++ b/Lib/test/test_bz2.py @@ -5,6 +5,7 @@ from io import BytesIO import os import pickle +import glob import random import subprocess import sys @@ -51,6 +52,19 @@ EMPTY_DATA = b'BZh9\x17rE8P\x90\x00\x00\x00\x00' BAD_DATA = b'this is not a valid bzip2 file' + # Some tests need more than one block of uncompressed data. Since one block + # is at least 100 kB, we gather some data dynamically and compress it. + # Note that this assumes that compression works correctly, so we cannot + # simply use the bigger test data for all tests. + test_size = 0 + BIG_TEXT = bytearray(128*1024) + for fname in glob.glob(os.path.join(os.path.dirname(__file__), '*.py')): + with open(fname, 'rb') as fh: + test_size += fh.readinto(memoryview(BIG_TEXT)[test_size:]) + if test_size > 128*1024: + break + BIG_DATA = bz2.compress(BIG_TEXT, compresslevel=1) + def setUp(self): self.filename = support.TESTFN @@ -707,6 +721,95 @@ with self.assertRaises(TypeError): pickle.dumps(BZ2Decompressor(), proto) + def testDecompressorChunksMaxsize(self): + bzd = BZ2Decompressor() + max_length = 100 + out = [] + + # Feed some input + len_ = len(self.BIG_DATA) - 64 + out.append(bzd.decompress(self.BIG_DATA[:len_], + max_length=max_length)) + self.assertFalse(bzd.needs_input) + self.assertEqual(len(out[-1]), max_length) + + # Retrieve more data without providing more input + out.append(bzd.decompress(b'', max_length=max_length)) + self.assertFalse(bzd.needs_input) + self.assertEqual(len(out[-1]), max_length) + + # Retrieve more data while providing more input + out.append(bzd.decompress(self.BIG_DATA[len_:], + max_length=max_length)) + self.assertLessEqual(len(out[-1]), max_length) + + # Retrieve remaining uncompressed data + while not bzd.eof: + out.append(bzd.decompress(b'', max_length=max_length)) + self.assertLessEqual(len(out[-1]), max_length) + + out = b"".join(out) + self.assertEqual(out, self.BIG_TEXT) + self.assertEqual(bzd.unused_data, b"") + + def test_decompressor_inputbuf_1(self): + # Test reusing input buffer after moving existing + # contents to beginning + bzd = BZ2Decompressor() + out = [] + + # Create input buffer and fill it + self.assertEqual(bzd.decompress(self.DATA[:100], + max_length=0), b'') + + # Retrieve some results, freeing capacity at beginning + # of input buffer + out.append(bzd.decompress(b'', 2)) + + # Add more data that fits into input buffer after + # moving existing data to beginning + out.append(bzd.decompress(self.DATA[100:105], 15)) + + # Decompress rest of data + out.append(bzd.decompress(self.DATA[105:])) + self.assertEqual(b''.join(out), self.TEXT) + + def test_decompressor_inputbuf_2(self): + # Test reusing input buffer by appending data at the + # end right away + bzd = BZ2Decompressor() + out = [] + + # Create input buffer and empty it + self.assertEqual(bzd.decompress(self.DATA[:200], + max_length=0), b'') + out.append(bzd.decompress(b'')) + + # Fill buffer with new data + out.append(bzd.decompress(self.DATA[200:280], 2)) + + # Append some more data, not enough to require resize + out.append(bzd.decompress(self.DATA[280:300], 2)) + + # Decompress rest of data + out.append(bzd.decompress(self.DATA[300:])) + self.assertEqual(b''.join(out), self.TEXT) + + def test_decompressor_inputbuf_3(self): + # Test reusing input buffer after extending it + + bzd = BZ2Decompressor() + out = [] + + # Create almost full input buffer + out.append(bzd.decompress(self.DATA[:200], 5)) + + # Add even more data to it, requiring resize + out.append(bzd.decompress(self.DATA[200:300], 5)) + + # Decompress rest of data + out.append(bzd.decompress(self.DATA[300:])) + self.assertEqual(b''.join(out), self.TEXT) class CompressDecompressTest(BaseTest): def testCompress(self): diff --git a/Modules/_bz2module.c b/Modules/_bz2module.c --- a/Modules/_bz2module.c +++ b/Modules/_bz2module.c @@ -51,6 +51,14 @@ bz_stream bzs; char eof; /* T_BOOL expects a char */ PyObject *unused_data; + char needs_input; + char *input_buffer; + size_t input_buffer_size; + + /* bzs->avail_in is only 32 bit, so we store the true length + separately. Conversion and looping is encapsulated in + decompress_buf() */ + size_t bzs_avail_in_real; #ifdef WITH_THREAD PyThread_type_lock lock; #endif @@ -111,19 +119,23 @@ } #if BUFSIZ < 8192 -#define SMALLCHUNK 8192 +#define INITIAL_BUFFER_SIZE 8192 #else -#define SMALLCHUNK BUFSIZ +#define INITIAL_BUFFER_SIZE BUFSIZ #endif static int -grow_buffer(PyObject **buf) +grow_buffer(PyObject **buf, Py_ssize_t max_length) { /* Expand the buffer by an amount proportional to the current size, giving us amortized linear-time behavior. Use a less-than-double growth factor to avoid excessive allocation. */ size_t size = PyBytes_GET_SIZE(*buf); size_t new_size = size + (size >> 3) + 6; + + if (max_length > 0 && new_size > (size_t) max_length) + new_size = (size_t) max_length; + if (new_size > size) { return _PyBytes_Resize(buf, new_size); } else { /* overflow */ @@ -142,14 +154,14 @@ size_t data_size = 0; PyObject *result; - result = PyBytes_FromStringAndSize(NULL, SMALLCHUNK); + result = PyBytes_FromStringAndSize(NULL, INITIAL_BUFFER_SIZE); if (result == NULL) return NULL; c->bzs.next_in = data; c->bzs.avail_in = 0; c->bzs.next_out = PyBytes_AS_STRING(result); - c->bzs.avail_out = SMALLCHUNK; + c->bzs.avail_out = INITIAL_BUFFER_SIZE; for (;;) { char *this_out; int bzerror; @@ -168,7 +180,7 @@ if (c->bzs.avail_out == 0) { size_t buffer_left = PyBytes_GET_SIZE(result) - data_size; if (buffer_left == 0) { - if (grow_buffer(&result) < 0) + if (grow_buffer(&result, -1) < 0) goto error; c->bzs.next_out = PyBytes_AS_STRING(result) + data_size; buffer_left = PyBytes_GET_SIZE(result) - data_size; @@ -402,64 +414,179 @@ /* BZ2Decompressor class. */ -static PyObject * -decompress(BZ2Decompressor *d, char *data, size_t len) +/* Decompress data of length d->bzs_avail_in_real in d->bzs.next_in. The output + buffer is allocated dynamically and returned. At most max_length bytes are + returned, so some of the input may not be consumed. d->bzs.next_in and + d->bzs_avail_in_real are updated to reflect the consumed input. */ +static PyObject* +decompress_buf(BZ2Decompressor *d, Py_ssize_t max_length) { - size_t data_size = 0; + /* data_size is strictly positive, but because we repeatedly have to + compare against max_length and PyBytes_GET_SIZE we declare it as + signed */ + Py_ssize_t data_size = 0; PyObject *result; + bz_stream *bzs = &d->bzs; - result = PyBytes_FromStringAndSize(NULL, SMALLCHUNK); + if (max_length < 0 || max_length >= INITIAL_BUFFER_SIZE) + result = PyBytes_FromStringAndSize(NULL, INITIAL_BUFFER_SIZE); + else + result = PyBytes_FromStringAndSize(NULL, max_length); if (result == NULL) - return result; - d->bzs.next_in = data; - /* On a 64-bit system, len might not fit in avail_in (an unsigned int). - Do decompression in chunks of no more than UINT_MAX bytes each. */ - d->bzs.avail_in = (unsigned int)Py_MIN(len, UINT_MAX); - len -= d->bzs.avail_in; - d->bzs.next_out = PyBytes_AS_STRING(result); - d->bzs.avail_out = SMALLCHUNK; + return NULL; + + bzs->next_out = PyBytes_AS_STRING(result); for (;;) { - char *this_out; - int bzerror; + int bzret; + size_t avail; + + /* On a 64-bit system, buffer length might not fit in avail_out. Do + decompression in chunks of no more than UINT_MAX bytes each. The + first operand of Py_MIN is guaranteed to be positive, so the cast + is safe and the comparison well-defined */ + avail = (size_t) (PyBytes_GET_SIZE(result) - data_size); + bzs->avail_out = (unsigned int)Py_MIN(avail, UINT_MAX); + bzs->avail_in = (unsigned int)Py_MIN(d->bzs_avail_in_real, UINT_MAX); + d->bzs_avail_in_real -= bzs->avail_in; + + bzs->avail_in = (unsigned int)Py_MIN(d->bzs_avail_in_real, UINT_MAX); + d->bzs_avail_in_real -= bzs->avail_in; Py_BEGIN_ALLOW_THREADS - this_out = d->bzs.next_out; - bzerror = BZ2_bzDecompress(&d->bzs); - data_size += d->bzs.next_out - this_out; + bzret = BZ2_bzDecompress(bzs); + data_size = bzs->next_out - PyBytes_AS_STRING(result); + d->bzs_avail_in_real += bzs->avail_in; Py_END_ALLOW_THREADS - if (catch_bz2_error(bzerror)) + if (catch_bz2_error(bzret)) goto error; - if (bzerror == BZ_STREAM_END) { + if (bzret == BZ_STREAM_END) { d->eof = 1; - len += d->bzs.avail_in; - if (len > 0) { /* Save leftover input to unused_data */ - Py_CLEAR(d->unused_data); - d->unused_data = PyBytes_FromStringAndSize(d->bzs.next_in, len); - if (d->unused_data == NULL) - goto error; - } break; - } - if (d->bzs.avail_in == 0) { - if (len == 0) + } else if (d->bzs_avail_in_real == 0) { + break; + } else if (bzs->avail_out == 0) { + if (data_size == max_length) break; - d->bzs.avail_in = (unsigned int)Py_MIN(len, UINT_MAX); - len -= d->bzs.avail_in; - } - if (d->bzs.avail_out == 0) { - size_t buffer_left = PyBytes_GET_SIZE(result) - data_size; - if (buffer_left == 0) { - if (grow_buffer(&result) < 0) - goto error; - d->bzs.next_out = PyBytes_AS_STRING(result) + data_size; - buffer_left = PyBytes_GET_SIZE(result) - data_size; - } - d->bzs.avail_out = (unsigned int)Py_MIN(buffer_left, UINT_MAX); + if (data_size == PyBytes_GET_SIZE(result) && + grow_buffer(&result, max_length) == -1) + goto error; + bzs->next_out = PyBytes_AS_STRING(result) + data_size; } } - if (data_size != (size_t)PyBytes_GET_SIZE(result)) - if (_PyBytes_Resize(&result, data_size) < 0) + if (data_size != PyBytes_GET_SIZE(result)) + if (_PyBytes_Resize(&result, data_size) == -1) goto error; + + return result; + +error: + Py_XDECREF(result); + return NULL; +} + + +static PyObject * +decompress(BZ2Decompressor *d, char *data, size_t len, Py_ssize_t max_length) +{ + char input_buffer_in_use; + PyObject *result; + bz_stream *bzs = &d->bzs; + + /* Prepend unconsumed input if necessary */ + if (bzs->next_in != NULL) { + size_t avail_now, avail_total; + + /* Number of bytes we can append to input buffer */ + avail_now = (d->input_buffer + d->input_buffer_size) + - (bzs->next_in + d->bzs_avail_in_real); + + /* Number of bytes we can append if we move existing + contents to beginning of buffer (overwriting + consumed input) */ + avail_total = d->input_buffer_size - d->bzs_avail_in_real; + + if (avail_total < len) { + size_t offset = bzs->next_in - d->input_buffer; + char *tmp; + size_t new_size = d->input_buffer_size + len - avail_now; + + /* Assign to temporary variable first, so we don't + lose address of allocated buffer if realloc fails */ + tmp = PyMem_Realloc(d->input_buffer, new_size); + if (tmp == NULL) { + PyErr_SetNone(PyExc_MemoryError); + return NULL; + } + d->input_buffer = tmp; + d->input_buffer_size = new_size; + + bzs->next_in = d->input_buffer + offset; + } + else if (avail_now < len) { + memmove(d->input_buffer, bzs->next_in, + d->bzs_avail_in_real); + bzs->next_in = d->input_buffer; + } + memcpy((void*)(bzs->next_in + d->bzs_avail_in_real), data, len); + d->bzs_avail_in_real += len; + input_buffer_in_use = 1; + } + else { + bzs->next_in = data; + d->bzs_avail_in_real = len; + input_buffer_in_use = 0; + } + + result = decompress_buf(d, max_length); + if(result == NULL) + return NULL; + + if (d->eof) { + d->needs_input = 0; + if (d->bzs_avail_in_real > 0) { + Py_CLEAR(d->unused_data); + d->unused_data = PyBytes_FromStringAndSize( + bzs->next_in, d->bzs_avail_in_real); + if (d->unused_data == NULL) + goto error; + } + } + else if (d->bzs_avail_in_real == 0) { + bzs->next_in = NULL; + d->needs_input = 1; + } + else { + d->needs_input = 0; + + /* If we did not use the input buffer, we now have + to copy the tail from the caller's buffer into the + input buffer */ + if (!input_buffer_in_use) { + + /* Discard buffer if it's too small + (resizing it may needlessly copy the current contents) */ + if (d->input_buffer != NULL && + d->input_buffer_size < d->bzs_avail_in_real) { + PyMem_Free(d->input_buffer); + d->input_buffer = NULL; + } + + /* Allocate if necessary */ + if (d->input_buffer == NULL) { + d->input_buffer = PyMem_Malloc(d->bzs_avail_in_real); + if (d->input_buffer == NULL) { + PyErr_SetNone(PyExc_MemoryError); + goto error; + } + d->input_buffer_size = d->bzs_avail_in_real; + } + + /* Copy tail */ + memcpy(d->input_buffer, bzs->next_in, d->bzs_avail_in_real); + bzs->next_in = d->input_buffer; + } + } + return result; error: @@ -470,21 +597,29 @@ /*[clinic input] _bz2.BZ2Decompressor.decompress + self: self(type="BZ2Decompressor *") data: Py_buffer - / + max_length: Py_ssize_t=-1 -Provide data to the decompressor object. +Decompress *data*, returning uncompressed data as bytes. -Returns a chunk of decompressed data if possible, or b'' otherwise. +If *max_length* is nonnegative, returns at most *max_length* bytes of +decompressed data. If this limit is reached and further output can be +produced, *self.needs_input* will be set to ``False``. In this case, the next +call to *decompress()* may provide *data* as b'' to obtain more of the output. -Attempting to decompress data after the end of stream is reached -raises an EOFError. Any data found after the end of the stream -is ignored and saved in the unused_data attribute. +If all of the input data was decompressed and returned (either because this +was less than *max_length* bytes, or because *max_length* was negative), +*self.needs_input* will be set to True. + +Attempting to decompress data after the end of stream is reached raises an +EOFError. Any data found after the end of the stream is ignored and saved in +the unused_data attribute. [clinic start generated code]*/ static PyObject * -_bz2_BZ2Decompressor_decompress_impl(BZ2Decompressor *self, Py_buffer *data) -/*[clinic end generated code: output=086e4b99e60cb3f6 input=616c2a6db5269961]*/ +_bz2_BZ2Decompressor_decompress_impl(BZ2Decompressor *self, Py_buffer *data, Py_ssize_t max_length) +/*[clinic end generated code: output=7eeb5794035a2ca3 input=9558b424c8b00516]*/ { PyObject *result = NULL; @@ -492,7 +627,7 @@ if (self->eof) PyErr_SetString(PyExc_EOFError, "End of stream already reached"); else - result = decompress(self, data->buf, data->len); + result = decompress(self, data->buf, data->len, max_length); RELEASE_LOCK(self); return result; } @@ -527,10 +662,14 @@ } #endif - self->unused_data = PyBytes_FromStringAndSize("", 0); + self->needs_input = 1; + self->bzs_avail_in_real = 0; + self->input_buffer = NULL; + self->input_buffer_size = 0; + self->unused_data = PyBytes_FromStringAndSize(NULL, 0); if (self->unused_data == NULL) goto error; - + bzerror = BZ2_bzDecompressInit(&self->bzs, 0, 0); if (catch_bz2_error(bzerror)) goto error; @@ -549,6 +688,8 @@ static void BZ2Decompressor_dealloc(BZ2Decompressor *self) { + if(self->input_buffer != NULL) + PyMem_Free(self->input_buffer); BZ2_bzDecompressEnd(&self->bzs); Py_CLEAR(self->unused_data); #ifdef WITH_THREAD @@ -570,11 +711,16 @@ PyDoc_STRVAR(BZ2Decompressor_unused_data__doc__, "Data found after the end of the compressed stream."); +PyDoc_STRVAR(BZ2Decompressor_needs_input_doc, +"True if more input is needed before more decompressed data can be produced."); + static PyMemberDef BZ2Decompressor_members[] = { {"eof", T_BOOL, offsetof(BZ2Decompressor, eof), READONLY, BZ2Decompressor_eof__doc__}, {"unused_data", T_OBJECT_EX, offsetof(BZ2Decompressor, unused_data), READONLY, BZ2Decompressor_unused_data__doc__}, + {"needs_input", T_BOOL, offsetof(BZ2Decompressor, needs_input), READONLY, + BZ2Decompressor_needs_input_doc}, {NULL} }; diff --git a/Modules/clinic/_bz2module.c.h b/Modules/clinic/_bz2module.c.h --- a/Modules/clinic/_bz2module.c.h +++ b/Modules/clinic/_bz2module.c.h @@ -95,34 +95,43 @@ } PyDoc_STRVAR(_bz2_BZ2Decompressor_decompress__doc__, -"decompress($self, data, /)\n" +"decompress($self, /, data, max_length=-1)\n" "--\n" "\n" -"Provide data to the decompressor object.\n" +"Decompress *data*, returning uncompressed data as bytes.\n" "\n" -"Returns a chunk of decompressed data if possible, or b\'\' otherwise.\n" +"If *max_length* is nonnegative, returns at most *max_length* bytes of\n" +"decompressed data. If this limit is reached and further output can be\n" +"produced, *self.needs_input* will be set to ``False``. In this case, the next\n" +"call to *decompress()* may provide *data* as b\'\' to obtain more of the output.\n" "\n" -"Attempting to decompress data after the end of stream is reached\n" -"raises an EOFError. Any data found after the end of the stream\n" -"is ignored and saved in the unused_data attribute."); +"If all of the input data was decompressed and returned (either because this\n" +"was less than *max_length* bytes, or because *max_length* was negative),\n" +"*self.needs_input* will be set to True.\n" +"\n" +"Attempting to decompress data after the end of stream is reached raises an\n" +"EOFError. Any data found after the end of the stream is ignored and saved in\n" +"the unused_data attribute."); #define _BZ2_BZ2DECOMPRESSOR_DECOMPRESS_METHODDEF \ - {"decompress", (PyCFunction)_bz2_BZ2Decompressor_decompress, METH_VARARGS, _bz2_BZ2Decompressor_decompress__doc__}, + {"decompress", (PyCFunction)_bz2_BZ2Decompressor_decompress, METH_VARARGS|METH_KEYWORDS, _bz2_BZ2Decompressor_decompress__doc__}, static PyObject * -_bz2_BZ2Decompressor_decompress_impl(BZ2Decompressor *self, Py_buffer *data); +_bz2_BZ2Decompressor_decompress_impl(BZ2Decompressor *self, Py_buffer *data, Py_ssize_t max_length); static PyObject * -_bz2_BZ2Decompressor_decompress(BZ2Decompressor *self, PyObject *args) +_bz2_BZ2Decompressor_decompress(BZ2Decompressor *self, PyObject *args, PyObject *kwargs) { PyObject *return_value = NULL; + static char *_keywords[] = {"data", "max_length", NULL}; Py_buffer data = {NULL, NULL}; + Py_ssize_t max_length = -1; - if (!PyArg_ParseTuple(args, - "y*:decompress", - &data)) + if (!PyArg_ParseTupleAndKeywords(args, kwargs, + "y*|n:decompress", _keywords, + &data, &max_length)) goto exit; - return_value = _bz2_BZ2Decompressor_decompress_impl(self, &data); + return_value = _bz2_BZ2Decompressor_decompress_impl(self, &data, max_length); exit: /* Cleanup for data */ @@ -159,4 +168,4 @@ exit: return return_value; } -/*[clinic end generated code: output=21ca4405519a0931 input=a9049054013a1b77]*/ +/*[clinic end generated code: output=8e65e3953430bc3d input=a9049054013a1b77]*/