diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -16,6 +16,7 @@ import time import weakref import errno +import pickle from queue import Empty, Full import _multiprocessing @@ -69,8 +70,8 @@ self._joincancelled = False self._closed = False self._close = None - self._send = self._writer.send - self._recv = self._reader.recv + self._send_bytes = self._writer.send_bytes + self._recv_bytes = self._reader.recv_bytes self._poll = self._reader.poll def put(self, obj, block=True, timeout=None): @@ -89,14 +90,9 @@ def get(self, block=True, timeout=None): if block and timeout is None: - self._rlock.acquire() - try: - res = self._recv() - self._sem.release() - return res - finally: - self._rlock.release() - + with self._rlock: + res = self._recv_bytes() + self._sem.release() else: if block: deadline = time.time() + timeout @@ -109,11 +105,12 @@ raise Empty elif not self._poll(): raise Empty - res = self._recv() + res = self._recv_bytes() self._sem.release() - return res finally: self._rlock.release() + # unserialize the data after having released the lock + return pickle.loads(res) def qsize(self): # Raises NotImplementedError on Mac OSX because of broken sem_getvalue() @@ -158,7 +155,7 @@ self._buffer.clear() self._thread = threading.Thread( target=Queue._feed, - args=(self._buffer, self._notempty, self._send, + args=(self._buffer, self._notempty, self._send_bytes, self._wlock, self._writer.close, self._ignore_epipe), name='QueueFeederThread' ) @@ -210,7 +207,7 @@ notempty.release() @staticmethod - def _feed(buffer, notempty, send, writelock, close, ignore_epipe): + def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe): debug('starting thread to feed data to pipe') from .util import is_exiting @@ -241,14 +238,16 @@ close() return + # serialize the data before acquiring the lock + obj = pickle.dumps(obj) if wacquire is None: - send(obj) + send_bytes(obj) # Delete references to object. See issue16284 del obj else: wacquire() try: - send(obj) + send_bytes(obj) # Delete references to object. See issue16284 del obj finally: @@ -344,7 +343,6 @@ self._wlock = None else: self._wlock = Lock() - self._make_methods() def empty(self): return not self._poll() @@ -355,29 +353,19 @@ def __setstate__(self, state): (self._reader, self._writer, self._rlock, self._wlock) = state - self._make_methods() - def _make_methods(self): - recv = self._reader.recv - racquire, rrelease = self._rlock.acquire, self._rlock.release - def get(): - racquire() - try: - return recv() - finally: - rrelease() - self.get = get + def get(self): + with self._rlock: + res = self._reader.recv_bytes() + # unserialize the data after having released the lock + return pickle.loads(res) + def put(self, obj): + # serialize the data before acquiring the lock + obj = pickle.dumps(obj) if self._wlock is None: # writes to a message oriented win32 pipe are atomic - self.put = self._writer.send + self._writer.send_bytes(obj) else: - send = self._writer.send - wacquire, wrelease = self._wlock.acquire, self._wlock.release - def put(obj): - wacquire() - try: - return send(obj) - finally: - wrelease() - self.put = put + with self._wlock: + self._writer.send_bytes(obj)