# HG changeset patch # Parent 95a35cef2aacbe2298ffc4ad5ff3fbaff75da810 Issue #12328: Fixes for multiprocessing's overlapped I/O on Windows The patch addresses issues related to PipeConnection.poll() and fixes PipeListener.accept() to call ConnectNamedPipe() with overlapped=True. It also fixes Queue.empty() so that it is thread-safe on Windows. In combination with sigint_event.patch, all of multiprocessing's pipe-related blocking functions/methods are now interruptible on Windows. diff -r 95a35cef2aac Lib/multiprocessing/connection.py --- a/Lib/multiprocessing/connection.py Sat Nov 19 22:01:17 2011 +0000 +++ b/Lib/multiprocessing/connection.py Sun Nov 20 18:26:39 2011 +0000 @@ -290,85 +290,86 @@ Overlapped I/O is used, so the handles must have been created with FILE_FLAG_OVERLAPPED. """ - _buffered = b'' + _message = None def _close(self, _CloseHandle=win32.CloseHandle): _CloseHandle(self._handle) def _send_bytes(self, buf): - overlapped = win32.WriteFile(self._handle, buf, overlapped=True) - nwritten, complete = overlapped.GetOverlappedResult(True) + ov = win32.WriteFile(self._handle, buf, overlapped=True) + try: + win32.WaitForMultipleObjects([ov.event], False, INFINITE) + finally: + ov.cancel() + nwritten, complete = ov.GetOverlappedResult(True) assert complete assert nwritten == len(buf) def _recv_bytes(self, maxsize=None, sentinels=()): - if sentinels: - self._poll(-1.0, sentinels) - buf = io.BytesIO() - firstchunk = self._buffered - if firstchunk: - lenfirstchunk = len(firstchunk) - buf.write(firstchunk) - self._buffered = b'' - else: - # A reasonable size for the first chunk transfer - bufsize = 128 - if maxsize is not None and maxsize < bufsize: - bufsize = maxsize + if self._message is None: + bsize = 128 if maxsize is None else min(maxsize, 128) try: - overlapped = win32.ReadFile(self._handle, bufsize, overlapped=True) - lenfirstchunk, complete = overlapped.GetOverlappedResult(True) - firstchunk = overlapped.getbuffer() - assert lenfirstchunk == 
len(firstchunk) + ov = win32.ReadFile(self._handle, 128, overlapped=True) + try: + handles = [ov.event] + if sentinels: + handles += sentinels + waitres = win32.WaitForMultipleObjects(handles, False, + INFINITE) + finally: + ov.cancel() + self._message = self._get_message(ov, maxsize) except IOError as e: if e.winerror == win32.ERROR_BROKEN_PIPE: raise EOFError - raise - buf.write(firstchunk) - if complete: - return buf - navail, nleft = win32.PeekNamedPipe(self._handle) - if maxsize is not None and lenfirstchunk + nleft > maxsize: + if self._message is None: + assert 0 <= waitres - 1 - WAIT_OBJECT_0 < len(sentinels) + raise SentinelReady(sentinels[waitres - 1 - WAIT_OBJECT_0]) + tmp, self._message = self._message, None + res = io.BytesIO() + res.write(tmp) + return res + + def _poll(self, timeout, maxsize=None): + if self._message is not None: + return True + if win32.PeekNamedPipe(self._handle)[0] != 0: + return True + msecs = INFINITE if timeout < 0.0 else int(timeout * 1000 + 0.5) + try: + ov = win32.ReadFile(self._handle, 128, overlapped=True) + try: + win32.WaitForMultipleObjects([ov.event], False, msecs) + finally: + ov.cancel() + self._message = self._get_message(ov, maxsize) + except IOError as e: + if e.winerror == win32.ERROR_BROKEN_PIPE: + return True + return self._message is not None + + def _poll_and_not_empty_string(self): + return win32.PeekNamedPipe(self._handle)[0] != 0 + + def _get_message(self, ov, maxsize): + nread, complete = ov.GetOverlappedResult(True) + if complete: + return ov.getbuffer() + err = win32.GetLastError() + if err == win32.ERROR_OPERATION_ABORTED: + assert nread == 0 return None - if nleft > 0: - overlapped = win32.ReadFile(self._handle, nleft, overlapped=True) - res, complete = overlapped.GetOverlappedResult(True) - assert res == nleft - assert complete - buf.write(overlapped.getbuffer()) - return buf - - def _poll(self, timeout, sentinels=()): - # Fast non-blocking path - navail, nleft = win32.PeekNamedPipe(self._handle) - 
if navail > 0: - return True - elif timeout == 0.0: - return False - # Blocking: use overlapped I/O - if timeout < 0.0: - timeout = INFINITE - else: - timeout = int(timeout * 1000 + 0.5) - overlapped = win32.ReadFile(self._handle, 1, overlapped=True) - try: - handles = [overlapped.event] - handles += sentinels - res = win32.WaitForMultipleObjects(handles, False, timeout) - finally: - # Always cancel overlapped I/O in the same thread - # (because CancelIoEx() appears only in Vista) - overlapped.cancel() - if res == WAIT_TIMEOUT: - return False - idx = res - WAIT_OBJECT_0 - if idx == 0: - # I/O was successful, store received data - overlapped.GetOverlappedResult(True) - self._buffered += overlapped.getbuffer() - return True - assert 0 < idx < len(handles) - raise SentinelReady([handles[idx]]) + assert err == win32.ERROR_MORE_DATA + buf = ov.getbuffer() + left = win32.PeekNamedPipe(self._handle)[1] + assert left > 0 + if maxsize is not None and len(buf) + left > maxsize: + self._bad_message_length() + ov = win32.ReadFile(self._handle, left, overlapped=True) + rbytes, complete = ov.GetOverlappedResult(True) + assert complete + assert rbytes == left + return buf + ov.getbuffer() class Connection(_ConnectionBase): @@ -639,38 +640,37 @@ ''' def __init__(self, address, backlog=None): self._address = address - handle = win32.CreateNamedPipe( - address, win32.PIPE_ACCESS_DUPLEX, + self._handle_queue = [self._new_handle()] + self._last_accepted = None + sub_debug('listener created with address=%r', self._address) + self.close = Finalize( + self, PipeListener._finalize_pipe_listener, + args=(self._handle_queue, self._address), exitpriority=0 + ) + + def _new_handle(self): + return win32.CreateNamedPipe( + self._address, + win32.PIPE_ACCESS_DUPLEX | win32.FILE_FLAG_OVERLAPPED, win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE | win32.PIPE_WAIT, win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE, win32.NMPWAIT_WAIT_FOREVER, win32.NULL ) - self._handle_queue = [handle] - 
self._last_accepted = None - - sub_debug('listener created with address=%r', self._address) - - self.close = Finalize( - self, PipeListener._finalize_pipe_listener, - args=(self._handle_queue, self._address), exitpriority=0 - ) def accept(self): - newhandle = win32.CreateNamedPipe( - self._address, win32.PIPE_ACCESS_DUPLEX, - win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE | - win32.PIPE_WAIT, - win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE, - win32.NMPWAIT_WAIT_FOREVER, win32.NULL - ) - self._handle_queue.append(newhandle) + self._handle_queue.append(self._new_handle()) handle = self._handle_queue.pop(0) + ov = win32.ConnectNamedPipe(handle, overlapped=True) try: - win32.ConnectNamedPipe(handle, win32.NULL) - except WindowsError as e: - if e.winerror != win32.ERROR_PIPE_CONNECTED: - raise + res = win32.WaitForMultipleObjects([ov.event], False, INFINITE) + except: + ov.cancel() + win32.CloseHandle(handle) + raise + finally: + _, complete = ov.GetOverlappedResult(True) + assert complete return PipeConnection(handle) @staticmethod @@ -689,7 +689,8 @@ win32.WaitNamedPipe(address, 1000) h = win32.CreateFile( address, win32.GENERIC_READ | win32.GENERIC_WRITE, - 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL + 0, win32.NULL, win32.OPEN_EXISTING, + win32.FILE_FLAG_OVERLAPPED, win32.NULL ) except WindowsError as e: if e.winerror not in (win32.ERROR_SEM_TIMEOUT, diff -r 95a35cef2aac Lib/multiprocessing/queues.py --- a/Lib/multiprocessing/queues.py Sat Nov 19 22:01:17 2011 +0000 +++ b/Lib/multiprocessing/queues.py Sun Nov 20 18:26:39 2011 +0000 @@ -97,6 +97,14 @@ self._send = self._writer.send self._recv = self._reader.recv self._poll = self._reader.poll + if sys.platform != 'win32': + self._safe_poll = self._poll + else: + # PipeConnection.poll() is not thread-safe. Since the + # queue will never contain empty byte-string messages we + # can safely use a simpler alternative which is just a + # wrapper for PeekNamedPipe(). 
+ self._safe_poll = self._reader._poll_and_not_empty_string def put(self, obj, block=True, timeout=None): assert not self._closed @@ -145,7 +153,7 @@ return self._maxsize - self._sem._semlock._get_value() def empty(self): - return not self._poll() + return not self._safe_poll() def full(self): return self._sem._semlock._is_zero() diff -r 95a35cef2aac Lib/test/test_multiprocessing.py --- a/Lib/test/test_multiprocessing.py Sat Nov 19 22:01:17 2011 +0000 +++ b/Lib/test/test_multiprocessing.py Sun Nov 20 18:26:39 2011 +0000 @@ -1742,6 +1742,84 @@ self.assertEqual(conn.recv(), 'hello') p.join() l.close() + +class _TestPoll(unittest.TestCase): + + ALLOWED_TYPES = ('processes', 'threads') + + def test_empty_string(self): + a, b = self.Pipe() + self.assertEqual(a.poll(), False) + b.send_bytes(b'') + self.assertEqual(a.poll(), True) + self.assertEqual(a.poll(), True) + + @classmethod + def _child_strings(cls, conn, strings): + for s in strings: + time.sleep(0.1) + conn.send_bytes(s) + conn.close() + + def test_strings(self): + strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop') + a, b = self.Pipe() + p = self.Process(target=self._child_strings, args=(b, strings)) + p.start() + + for s in strings: + for i in range(200): + if a.poll(0.01): + break + self.assertEqual(s, a.recv_bytes()) + + p.join() + + @classmethod + def _child_boundaries(cls, r): + # Polling may "pull" a message in to the child process, but we + # don't want it to pull only part of a message, as that would + # corrupt the pipe for any other processes which might later + # read from it. 
+ r.poll(5) + + def test_boundaries(self): + r, w = self.Pipe(False) + p = self.Process(target=self._child_boundaries, args=(r,)) + p.start() + time.sleep(2) + L = [b"first", b"second"] + for obj in L: + w.send_bytes(obj) + w.close() + p.join() + self.assertIn(r.recv_bytes(), L) + + @classmethod + def _child_dont_merge(cls, b): + b.send_bytes(b'a') + b.send_bytes(b'b') + b.send_bytes(b'cd') + + def test_dont_merge(self): + a, b = self.Pipe() + self.assertEqual(a.poll(0.0), False) + self.assertEqual(a.poll(0.1), False) + + p = self.Process(target=self._child_dont_merge, args=(b,)) + p.start() + + self.assertEqual(a.recv_bytes(), b'a') + self.assertEqual(a.poll(1.0), True) + self.assertEqual(a.poll(1.0), True) + self.assertEqual(a.recv_bytes(), b'b') + self.assertEqual(a.poll(1.0), True) + self.assertEqual(a.poll(1.0), True) + self.assertEqual(a.poll(0.0), True) + self.assertEqual(a.recv_bytes(), b'cd') + + p.join() + # # Test of sending connection and socket objects between processes # diff -r 95a35cef2aac Modules/_multiprocessing/win32_functions.c --- a/Modules/_multiprocessing/win32_functions.c Sat Nov 19 22:01:17 2011 +0000 +++ b/Modules/_multiprocessing/win32_functions.c Sun Nov 20 18:26:39 2011 +0000 @@ -60,16 +60,18 @@ static void overlapped_dealloc(OverlappedObject *self) { + DWORD bytes; int err = GetLastError(); if (self->pending) { - if (check_CancelIoEx()) - Py_CancelIoEx(self->handle, &self->overlapped); - else { - PyErr_SetString(PyExc_RuntimeError, - "I/O operations still in flight while destroying " - "Overlapped object, the process may crash"); - PyErr_WriteUnraisable(NULL); - } + /* make it a programming error to deallocate while operation + is pending, even if we can safely cancel it */ + if (check_CancelIoEx() && + Py_CancelIoEx(self->handle, &self->overlapped)) + GetOverlappedResult(self->handle, &self->overlapped, &bytes, TRUE); + PyErr_SetString(PyExc_RuntimeError, + "I/O operations still in flight while destroying " + "Overlapped object, the 
process may crash"); + PyErr_WriteUnraisable(NULL); } CloseHandle(self->overlapped.hEvent); SetLastError(err); @@ -85,6 +87,7 @@ int wait; BOOL res; DWORD transferred = 0; + DWORD err; wait = PyObject_IsTrue(waitobj); if (wait < 0) @@ -94,18 +97,21 @@ wait != 0); Py_END_ALLOW_THREADS - if (!res) { - int err = GetLastError(); - if (err == ERROR_IO_INCOMPLETE) - Py_RETURN_NONE; - if (err != ERROR_MORE_DATA) { + err = res ? ERROR_SUCCESS : GetLastError(); + switch (err) { + case ERROR_SUCCESS: + case ERROR_MORE_DATA: + case ERROR_OPERATION_ABORTED: + self->completed = 1; + self->pending = 0; + break; + case ERROR_IO_INCOMPLETE: + break; + default: self->pending = 0; return PyErr_SetExcFromWindowsErr(PyExc_IOError, err); - } } - self->pending = 0; - self->completed = 1; - if (self->read_buffer) { + if (self->completed && self->read_buffer != NULL) { assert(PyBytes_CheckExact(self->read_buffer)); if (_PyBytes_Resize(&self->read_buffer, transferred)) return NULL; @@ -780,7 +786,9 @@ WIN32_CONSTANT(F_DWORD, ERROR_ALREADY_EXISTS); WIN32_CONSTANT(F_DWORD, ERROR_BROKEN_PIPE); + WIN32_CONSTANT(F_DWORD, ERROR_MORE_DATA); WIN32_CONSTANT(F_DWORD, ERROR_NO_SYSTEM_RESOURCES); + WIN32_CONSTANT(F_DWORD, ERROR_OPERATION_ABORTED); WIN32_CONSTANT(F_DWORD, ERROR_PIPE_BUSY); WIN32_CONSTANT(F_DWORD, ERROR_PIPE_CONNECTED); WIN32_CONSTANT(F_DWORD, ERROR_SEM_TIMEOUT);