# HG changeset patch # Parent 80f814dca274b5d848dbd306c1513263e69011ce Issue 10886, 8037: make multiprocessing's Queue.put pickle immediately The patch makes multiprocessing's Queue.put() pickle its argument immediately. This makes any pickling error occur immediately in the original thread. With the current implementation, an error will be raised later on in a background thread. This makes debugging unnecessarily difficult. The patch also has the advantage of ensuring that weakref callbacks and __del__ methods for objects put in the queue will not be run in the background thread. (Bytes objects have trivial destructors.) This potentially prevents inconsistent state caused by forking a process while the background thread is running -- see Issue 6721. This is basically a version of the patch for issue 8037, but it is updated for the default mercurial branch, and does not modify the put() method. diff -r 80f814dca274 Lib/multiprocessing/queues.py --- a/Lib/multiprocessing/queues.py Thu Aug 25 18:32:54 2011 +0200 +++ b/Lib/multiprocessing/queues.py Mon Aug 29 17:00:25 2011 +0100 @@ -44,6 +44,7 @@ import errno from queue import Empty, Full +from pickle import dumps, HIGHEST_PROTOCOL import _multiprocessing from multiprocessing.connection import Pipe, SentinelReady from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition @@ -95,7 +96,7 @@ self._joincancelled = False self._closed = False self._close = None - self._send = self._writer.send + self._send_bytes = self._writer.send_bytes self._recv = self._reader.recv self._poll = self._reader.poll @@ -108,7 +109,7 @@ try: if self._thread is None: self._start_thread() - self._buffer.append(obj) + self._buffer.append(dumps(obj, HIGHEST_PROTOCOL)) self._notempty.notify() finally: self._notempty.release() @@ -180,7 +181,7 @@ self._buffer.clear() self._thread = threading.Thread( target=Queue._feed, - args=(self._buffer, self._notempty, self._send, + args=(self._buffer, self._notempty, self._send_bytes, self._wlock, self._writer.close, self._ignore_epipe), name='QueueFeederThread' ) @@ -232,7 +233,7 @@ notempty.release() @staticmethod - def _feed(buffer, notempty, send, writelock, close, ignore_epipe): + def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe): debug('starting thread to feed data to pipe') from .util import is_exiting @@ -264,11 +265,11 @@ return if wacquire is None: - send(obj) + send_bytes(obj) else: wacquire() try: - send(obj) + send_bytes(obj) finally: wrelease() except IndexError: @@ -323,7 +324,7 @@ try: if self._thread is None: self._start_thread() - self._buffer.append(obj) + self._buffer.append(dumps(obj, HIGHEST_PROTOCOL)) self._unfinished_tasks.release() self._notempty.notify() finally: diff -r 80f814dca274 Lib/test/test_multiprocessing.py --- a/Lib/test/test_multiprocessing.py Thu Aug 25 18:32:54 2011 +0200 +++ b/Lib/test/test_multiprocessing.py Mon Aug 29 17:00:25 2011 +0100 @@ -618,6 +618,36 @@ p.join() # +# Issue 8037, 8323, 10886 +# Checks that putting an unpicklable object in a queue will raise an error +# immediately (rather than delaying till the object is flushed to the pipe). +# Also checks that modifying the object after it has been put on the queue +# does not affect what is received by the other process. +# + +class _TestQueuePutFailImmediately(BaseTestCase): + + ALLOWED_TYPES = ('processes',) + + def _test_queue_put(self, q): + l = list(range(10000)) + q.put(l) + l[-1] = None + + def test_queue_put(self): + for qClass in (multiprocessing.Queue, multiprocessing.JoinableQueue): + q = qClass() + class Unpicklable(object): + def __reduce__(self): + raise TypeError + self.assertRaises(TypeError, q.put, Unpicklable()) + p = multiprocessing.Process(target = self._test_queue_put, + args = (q,)) + p.start() + self.assertEqual(q.get()[-1], 9999) + p.join() + +# # #