Index: Lib/multiprocessing/queues.py =================================================================== --- Lib/multiprocessing/queues.py (revision 86129) +++ Lib/multiprocessing/queues.py (working copy) @@ -16,6 +16,7 @@ import atexit import weakref +from cPickle import dumps, loads, HIGHEST_PROTOCOL from Queue import Empty, Full import _multiprocessing from multiprocessing import Pipe @@ -66,8 +67,8 @@ self._joincancelled = False self._closed = False self._close = None - self._send = self._writer.send - self._recv = self._reader.recv + self._send = self._writer.send_bytes + self._recv = self._reader.recv_bytes self._poll = self._reader.poll def put(self, obj, block=True, timeout=None): @@ -79,20 +80,22 @@ try: if self._thread is None: self._start_thread() - self._buffer.append(obj) + self._buffer.append(dumps(obj, HIGHEST_PROTOCOL)) self._notempty.notify() finally: self._notempty.release() def get(self, block=True, timeout=None): + pickledRes = None if block and timeout is None: self._rlock.acquire() try: - res = self._recv() + pickledRes = self._recv() self._sem.release() - return res finally: self._rlock.release() + if pickledRes: + return loads(pickledRes) else: if block: @@ -102,11 +105,12 @@ try: if not self._poll(block and (deadline-time.time()) or 0.0): raise Empty - res = self._recv() + pickledRes = self._recv() self._sem.release() - return res finally: self._rlock.release() + if pickledRes: + return loads(pickledRes) def qsize(self): # Raises NotImplementedError on Mac OSX because of broken sem_getvalue() @@ -292,7 +296,7 @@ try: if self._thread is None: self._start_thread() - self._buffer.append(obj) + self._buffer.append(dumps(obj, HIGHEST_PROTOCOL)) self._unfinished_tasks.release() self._notempty.notify() finally: Index: Lib/test/test_multiprocessing.py =================================================================== --- Lib/test/test_multiprocessing.py (revision 86129) +++ Lib/test/test_multiprocessing.py (working copy) @@ -1998,8 +1998,30 @@ flike.flush() assert sio.getvalue() == 'foo' +# +# Issue 8037 +# Not part of _TestQueue since this test only applies to the multiprocessing +# queues. (Via multiprocessing.dummy, _TestQueue also tests Queue.Queue for +# which this test is invalid.) +# +class TestQueuePutAtomicity(unittest.TestCase): + + def _test_queue_put(self, q): + l = list(xrange(10000)) + q.put(l) + l[-1] = None + + def test_queue_put(self): + for qClass in (multiprocessing.Queue, multiprocessing.JoinableQueue): + q = qClass() + p = multiprocessing.Process(target = self._test_queue_put, + args = (q,)) + p.start() + self.assertEqual(q.get()[-1], 9999) + p.join() + testcases_other = [OtherTest, TestInvalidHandle, TestInitializers, - TestStdinBadfiledescriptor] + TestStdinBadfiledescriptor, TestQueuePutAtomicity] # #