diff -r af7bc95e5b1e Lib/multiprocessing/connection.py --- a/Lib/multiprocessing/connection.py Thu Jun 09 14:10:07 2011 +0200 +++ b/Lib/multiprocessing/connection.py Tue Jun 14 23:29:34 2011 +0100 @@ -289,6 +289,7 @@ with FILE_FLAG_OVERLAPPED. """ _buffered = b'' + _full_msg = False def _close(self): win32.CloseHandle(self._handle) @@ -302,6 +303,10 @@ def _recv_bytes(self, maxsize=None, sentinels=()): if sentinels: self._poll(-1.0, sentinels) + if self._full_msg: + tmp = self._buffered + del self._buffered, self._full_msg + return io.BytesIO(tmp) buf = io.BytesIO() firstchunk = self._buffered if firstchunk: @@ -338,11 +343,11 @@ def _poll(self, timeout, sentinels=()): # Fast non-blocking path + if self._buffered or self._full_msg: + return True navail, nleft = win32.PeekNamedPipe(self._handle) if navail > 0: return True - elif timeout == 0.0: - return False # Blocking: use overlapped I/O if timeout < 0.0: timeout = INFINITE @@ -357,16 +362,24 @@ # Always cancel overlapped I/O in the same thread # (because CancelIoEx() appears only in Vista) overlapped.cancel() - if res == WAIT_TIMEOUT: + try: + ores = overlapped.GetOverlappedResult(True) + except IOError as e: + if e.errno != win32.ERROR_OPERATION_ABORTED: + raise + ores = None + + if ores is not None: + # I/O was successful, store received data + self._buffered = overlapped.getbuffer() + self._full_msg = not overlapped.moredata() + return True + elif res == WAIT_TIMEOUT: return False - idx = res - WAIT_OBJECT_0 - if idx == 0: - # I/O was successful, store received data - overlapped.GetOverlappedResult(True) - self._buffered += overlapped.getbuffer() - return True - assert 0 < idx < len(handles) - raise SentinelReady([handles[idx]]) + else: + idx = res - WAIT_OBJECT_0 + assert 0 < idx < len(handles) + raise SentinelReady([handles[idx]]) class Connection(_ConnectionBase): diff -r af7bc95e5b1e Lib/test/test_multiprocessing.py --- a/Lib/test/test_multiprocessing.py Thu Jun 09 14:10:07 2011 +0200 +++ b/Lib/test/test_multiprocessing.py Tue Jun 14 23:29:34 2011 +0100 @@ -1604,6 +1604,65 @@ self.assertEqual(conn.recv(), 'hello') p.join() l.close() + +class _TestPoll(unittest.TestCase): + + ALLOWED_TYPES = ('processes', 'threads') + + def test_empty_string(self): + a, b = self.Pipe() + self.assertEqual(a.poll(), False) + b.send_bytes(b'') + self.assertEqual(a.poll(), True) + self.assertEqual(a.poll(), True) + + @classmethod + def _child_strings(cls, conn, strings): + for s in strings: + time.sleep(0.1) + conn.send_bytes(s) + conn.close() + + def test_strings(self): + strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop') + a, b = self.Pipe() + p = self.Process(target=self._child_strings, args=(b, strings)) + p.start() + + for s in strings: + for i in range(200): + if a.poll(0.01): + break + self.assertEqual(s, a.recv_bytes()) + + p.join() + + @classmethod + def _child_dont_merge(cls, b): + b.send_bytes(b'a') + b.send_bytes(b'b') + b.send_bytes(b'cd') + + + def test_dont_merge(self): + a, b = self.Pipe() + self.assertEqual(a.poll(0.0), False) + self.assertEqual(a.poll(0.1), False) + + p = self.Process(target=self._child_dont_merge, args=(b,)) + p.start() + + self.assertEqual(a.recv_bytes(), b'a') + self.assertEqual(a.poll(1.0), True) + self.assertEqual(a.poll(1.0), True) + self.assertEqual(a.recv_bytes(), b'b') + self.assertEqual(a.poll(1.0), True) + self.assertEqual(a.poll(1.0), True) + self.assertEqual(a.poll(0.0), True) + self.assertEqual(a.recv_bytes(), b'cd') + + p.join() + # # Test of sending connection and socket objects between processes # diff -r af7bc95e5b1e Modules/_multiprocessing/win32_functions.c --- a/Modules/_multiprocessing/win32_functions.c Thu Jun 09 14:10:07 2011 +0200 +++ b/Modules/_multiprocessing/win32_functions.c Tue Jun 14 23:29:34 2011 +0100 @@ -51,6 +51,8 @@ int pending; /* Whether I/O completed successfully */ int completed; + /* Whether there is more data in this message */ + int more_data; /* Buffer used for reading (optional) */ PyObject *read_buffer; /* Buffer used for writing (optional) */ @@ -60,16 +62,18 @@ static void overlapped_dealloc(OverlappedObject *self) { + DWORD bytes; int err = GetLastError(); if (self->pending) { - if (check_CancelIoEx()) - Py_CancelIoEx(self->handle, &self->overlapped); - else { - PyErr_SetString(PyExc_RuntimeError, - "I/O operations still in flight while destroying " - "Overlapped object, the process may crash"); - PyErr_WriteUnraisable(NULL); - } + /* make it a programming error to deallocate while operation + is pending, even if we can safely cancel it */ + if (check_CancelIoEx() && + Py_CancelIoEx(self->handle, &self->overlapped)) + GetOverlappedResult(self->handle, &self->overlapped, &bytes, TRUE); + PyErr_SetString(PyExc_RuntimeError, + "I/O operations still in flight while destroying " + "Overlapped object, the process may crash"); + PyErr_WriteUnraisable(NULL); } CloseHandle(self->overlapped.hEvent); SetLastError(err); @@ -98,7 +102,9 @@ int err = GetLastError(); if (err == ERROR_IO_INCOMPLETE) Py_RETURN_NONE; - if (err != ERROR_MORE_DATA) { + if (err == ERROR_MORE_DATA) { + self->more_data = 1; + } else { self->pending = 0; return PyErr_SetExcFromWindowsErr(PyExc_IOError, err); } @@ -129,6 +135,20 @@ } static PyObject * +overlapped_moredata(OverlappedObject *self) +{ + if (!self->completed) { + PyErr_SetString(PyExc_ValueError, "operation has not completed"); + return NULL; + } + if (self->read_buffer == NULL) { + PyErr_SetString(PyExc_ValueError, "not a read operation"); + return NULL; + } + return PyBool_FromLong((long)self->more_data); +} + +static PyObject * overlapped_cancel(OverlappedObject *self) { BOOL res = TRUE; @@ -153,6 +173,7 @@ {"GetOverlappedResult", (PyCFunction) overlapped_GetOverlappedResult, METH_O, NULL}, {"getbuffer", (PyCFunction) overlapped_getbuffer, METH_NOARGS, NULL}, + {"moredata", (PyCFunction) overlapped_moredata, METH_NOARGS, NULL}, {"cancel", (PyCFunction) overlapped_cancel, METH_NOARGS, NULL}, {NULL} }; @@ -217,6 +238,7 @@ self->read_buffer = NULL; self->pending = 0; self->completed = 0; + self->more_data = 0; memset(&self->overlapped, 0, sizeof(OVERLAPPED)); memset(&self->write_buffer, 0, sizeof(Py_buffer)); /* Manual reset, initially non-signalled */ @@ -767,6 +789,7 @@ WIN32_CONSTANT(F_DWORD, ERROR_ALREADY_EXISTS); WIN32_CONSTANT(F_DWORD, ERROR_BROKEN_PIPE); WIN32_CONSTANT(F_DWORD, ERROR_NO_SYSTEM_RESOURCES); + WIN32_CONSTANT(F_DWORD, ERROR_OPERATION_ABORTED); WIN32_CONSTANT(F_DWORD, ERROR_PIPE_BUSY); WIN32_CONSTANT(F_DWORD, ERROR_PIPE_CONNECTED); WIN32_CONSTANT(F_DWORD, ERROR_SEM_TIMEOUT);