diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -49,7 +49,7 @@ import atexit from concurrent.futures import _base import queue import multiprocessing -from multiprocessing.queues import SimpleQueue +from multiprocessing.queues import SimpleQueue, SentinelReady import threading import weakref @@ -194,29 +194,54 @@ def _queue_manangement_worker(executor_r result_queue: A multiprocessing.Queue of _ResultItems generated by the process workers. """ - nb_shutdown_processes = 0 - def shutdown_one_process(): - """Tell a worker to terminate, which will in turn wake us again""" - nonlocal nb_shutdown_processes - call_queue.put(None) - nb_shutdown_processes += 1 + + def shutdown_worker(): + for i in range(0, len(processes)): + call_queue.put(None) + # If .join() is not called on the created processes then + # some multiprocessing.Queue methods may deadlock on Mac OS + # X. + for p in processes.values(): + p.join() + while True: _add_call_item_to_queue(pending_work_items, work_ids_queue, call_queue) - result_item = result_queue.get() + sentinels = list(processes.keys()) + assert sentinels + try: + result_item = result_queue.get(sentinels=sentinels) + except SentinelReady as e: + # Clear terminated processes + for sentinel in e.sentinels: + processes.pop(sentinel).join() + # All in-flight futures must be marked failed preemptively + for work_id, work_item in list(pending_work_items.items()): + if work_item.future.running(): + work_item.future.set_exception( + ProcessExited( + "A process in the process pool was " + "terminated abruptly while the future was " + "running." + )) + del pending_work_items[work_id] + executor = executor_reference() + if executor is not None: + # Launch other workers to compensate + if not _shutdown and not executor._shutdown_thread: + executor._adjust_process_count() + del executor if result_item is not None: - work_item = pending_work_items[result_item.work_id] - del pending_work_items[result_item.work_id] - - if result_item.exception: - work_item.future.set_exception(result_item.exception) - else: - work_item.future.set_result(result_item.result) - continue - # If we come here, we either got a timeout or were explicitly woken up. - # In either case, check whether we should start shutting down. + work_item = pending_work_items.pop(result_item.work_id, None) + # work_item can be None if another process terminated (see above) + if work_item is not None: + if result_item.exception: + work_item.future.set_exception(result_item.exception) + else: + work_item.future.set_result(result_item.result) + # Check whether we should start shutting down. executor = executor_reference() # No more work items can be added if: # - The interpreter is shutting down OR @@ -226,17 +251,11 @@ def _queue_manangement_worker(executor_r # Since no new work items can be added, it is safe to shutdown # this thread if there are no pending work items. if not pending_work_items: - while nb_shutdown_processes < len(processes): - shutdown_one_process() - # If .join() is not called on the created processes then - # some multiprocessing.Queue methods may deadlock on Mac OS - # X. - for p in processes: - p.join() + shutdown_worker() return else: # Start shutting down by telling a process it can exit. - shutdown_one_process() + call_queue.put(None) del executor _system_limits_checked = False @@ -264,6 +283,14 @@ def _check_system_limits(): _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max raise NotImplementedError(_system_limited) + +class ProcessExited(RuntimeError): + """ + Raised when a process in a ProcessPoolExecutor terminated abruptly + while a future was in the running state. + """ + + class ProcessPoolExecutor(_base.Executor): def __init__(self, max_workers=None): """Initializes a new ProcessPoolExecutor instance. @@ -288,7 +315,8 @@ class ProcessPoolExecutor(_base.Executor self._result_queue = SimpleQueue() self._work_ids = queue.Queue() self._queue_management_thread = None - self._processes = set() + # Map of sentinels to processes + self._processes = {} # Shutdown is a two-step process. self._shutdown_thread = False @@ -302,6 +330,8 @@ class ProcessPoolExecutor(_base.Executor def weakref_cb(_, q=self._result_queue): q.put(None) if self._queue_management_thread is None: + # Start the processes so that their sentinels are known. + self._adjust_process_count() self._queue_management_thread = threading.Thread( target=_queue_manangement_worker, args=(weakref.ref(self, weakref_cb), @@ -319,9 +349,10 @@ class ProcessPoolExecutor(_base.Executor p = multiprocessing.Process( target=_process_worker, args=(self._call_queue, - self._result_queue)) + self._result_queue), + create_sentinel=True) p.start() - self._processes.add(p) + self._processes[p._sentinel] = p def submit(self, fn, *args, **kwargs): with self._shutdown_lock: @@ -338,7 +369,6 @@ class ProcessPoolExecutor(_base.Executor self._result_queue.put(None) self._start_queue_management_thread() - self._adjust_process_count() return f submit.__doc__ = _base.Executor.submit.__doc__ diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -118,6 +118,15 @@ def address_type(address): else: raise ValueError('address type of %r unrecognized' % address) + +class SentinelReady(Exception): + """ + Raised when a sentinel is ready when polling. + """ + def __init__(self, *args): + Exception.__init__(self, *args) + self.sentinels = args[0] + # # Connection classes # @@ -253,10 +262,12 @@ class _ConnectionBase: (offset + size) // itemsize]) return size - def recv(self): + def recv(self, sentinels=None): """Receive a (picklable) object""" self._check_closed() self._check_readable() + if sentinels: + self._poll(None, sentinels) buf = self._recv_bytes() return pickle.loads(buf.getbuffer()) @@ -306,7 +317,7 @@ if win32: buf.write(lastchunk) return buf - def _poll(self, timeout): + def _poll(self, timeout, sentinels=()): navail, nleft = win32.PeekNamedPipe(self._handle) if navail > 0: return True @@ -384,9 +395,14 @@ class Connection(_ConnectionBase): return None return self._recv(size) - def _poll(self, timeout): - r = select.select([self._handle], [], [], timeout)[0] - return bool(r) + def _poll(self, timeout, sentinels=[]): + handles = [self._handle] + sentinels + r = select.select(handles, [], [], timeout)[0] + if self._handle in r: + return True + if r: + raise SentinelReady(r) + return False # diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py --- a/Lib/multiprocessing/forking.py +++ b/Lib/multiprocessing/forking.py @@ -118,8 +118,16 @@ if sys.platform != 'win32': sys.stderr.flush() self.returncode = None + if process_obj._create_sentinel: + r, w = os.pipe() + self.sentinel = r + else: + r = w = self.sentinel = None + self.pid = os.fork() if self.pid == 0: + if r is not None: + os.close(r) if 'random' in sys.modules: import random random.seed() @@ -128,6 +136,10 @@ if sys.platform != 'win32': sys.stderr.flush() os._exit(code) + if w is not None: + os.close(w) + util.Finalize(self, os.close, (r,)) + def poll(self, flag=os.WNOHANG): if self.returncode is None: try: diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py --- a/Lib/multiprocessing/process.py +++ b/Lib/multiprocessing/process.py @@ -92,7 +92,7 @@ class Process(object): _Popen = None def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, - *, daemon=None): + *, daemon=None, create_sentinel=False): assert group is None, 'group argument must be None for now' count = next(_current_process._counter) self._identity = _current_process._identity + (count,) @@ -109,6 +109,7 @@ class Process(object): self._kwargs = dict(kwargs) self._name = name or type(self).__name__ + '-' + \ ':'.join(str(i) for i in self._identity) + self._create_sentinel = create_sentinel def run(self): ''' @@ -132,6 +133,7 @@ class Process(object): else: from .forking import Popen self._popen = Popen(self) + self._sentinel = self._popen.sentinel _current_process._children.add(self) def terminate(self): diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -44,7 +44,7 @@ import weakref from queue import Empty, Full import _multiprocessing -from multiprocessing import Pipe +from multiprocessing.connection import Pipe, SentinelReady from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition from multiprocessing.util import debug, info, Finalize, register_after_fork from multiprocessing.forking import assert_spawning @@ -372,10 +372,10 @@ class SimpleQueue(object): def _make_methods(self): recv = self._reader.recv racquire, rrelease = self._rlock.acquire, self._rlock.release - def get(): + def get(*, sentinels=None): racquire() try: - return recv() + return recv(sentinels) finally: rrelease() self.get = get diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -154,7 +154,7 @@ class ProcessPoolShutdownTest(ProcessPoo processes = self.executor._processes self.executor.shutdown() - for p in processes: + for p in processes.values(): p.join() def test_context_manager_shutdown(self): @@ -163,7 +163,7 @@ class ProcessPoolShutdownTest(ProcessPoo self.assertEqual(list(e.map(abs, range(-5, 5))), [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) - for p in processes: + for p in processes.values(): p.join() def test_del_shutdown(self): @@ -174,7 +174,7 @@ class ProcessPoolShutdownTest(ProcessPoo del executor queue_management_thread.join() - for p in processes: + for p in processes.values(): p.join() class WaitTests(unittest.TestCase):