diff -r af7bc95e5b1e .hgtags --- a/.hgtags Thu Jun 09 14:10:07 2011 +0200 +++ b/.hgtags Mon Jun 13 20:02:23 2011 +0100 @@ -88,3 +88,4 @@ a222a015e28d8ae9af3899258dc6c15c3d40add0 v3.2 8ffac2337a3323323d02153ac919fd1483176652 v3.2.1b1 cfa9364997c7f2e67b9cbb45c3a5fa3bba4e4999 v3.2.1rc1 +f04ae131656ba0ba6ecb642eebaa7dd1d57730cd iopatch-submit1 diff -r af7bc95e5b1e Lib/multiprocessing/connection.py --- a/Lib/multiprocessing/connection.py Thu Jun 09 14:10:07 2011 +0200 +++ b/Lib/multiprocessing/connection.py Mon Jun 13 20:02:23 2011 +0100 @@ -289,6 +289,7 @@ with FILE_FLAG_OVERLAPPED. """ _buffered = b'' + _full_msg = False def _close(self): win32.CloseHandle(self._handle) @@ -302,6 +303,10 @@ def _recv_bytes(self, maxsize=None, sentinels=()): if sentinels: self._poll(-1.0, sentinels) + if self._full_msg: + tmp = self._buffered + del self._buffered, self._full_msg + return io.BytesIO(tmp) buf = io.BytesIO() firstchunk = self._buffered if firstchunk: @@ -338,10 +343,12 @@ def _poll(self, timeout, sentinels=()): # Fast non-blocking path + if self._buffered or self._full_msg: + return True navail, nleft = win32.PeekNamedPipe(self._handle) if navail > 0: return True - elif timeout == 0.0: + elif timeout == 0.0 and nleft != 0: return False # Blocking: use overlapped I/O if timeout < 0.0: @@ -357,16 +364,24 @@ # Always cancel overlapped I/O in the same thread # (because CancelIoEx() appears only in Vista) overlapped.cancel() - if res == WAIT_TIMEOUT: + try: + ores = overlapped.GetOverlappedResult(True) + except IOError as e: + if e.errno != win32.ERROR_OPERATION_ABORTED: + raise + ores = None + + if ores is not None: + # I/O was successful, store received data + self._buffered = overlapped.getbuffer() + self._full_msg = not overlapped.moredata() + return True + elif res == WAIT_TIMEOUT: return False - idx = res - WAIT_OBJECT_0 - if idx == 0: - # I/O was successful, store received data - overlapped.GetOverlappedResult(True) - self._buffered += overlapped.getbuffer() - return True - assert 0 < idx < len(handles) - raise SentinelReady([handles[idx]]) + else: + idx = res - WAIT_OBJECT_0 + assert 0 < idx < len(handles) + raise SentinelReady([handles[idx]]) class Connection(_ConnectionBase): diff -r af7bc95e5b1e Modules/_multiprocessing/win32_functions.c --- a/Modules/_multiprocessing/win32_functions.c Thu Jun 09 14:10:07 2011 +0200 +++ b/Modules/_multiprocessing/win32_functions.c Mon Jun 13 20:02:23 2011 +0100 @@ -51,6 +51,8 @@ int pending; /* Whether I/O completed successfully */ int completed; + /* Whether there is more data in this message */ + int more_data; /* Buffer used for reading (optional) */ PyObject *read_buffer; /* Buffer used for writing (optional) */ @@ -60,16 +62,18 @@ static void overlapped_dealloc(OverlappedObject *self) { + DWORD bytes; int err = GetLastError(); if (self->pending) { - if (check_CancelIoEx()) - Py_CancelIoEx(self->handle, &self->overlapped); - else { - PyErr_SetString(PyExc_RuntimeError, - "I/O operations still in flight while destroying " - "Overlapped object, the process may crash"); - PyErr_WriteUnraisable(NULL); - } + /* make it a programming error to deallocate while operation + is pending, even if we can safely cancel it */ + if (check_CancelIoEx() && + Py_CancelIoEx(self->handle, &self->overlapped)) + GetOverlappedResult(self->handle, &self->overlapped, &bytes, TRUE); + PyErr_SetString(PyExc_RuntimeError, + "I/O operations still in flight while destroying " + "Overlapped object, the process may crash"); + PyErr_WriteUnraisable(NULL); } CloseHandle(self->overlapped.hEvent); SetLastError(err); @@ -98,7 +102,9 @@ int err = GetLastError(); if (err == ERROR_IO_INCOMPLETE) Py_RETURN_NONE; - if (err != ERROR_MORE_DATA) { + if (err == ERROR_MORE_DATA) { + self->more_data = 1; + } else { self->pending = 0; return PyErr_SetExcFromWindowsErr(PyExc_IOError, err); } @@ -129,6 +135,20 @@ } static PyObject * +overlapped_moredata(OverlappedObject *self) +{ + if (!self->completed) { + PyErr_SetString(PyExc_ValueError, "operation has not completed"); + return NULL; + } + if (self->read_buffer == NULL) { + PyErr_SetString(PyExc_ValueError, "not a read operation"); + return NULL; + } + return PyBool_FromLong((long)self->more_data); +} + +static PyObject * overlapped_cancel(OverlappedObject *self) { BOOL res = TRUE; @@ -153,6 +173,7 @@ {"GetOverlappedResult", (PyCFunction) overlapped_GetOverlappedResult, METH_O, NULL}, {"getbuffer", (PyCFunction) overlapped_getbuffer, METH_NOARGS, NULL}, + {"moredata", (PyCFunction) overlapped_moredata, METH_NOARGS, NULL}, {"cancel", (PyCFunction) overlapped_cancel, METH_NOARGS, NULL}, {NULL} }; @@ -217,6 +238,7 @@ self->read_buffer = NULL; self->pending = 0; self->completed = 0; + self->more_data = 0; memset(&self->overlapped, 0, sizeof(OVERLAPPED)); memset(&self->write_buffer, 0, sizeof(Py_buffer)); /* Manual reset, initially non-signalled */ @@ -767,6 +789,7 @@ WIN32_CONSTANT(F_DWORD, ERROR_ALREADY_EXISTS); WIN32_CONSTANT(F_DWORD, ERROR_BROKEN_PIPE); WIN32_CONSTANT(F_DWORD, ERROR_NO_SYSTEM_RESOURCES); + WIN32_CONSTANT(F_DWORD, ERROR_OPERATION_ABORTED); WIN32_CONSTANT(F_DWORD, ERROR_PIPE_BUSY); WIN32_CONSTANT(F_DWORD, ERROR_PIPE_CONNECTED); WIN32_CONSTANT(F_DWORD, ERROR_SEM_TIMEOUT);