diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index a290181487..740fe7e01d 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -17,6 +17,7 @@
 import types
 import weakref
 import errno
+import contextlib
 
 from queue import Empty, Full
 
@@ -28,6 +29,19 @@
 
 from .util import debug, info, Finalize, register_after_fork, is_exiting
 
+
+class CleanExchange:
+    def __init__(self, obj, attr):
+        self.obj = obj
+        self.attr = attr
+
+    def exchange(self):
+        result = getattr(self.obj, self.attr)
+        setattr(self.obj, self.attr, None)
+        self.obj = None
+        self.attr = None
+        return result
+
 #
 # Queue type using a pipe, buffer and thread
 #
@@ -78,7 +92,8 @@ def _reset(self, after_fork=False):
         self._jointhread = None
         self._joincancelled = False
         self._closed = False
-        self._close = None
+        self._close = self._writer.close
+        self._sentinel_close = None
         self._send_bytes = self._writer.send_bytes
         self._recv_bytes = self._reader.recv_bytes
         self._poll = self._reader.poll
@@ -146,6 +161,10 @@ def close(self):
             if close:
                 self._close = None
                 close()
+            sentinel_close = self._sentinel_close
+            if sentinel_close:
+                self._sentinel_close = None
+                sentinel_close()
 
     def join_thread(self):
         debug('Queue.join_thread()')
@@ -166,11 +185,13 @@ def _start_thread(self):
 
         # Start thread which transfers data from buffer to pipe
         self._buffer.clear()
+
         self._thread = threading.Thread(
             target=Queue._feed,
             args=(self._buffer, self._notempty, self._send_bytes,
-                  self._wlock, self._writer.close, self._ignore_epipe,
-                  self._on_queue_feeder_error, self._sem),
+                  #self._wlock, self._close, self._ignore_epipe,
+                  self._wlock, CleanExchange(self, "_close"),
+                  self._ignore_epipe, self._on_queue_feeder_error, self._sem),
             name='QueueFeederThread'
         )
         self._thread.daemon = True
@@ -187,7 +208,7 @@
         )
 
         # Send sentinel to the thread queue object when garbage collected
-        self._close = Finalize(
+        self._sentinel_close = Finalize(
             self, Queue._finalize_close,
             [self._buffer, self._notempty],
             exitpriority=10
@@ -211,8 +232,8 @@ def _finalize_close(buffer, notempty):
             notempty.notify()
 
     @staticmethod
-    def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe,
-              onerror, queue_sem):
+    def _feed(buffer, notempty, send_bytes, writelock,
+              close_exchanger_callback, ignore_epipe, onerror, queue_sem):
         debug('starting thread to feed data to pipe')
         nacquire = notempty.acquire
         nrelease = notempty.release
@@ -225,52 +246,60 @@ def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe,
         else:
             wacquire = None
 
-        while 1:
+        @contextlib.contextmanager
+        def manager(close_exchanger):
+            close = close_exchanger_callback.exchange()
             try:
-                nacquire()
-                try:
-                    if not buffer:
-                        nwait()
-                finally:
-                    nrelease()
+                yield close
+            finally:
+                close()
+
+        with manager(close_exchanger_callback) as close:
+            while 1:
                 try:
-                    while 1:
-                        obj = bpopleft()
-                        if obj is sentinel:
-                            debug('feeder thread got sentinel -- exiting')
-                            close()
-                            return
-
-                        # serialize the data before acquiring the lock
-                        obj = _ForkingPickler.dumps(obj)
-                        if wacquire is None:
-                            send_bytes(obj)
-                        else:
-                            wacquire()
-                            try:
+                    nacquire()
+                    try:
+                        if not buffer:
+                            nwait()
+                    finally:
+                        nrelease()
+                    try:
+                        while 1:
+                            obj = bpopleft()
+                            if obj is sentinel:
+                                debug('feeder thread got sentinel -- exiting')
+                                return
+
+                            # serialize the data before acquiring the lock
+                            obj = _ForkingPickler.dumps(obj)
+                            if wacquire is None:
                                 send_bytes(obj)
-                            finally:
-                                wrelease()
-                except IndexError:
-                    pass
-            except Exception as e:
-                if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
-                    return
-                # Since this runs in a daemon thread the resources it uses
-                # may be become unusable while the process is cleaning up.
-                # We ignore errors which happen after the process has
-                # started to cleanup.
-                if is_exiting():
-                    info('error in queue thread: %s', e)
-                    return
-                else:
-                    # Since the object has not been sent in the queue, we need
-                    # to decrease the size of the queue. The error acts as
-                    # if the object had been silently removed from the queue
-                    # and this step is necessary to have a properly working
-                    # queue.
-                    queue_sem.release()
-                    onerror(e, obj)
+                            else:
+                                wacquire()
+                                try:
+                                    send_bytes(obj)
+                                finally:
+                                    wrelease()
+                    except IndexError:
+                        pass
+                except Exception as e:
+                    if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
+                        return
+                    # Since this runs in a daemon thread the resources it uses
+                    # may be become unusable while the process is cleaning up.
+                    # We ignore errors which happen after the process has
+                    # started to cleanup.
+                    if is_exiting():
+                        info('error in queue thread: %s', e)
+                        return
+                    else:
+                        # Since the object has not been sent in the queue, we
+                        # need to decrease the size of the queue. The error
+                        # acts as if the object had been silently removed from
+                        # the queue and this step is necessary to have a
+                        # properly working queue.
+                        queue_sem.release()
+                        onerror(e, obj)
 
     @staticmethod
     def _on_queue_feeder_error(e, obj):
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index fd3b4303f0..5ca969c2ea 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -1224,6 +1224,21 @@ def test_closed_queue_put_get_exceptions(self):
             q.put('foo')
         with self.assertRaisesRegex(ValueError, 'is closed'):
             q.get()
+
+    def test_closed_queue_closes_both(self):
+        q = multiprocessing.Queue()
+        q.put(1)
+        q.get()
+        q.close()
+        q.join_thread()
+        self.assertTrue(q._reader.closed)
+        self.assertTrue(q._writer.closed)
+
+        q = multiprocessing.Queue()
+        q.close()
+        q.join_thread()
+        self.assertTrue(q._reader.closed)
+        self.assertTrue(q._writer.closed)
 #
 #
 #
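
Appended for illustration, not part of the patch: a minimal sketch of the ownership hand-off that CleanExchange provides. Before _start_thread() runs, _close points at the writer's close method, so Queue.close() can close the writer directly; once the feeder thread starts, it exchanges the callable out of the queue object, leaving None behind, so exactly one party ends up responsible for closing the write end. The Owner class and demo() function below are hypothetical names used only for this sketch; CleanExchange is copied from the patch.

class CleanExchange:
    def __init__(self, obj, attr):
        self.obj = obj
        self.attr = attr

    def exchange(self):
        # Take the callable, clear the attribute on the owner, and drop
        # the exchanger's own references so it keeps nothing alive.
        result = getattr(self.obj, self.attr)
        setattr(self.obj, self.attr, None)
        self.obj = None
        self.attr = None
        return result

class Owner:
    pass

def demo():
    owner = Owner()
    owner._close = lambda: print('writer end closed')  # stands in for self._writer.close
    exchanger = CleanExchange(owner, '_close')

    # The feeder thread takes sole ownership of the close callable ...
    close = exchanger.exchange()
    # ... so Queue.close() later finds None and skips closing the writer.
    assert owner._close is None

    close()  # prints 'writer end closed' exactly once

demo()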