diff --git a/Lib/pickle.py b/Lib/pickle.py --- a/Lib/pickle.py +++ b/Lib/pickle.py @@ -174,12 +174,16 @@ ADDITEMS = b'\x90' # modify set EMPTY_FROZENSET = b'\x91' # push empty frozenset on the stack FROZENSET = b'\x92' # build frozenset from topmost stack items +PREFETCH = b'\xff' + __all__.extend([x for x in dir() if re.match("[A-Z][A-Z0-9_]+$", x)]) # Pickling machinery class _Pickler: + _PREFETCH_SIZE_TARGET = 64 * 1024 + def __init__(self, file, protocol=None, *, fix_imports=True): """This takes a binary file for writing a pickle data stream. @@ -209,7 +213,7 @@ class _Pickler: elif not 0 <= protocol <= HIGHEST_PROTOCOL: raise ValueError("pickle protocol must be <= %d" % HIGHEST_PROTOCOL) try: - self.write = file.write + self._file_write = file.write except AttributeError: raise TypeError("file must have a 'write' attribute") self.memo = {} @@ -229,6 +233,32 @@ class _Pickler: """ self.memo.clear() + def _start_prefetch_block(self): + if self.proto >= 4: + self._prefetch_block = io.BytesIO() + + def _commit_prefetch_block(self): + f = self._prefetch_block + if f is None: + return + with f.getbuffer() as data: + n = len(data) + if n > 0: + write = self._file_write + write(PREFETCH + pack(" self._PREFETCH_SIZE_TARGET: + self._commit_prefetch_block() + return f.write(data) + def dump(self, obj): """Write a pickled representation of obj to the open file.""" # Check whether Pickler was initialized correctly. This is @@ -236,10 +266,13 @@ class _Pickler: if not hasattr(self, "write"): raise PicklingError("Pickler.__init__() was not called by " "%s.__init__()" % (self.__class__.__name__,)) + self._prefetch_block = None if self.proto >= 2: self.write(PROTO + pack(" p: + raise UnpicklingError( + "PREFETCH end doesn't fall on an opcode boundary") + return self._file_read(n) + + def readline(self): + line = self._file_readline() + self._current_idx += len(line) + return line + # Return largest index k such that self.stack[k] is self.mark. # If the stack doesn't contain a mark, eventually raises IndexError. # This could be sped by maintaining another stack, of indices at which @@ -900,6 +953,18 @@ class _Unpickler: self.proto = proto dispatch[PROTO[0]] = load_proto + def load_prefetch(self): + # Actually prefetching would not improve performance for a pure + # Python implementation, but we check that the opcodes are sane. + if self._prefetch_end_idx is not None: + raise UnpicklingError("overlapping PREFETCH blocks") + n, = unpack('= 2: - expected = pickle.PROTO + bytes([proto]) + expected + build_proto = pickle.PROTO + bytes([proto]) + if proto >= 4: + expected = (build_proto + + pickle.PREFETCH + bytes([2, 0, 0, 0]) + + expected) + elif proto >= 2: + expected = build_proto + expected p = self.dumps(None, proto) self.assertEqual(p, expected) @@ -1208,7 +1213,10 @@ class AbstractPickleTests(unittest.TestC sizes = [len(self.dumps(2**n, proto)) for n in range(70)] # the size function is monotonic self.assertEqual(sorted(sizes), sizes) - if proto >= 2: + if proto >= 4: + # there is an additional PREFETCH opcode + self.assertLessEqual(sizes[-1], 19) + elif proto >= 2: self.assertLessEqual(sizes[-1], 14) def check_negative_32b_binXXX(self, dumped): diff --git a/Modules/_pickle.c b/Modules/_pickle.c --- a/Modules/_pickle.c +++ b/Modules/_pickle.c @@ -80,7 +80,8 @@ enum opcode { EMPTY_SET = '\x8f', ADDITEMS = '\x90', EMPTY_FROZENSET = '\x91', - FROZENSET = '\x92' + FROZENSET = '\x92', + PREFETCH = '\xFF' }; /* These aren't opcodes -- they're ways to pickle bools before protocol 2 @@ -111,8 +112,11 @@ enum { stream. This is ignored for in-memory pickling. */ MAX_WRITE_BUF_SIZE = 64 * 1024, - /* Prefetch size when unpickling (disabled on unpeekable streams) */ - PREFETCH = 8192 * 16 + /* Prefetch size when unpickling for protos < 4 + (disabled on unpeekable streams) */ + IMPLICIT_PREFETCH_SIZE = 8192 * 16, + + PREFETCH_SIZE_TARGET = 64 * 1024 }; /* Exception classes for pickle. These should override the ones defined in @@ -338,6 +342,8 @@ typedef struct PicklerObject { Py_ssize_t max_output_len; /* Allocation size of output_buffer. */ int proto; /* Pickle protocol number, >= 0 */ int bin; /* Boolean, true if proto > 0 */ + int issue_prefetches; + Py_ssize_t prefetch_block_start; Py_ssize_t buf_size; /* Size of the current buffered pickle data */ int fast; /* Enable fast mode if set to a true value. The fast mode disable the usage of memo, @@ -369,6 +375,7 @@ typedef struct UnpicklerObject { Py_ssize_t input_len; Py_ssize_t next_read_idx; Py_ssize_t prefetched_idx; /* index of first prefetched byte */ + Py_ssize_t prefetch_end_idx; PyObject *read; /* read() method of the input stream. */ PyObject *readline; /* readline() method of the input stream. */ PyObject *peek; /* peek() method of the input stream, or NULL */ @@ -678,14 +685,53 @@ static int if (self->output_buffer == NULL) return -1; self->output_len = 0; + self->prefetch_block_start = -1; return 0; } +static int +_Pickler_CommitPrefetchBlock(PicklerObject *self) +{ + size_t prefetch_len; + char *parg; + + if (self->prefetch_block_start == -1) + return 0; + prefetch_len = self->output_len - self->prefetch_block_start; +// fprintf(stderr, "c frame len = %zd\n", frame_len); + parg = PyBytes_AS_STRING(self->output_buffer) + self->prefetch_block_start - 4; + assert(parg[-1] == PREFETCH); + assert(prefetch_len <= 0xFFFFFFFFUL); + parg[0] = (unsigned char)(prefetch_len & 0xff); + parg[1] = (unsigned char)((prefetch_len >> 8) & 0xff); + parg[2] = (unsigned char)((prefetch_len >> 16) & 0xff); + parg[3] = (unsigned char)((prefetch_len >> 24) & 0xff); + self->prefetch_block_start = -1; + return 0; +} + +static int +_Pickler_OpcodeBoundary(PicklerObject *self) +{ + Py_ssize_t prefetch_len; + + if (self->prefetch_block_start == -1) + return 0; + prefetch_len = self->output_len - self->prefetch_block_start; + if (prefetch_len >= PREFETCH_SIZE_TARGET) + return _Pickler_CommitPrefetchBlock(self); + else + return 0; +} + static PyObject * _Pickler_GetString(PicklerObject *self) { PyObject *output_buffer = self->output_buffer; + if (_Pickler_CommitPrefetchBlock(self)) + return NULL; + assert(self->output_buffer != NULL); self->output_buffer = NULL; /* Resize down to exact size */ @@ -701,6 +747,7 @@ static int assert(self->write != NULL); + /* This will commit a pending prefetch block, if any. */ output = _Pickler_GetString(self); if (output == NULL) return -1; @@ -711,13 +758,20 @@ static int } static Py_ssize_t -_Pickler_Write(PicklerObject *self, const char *s, Py_ssize_t n) -{ - Py_ssize_t i, required; +_Pickler_Write(PicklerObject *self, const char *s, Py_ssize_t data_len) +{ + Py_ssize_t i, required, n; char *buffer; + int issue_prefetch; assert(s != NULL); + issue_prefetch = self->issue_prefetches && + self->prefetch_block_start == -1; + + n = data_len; + if (issue_prefetch) + n += 5; required = self->output_len + n; if (required > self->max_output_len) { if (self->write != NULL && required > MAX_WRITE_BUF_SIZE) { @@ -733,7 +787,7 @@ static Py_ssize_t PyObject *result; /* XXX we could spare an intermediate copy and pass a memoryview instead */ - PyObject *output = PyBytes_FromStringAndSize(s, n); + PyObject *output = PyBytes_FromStringAndSize(s, data_len); if (s == NULL) return -1; result = _Pickler_FastCall(self, self->write, output); @@ -751,17 +805,24 @@ static Py_ssize_t } } buffer = PyBytes_AS_STRING(self->output_buffer); + if (issue_prefetch) { + buffer[self->output_len++] = PREFETCH; + /* Deliberately put in a bogus prefetch len */ + for (i = 0; i < 4; i++) + buffer[self->output_len++] = 0xFE; + self->prefetch_block_start = self->output_len; + } if (n < 8) { /* This is faster than memcpy when the string is short. */ - for (i = 0; i < n; i++) { + for (i = 0; i < data_len; i++) { buffer[self->output_len + i] = s[i]; } } else { - memcpy(buffer + self->output_len, s, n); - } - self->output_len += n; - return n; + memcpy(buffer + self->output_len, s, data_len); + } + self->output_len += data_len; + return data_len; } static PicklerObject * @@ -779,6 +840,8 @@ static PicklerObject * self->write = NULL; self->proto = 0; self->bin = 0; + self->issue_prefetches = 0; + self->prefetch_block_start = -1; self->fast = 0; self->fast_nesting = 0; self->fix_imports = 0; @@ -937,9 +1000,9 @@ static Py_ssize_t return -1; /* Prefetch some data without advancing the file pointer, if possible */ - if (self->peek) { + if (self->peek && self->proto < 4) { PyObject *len, *prefetched; - len = PyLong_FromSsize_t(PREFETCH); + len = PyLong_FromSsize_t(IMPLICIT_PREFETCH_SIZE); if (len == NULL) { Py_DECREF(data); return -1; @@ -971,6 +1034,32 @@ static Py_ssize_t return read_size; } +static int +_Unpickler_ExplicitPrefetch(UnpicklerObject *self, Py_ssize_t n) +{ + Py_ssize_t num_read; + if (self->prefetch_end_idx != -1) { + PyErr_Format(UnpicklingError, "overlapping PREFETCH blocks"); + return -1; + } + if (self->read == NULL) { + self->prefetch_end_idx = self->next_read_idx + n; + if (self->prefetch_end_idx > self->input_len) { + PyErr_Format(UnpicklingError, "PREFETCH goes past EOF"); + return -1; + } + return 0; + } + num_read = _Unpickler_ReadFromFile(self, n); + if (num_read < 0) + return -1; + if (num_read < n) { + PyErr_Format(UnpicklingError, "PREFETCH goes past EOF"); + return -1; + } + return 0; +} + /* Read `n` bytes from the unpickler's data source, storing the result in `*s`. This should be used for all data reads, rather than accessing the unpickler's @@ -987,17 +1076,29 @@ static Py_ssize_t static Py_ssize_t _Unpickler_Read(UnpicklerObject *self, char **s, Py_ssize_t n) { - Py_ssize_t num_read; - - if (self->next_read_idx + n <= self->input_len) { + Py_ssize_t num_read, new_next_read_idx; + + new_next_read_idx = self->next_read_idx + n; + if (self->prefetch_end_idx != -1) { + if (new_next_read_idx == self->prefetch_end_idx) + self->prefetch_end_idx = -1; + else if (new_next_read_idx > self->prefetch_end_idx) { + PyErr_Format(UnpicklingError, + "PREFETCH end doesn't fall on an opcode boundary"); + return -1; + } + } + if (new_next_read_idx <= self->input_len) { *s = self->input_buffer + self->next_read_idx; - self->next_read_idx += n; + self->next_read_idx = new_next_read_idx; return n; } if (!self->read) { PyErr_Format(PyExc_EOFError, "Ran out of input"); return -1; } + if (self->prefetch_end_idx != -1) + self->prefetch_end_idx -= self->next_read_idx; num_read = _Unpickler_ReadFromFile(self, n); if (num_read < 0) return -1; @@ -1166,6 +1267,7 @@ static UnpicklerObject * self->input_len = 0; self->next_read_idx = 0; self->prefetched_idx = 0; + self->prefetch_end_idx = -1; self->read = NULL; self->readline = NULL; self->peek = NULL; @@ -1290,6 +1392,8 @@ memo_put(PicklerObject *self, PyObject * if (self->fast) return 0; + if (_Pickler_OpcodeBoundary(self)) + goto error; x = PyMemoTable_Size(self->memo); if (PyMemoTable_Set(self->memo, obj, x) < 0) @@ -3544,6 +3648,8 @@ save(PicklerObject *self, PyObject *obj, status = -1; } done: + if (status == 0) + status = _Pickler_OpcodeBoundary(self); Py_LeaveRecursiveCall(); Py_XDECREF(reduce_func); Py_XDECREF(reduce_value); @@ -3556,6 +3662,7 @@ dump(PicklerObject *self, PyObject *obj) { const char stop_op = STOP; + self->issue_prefetches = 0; if (self->proto >= 2) { char header[2]; @@ -3564,6 +3671,8 @@ dump(PicklerObject *self, PyObject *obj) header[1] = (unsigned char)self->proto; if (_Pickler_Write(self, header, 2) < 0) return -1; + if (self->proto >= 4) + self->issue_prefetches = 1; } if (save(self, obj, 0) < 0 || @@ -5585,6 +5694,21 @@ load_proto(UnpicklerObject *self) return -1; } +static int +load_prefetch(UnpicklerObject *self) +{ + char *s; + Py_ssize_t prefetch_len; + + if (_Unpickler_Read(self, &s, 4) < 0) + return -1; + + prefetch_len = calc_binsize(s, 4); + if (prefetch_len < 0) + return -1; + return _Unpickler_ExplicitPrefetch(self, prefetch_len); +} + static PyObject * load(UnpicklerObject *self) { @@ -5595,6 +5719,7 @@ load(UnpicklerObject *self) self->num_marks = 0; if (Py_SIZE(self->stack)) Pdata_clear(self->stack, 0); + self->prefetch_end_idx = -1; /* Convenient macros for the dispatch while-switch loop just below. */ #define OP(opcode, load_func) \ @@ -5664,6 +5789,7 @@ load(UnpicklerObject *self) OP(BINPERSID, load_binpersid) OP(REDUCE, load_reduce) OP(PROTO, load_proto) + OP(PREFETCH, load_prefetch) OP_ARG(EXT1, load_extension, 1) OP_ARG(EXT2, load_extension, 2) OP_ARG(EXT4, load_extension, 4)