diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -12,7 +12,6 @@ import io import os import sys -import pickle import select import socket import struct @@ -202,9 +201,7 @@ """Send a (picklable) object""" self._check_closed() self._check_writable() - buf = io.BytesIO() - ForkingPickler(buf, pickle.HIGHEST_PROTOCOL).dump(obj) - self._send_bytes(buf.getbuffer()) + self._send_bytes(ForkingPickler.dumps(obj)) def recv_bytes(self, maxlength=None): """ @@ -249,7 +246,7 @@ self._check_closed() self._check_readable() buf = self._recv_bytes() - return pickle.loads(buf.getbuffer()) + return ForkingPickler.loads(buf.getbuffer()) def poll(self, timeout=0.0): """Whether there is any input available to be read""" diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py --- a/Lib/multiprocessing/forking.py +++ b/Lib/multiprocessing/forking.py @@ -7,7 +7,9 @@ # Licensed to PSF under a Contributor Agreement. # +import io import os +import pickle import sys import signal @@ -43,6 +45,15 @@ def register(cls, type, reduce): cls._extra_reducers[type] = reduce + @staticmethod + def dumps(obj): + buf = io.BytesIO() + ForkingPickler(buf, pickle.HIGHEST_PROTOCOL).dump(obj) + return buf.getbuffer() + + loads = pickle.loads + + def _reduce_method(m): if m.__self__ is None: return getattr, (m.__class__, m.__func__.__name__) diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -22,7 +22,7 @@ from multiprocessing.connection import Pipe from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition from multiprocessing.util import debug, info, Finalize, register_after_fork -from multiprocessing.forking import assert_spawning +from multiprocessing.forking import assert_spawning, ForkingPickler # # Queue type using a pipe, buffer and thread @@ -69,8 +69,8 @@ self._joincancelled = False self._closed = False self._close = None - self._send = self._writer.send - self._recv = self._reader.recv + self._send_bytes = self._writer.send_bytes + self._recv_bytes = self._reader.recv_bytes self._poll = self._reader.poll def put(self, obj, block=True, timeout=None): @@ -89,14 +89,9 @@ def get(self, block=True, timeout=None): if block and timeout is None: - self._rlock.acquire() - try: - res = self._recv() - self._sem.release() - return res - finally: - self._rlock.release() - + with self._rlock: + res = self._recv_bytes() + self._sem.release() else: if block: deadline = time.time() + timeout @@ -109,11 +104,12 @@ raise Empty elif not self._poll(): raise Empty - res = self._recv() + res = self._recv_bytes() self._sem.release() - return res finally: self._rlock.release() + # unserialize the data after having released the lock + return ForkingPickler.loads(res) def qsize(self): # Raises NotImplementedError on Mac OSX because of broken sem_getvalue() @@ -158,7 +154,7 @@ self._buffer.clear() self._thread = threading.Thread( target=Queue._feed, - args=(self._buffer, self._notempty, self._send, + args=(self._buffer, self._notempty, self._send_bytes, self._wlock, self._writer.close, self._ignore_epipe), name='QueueFeederThread' ) @@ -210,7 +206,7 @@ notempty.release() @staticmethod - def _feed(buffer, notempty, send, writelock, close, ignore_epipe): + def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe): debug('starting thread to feed data to pipe') from .util import is_exiting @@ -241,14 +237,16 @@ close() return + # serialize the data before acquiring the lock + obj = ForkingPickler.dumps(obj) if wacquire is None: - send(obj) + send_bytes(obj) # Delete references to object. See issue16284 del obj else: wacquire() try: - send(obj) + send_bytes(obj) # Delete references to object. See issue16284 del obj finally: @@ -344,7 +342,6 @@ self._wlock = None else: self._wlock = Lock() - self._make_methods() def empty(self): return not self._poll() @@ -355,29 +352,19 @@ def __setstate__(self, state): (self._reader, self._writer, self._rlock, self._wlock) = state - self._make_methods() - def _make_methods(self): - recv = self._reader.recv - racquire, rrelease = self._rlock.acquire, self._rlock.release - def get(): - racquire() - try: - return recv() - finally: - rrelease() - self.get = get + def get(self): + with self._rlock: + res = self._reader.recv_bytes() + # unserialize the data after having released the lock + return ForkingPickler.loads(res) + def put(self, obj): + # serialize the data before acquiring the lock + obj = ForkingPickler.dumps(obj) if self._wlock is None: # writes to a message oriented win32 pipe are atomic - self.put = self._writer.send + self._writer.send_bytes(obj) else: - send = self._writer.send - wacquire, wrelease = self._wlock.acquire, self._wlock.release - def put(obj): - wacquire() - try: - return send(obj) - finally: - wrelease() - self.put = put + with self._wlock: + self._writer.send_bytes(obj)