Index: Lib/multiprocessing/queues.py =================================================================== --- Lib/multiprocessing/queues.py (revision 78745) +++ Lib/multiprocessing/queues.py (working copy) @@ -16,6 +16,7 @@ import atexit import weakref +from cPickle import dumps, loads, HIGHEST_PROTOCOL from Queue import Empty, Full import _multiprocessing from multiprocessing import Pipe @@ -66,8 +67,8 @@ self._joincancelled = False self._closed = False self._close = None - self._send = self._writer.send - self._recv = self._reader.recv + self._send = self._writer.send_bytes + self._recv = self._reader.recv_bytes self._poll = self._reader.poll def put(self, obj, block=True, timeout=None): @@ -79,20 +80,22 @@ try: if self._thread is None: self._start_thread() - self._buffer.append(obj) + self._buffer.append(dumps(obj, HIGHEST_PROTOCOL)) self._notempty.notify() finally: self._notempty.release() def get(self, block=True, timeout=None): + pickledRes = None if block and timeout is None: self._rlock.acquire() try: - res = self._recv() + pickledRes = self._recv() self._sem.release() - return res finally: self._rlock.release() + if pickledRes: + return loads(pickledRes) else: if block: @@ -102,11 +105,12 @@ try: if not self._poll(block and (deadline-time.time()) or 0.0): raise Empty - res = self._recv() + pickledRes = self._recv() self._sem.release() - return res finally: self._rlock.release() + if pickledRes: + return loads(pickledRes) def qsize(self): # Raises NotImplementedError on Mac OSX because of broken sem_getvalue() @@ -292,7 +296,7 @@ try: if self._thread is None: self._start_thread() - self._buffer.append(obj) + self._buffer.append(dumps(obj, HIGHEST_PROTOCOL)) self._unfinished_tasks.release() self._notempty.notify() finally: Index: Lib/test/test_multiprocessing.py =================================================================== --- Lib/test/test_multiprocessing.py (revision 78745) +++ Lib/test/test_multiprocessing.py (working copy) @@ -1898,8 +1898,24 @@ flike.flush() assert sio.getvalue() == 'foo' -testcases_other = [OtherTest, TestInvalidHandle, TestStdinBadfiledescriptor] +# Issue 8037 +# Not part of _TestQueue since this test only applies to the multiprocessing +# queues. (Via multiprocessing.dummy, _TestQueue also tests Queue.Queue for +# which this test is invalid.) +class TestQueuePutAtomicity(unittest.TestCase): + + def test_queue_put_atomicity(self): + for queueClass in (multiprocessing.Queue, multiprocessing.JoinableQueue): + l = range(10000) + q = queueClass() + q.put(l) + l[-1] = None + self.assertEqual(q.get()[-1], 9999) + + +testcases_other = [OtherTest, TestInvalidHandle, TestStdinBadfiledescriptor, + TestQueuePutAtomicity] # # #