diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -106,6 +106,7 @@ def address_type(address): else: raise ValueError('address type of %r unrecognized' % address) + # # Connection classes # @@ -198,13 +199,31 @@ class _ConnectionBase: raise ValueError("buffer length < offset + size") self._send_bytes(m[offset:offset + size]) - def send(self, obj): - """Send a (picklable) object""" + def _real_send(self, obj, lock=None, lock_timeout=None): self._check_closed() self._check_writable() buf = io.BytesIO() ForkingPickler(buf, pickle.HIGHEST_PROTOCOL).dump(obj) - self._send_bytes(buf.getbuffer()) + if lock is not None: + if lock_timeout is not None: + if not lock.acquire(timeout=lock_timeout): + raise TimeoutError + else: + lock.acquire() + try: + self._send_bytes(buf.getbuffer()) + finally: + lock.receive() + else: + self._send_bytes(buf.getbuffer()) + + def send(self, obj): + """Send a (picklable) object""" + self._real_send(obj) + + def locked_send(self, obj, lock, lock_timeout=None): + """Send a (picklable) object""" + self._real_send(obj, lock, lock_timeout) def recv_bytes(self, maxlength=None): """ @@ -244,12 +263,29 @@ class _ConnectionBase: (offset + size) // itemsize]) return size + def _real_recv(self, lock=None, lock_timeout=None): + self._check_closed() + self._check_readable() + if lock is not None: + if lock_timeout is not None: + if not lock.acquire(timeout=lock_timeout): + raise TimeoutError + else: + lock.acquire() + try: + buf = self._recv_bytes() + finally: + lock.release() + else: + buf = self._recv_bytes() + return pickle.loads(buf.getbuffer()) + def recv(self): """Receive a (picklable) object""" - self._check_closed() - self._check_readable() - buf = self._recv_bytes() - return pickle.loads(buf.getbuffer()) + return self._real_recv() + + def locked_recv(self, lock, lock_timeout=None): + return self._real_recv(lock, lock_timeout) def poll(self, timeout=0.0): """Whether there is any input available to be read""" diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -69,8 +69,8 @@ class Queue(object): self._joincancelled = False self._closed = False self._close = None - self._send = self._writer.send - self._recv = self._reader.recv + self._send = self._writer.locked_send + self._recv = self._reader.locked_recv self._poll = self._reader.poll def put(self, obj, block=True, timeout=None): @@ -89,31 +89,29 @@ class Queue(object): def get(self, block=True, timeout=None): if block and timeout is None: - self._rlock.acquire() - try: - res = self._recv() - self._sem.release() - return res - finally: - self._rlock.release() - + res = self._recv(self._rlock) + self._sem.release() + return res else: if block: deadline = time.time() + timeout - if not self._rlock.acquire(block, timeout): + if not self._poll(timeout): + raise Empty + elif not self._poll(): raise Empty + if block: + lock_timeout = deadline - time.time() + if lock_timeout < 0: + raise Empty + else: + lock_timeout = 0 try: - if block: - timeout = deadline - time.time() - if timeout < 0 or not self._poll(timeout): - raise Empty - elif not self._poll(): - raise Empty - res = self._recv() + res = self._recv(self._rlock, lock_timeout) + except TimeoutError: + raise Empty + else: self._sem.release() return res - finally: - self._rlock.release() def qsize(self): # Raises NotImplementedError on Mac OSX because of broken sem_getvalue() @@ -210,7 +208,7 @@ class Queue(object): notempty.release() @staticmethod - def _feed(buffer, notempty, send, writelock, close, ignore_epipe): + def _feed(buffer, notempty, locked_send, writelock, close, ignore_epipe): debug('starting thread to feed data to pipe') from .util import is_exiting @@ -219,11 +217,6 @@ class Queue(object): nwait = notempty.wait bpopleft = buffer.popleft sentinel = _sentinel - if sys.platform != 'win32': - wacquire = writelock.acquire - wrelease = writelock.release - else: - wacquire = None try: while 1: @@ -241,18 +234,12 @@ class Queue(object): close() return - if wacquire is None: - send(obj) - # Delete references to object. See issue16284 - del obj + if writelock is None: + locked_send(obj) else: - wacquire() - try: - send(obj) - # Delete references to object. See issue16284 - del obj - finally: - wrelease() + locked_send(obj, writelock) + # Delete references to object. See issue16284 + del obj except IndexError: pass except Exception as e: