| --- a/Lib/concurrent/futures/process.py |
| +++ b/Lib/concurrent/futures/process.py |
| @@ -46,10 +46,11 @@ Process #1..n: |
| __author__ = 'Brian Quinlan (brian@sweetapp.com)' |
| import atexit |
| +import os |
| from concurrent.futures import _base |
| import queue |
| import multiprocessing |
| -from multiprocessing.queues import SimpleQueue |
| +from multiprocessing.queues import SimpleQueue, SentinelReady |
| import threading |
| import weakref |
| @@ -122,7 +123,7 @@ def _process_worker(call_queue, result_q |
| call_item = call_queue.get(block=True) |
| if call_item is None: |
| # Wake up queue management thread |
| - result_queue.put(None) |
| + result_queue.put(os.getpid()) |
| return |
| try: |
| r = call_item.fn(*call_item.args, **call_item.kwargs) |
| @@ -194,29 +195,63 @@ def _queue_management_worker(executor_re |
| result_queue: A multiprocessing.Queue of _ResultItems generated by the |
| process workers. |
| """ |
| - nb_shutdown_processes = 0 |
| - def shutdown_one_process(): |
| - """Tell a worker to terminate, which will in turn wake us again""" |
| - nonlocal nb_shutdown_processes |
| - call_queue.put(None) |
| - nb_shutdown_processes += 1 |
| + |
| + def shutdown_worker(): |
| + # This is an upper bound |
| + nb_children_alive = sum(p.is_alive() for p in processes.values()) |
|
bquinlan
2011/06/08 12:31:32
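nb => num (i.e. rename `nb_children_alive` to `num_children_alive`),
just to be consistent with the style of [... remainder of comment truncated in this capture]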
|
| + for i in range(0, nb_children_alive): |
| + call_queue.put(None) |
| + # If .join() is not called on the created processes then |
| + # some multiprocessing.Queue methods may deadlock on Mac OS |
| + # X. |
| + for p in processes.values(): |
| + p.join() |
| + |
| while True: |
| _add_call_item_to_queue(pending_work_items, |
| work_ids_queue, |
| call_queue) |
| - result_item = result_queue.get() |
| - if result_item is not None: |
| - work_item = pending_work_items[result_item.work_id] |
| - del pending_work_items[result_item.work_id] |
| - |
| - if result_item.exception: |
| - work_item.future.set_exception(result_item.exception) |
| - else: |
| - work_item.future.set_result(result_item.result) |
| - continue |
| - # If we come here, we either got a timeout or were explicitly woken up. |
| - # In either case, check whether we should start shutting down. |
| + sentinels = [p.sentinel for p in processes.values()] |
| + assert sentinels |
| + try: |
| + result_item = result_queue.get(sentinels=sentinels) |
| + except SentinelReady as e: |
| + # Mark the process pool broken so that submits fail right now. |
| + executor = executor_reference() |
| + if executor is not None: |
| + executor._broken = True |
| + executor._shutdown_thread = True |
| + del executor |
| + # All futures in flight must be marked failed |
| + for work_id, work_item in pending_work_items.items(): |
| + work_item.future.set_exception( |
| + BrokenProcessPool( |
| + "A process in the process pool was " |
| + "terminated abruptly while the future was " |
| + "running or pending." |
| + )) |
| + pending_work_items.clear() |
| + # Terminate remaining workers forcibly: the queues or their |
| + # locks may be in a dirty state and block forever. |
| + for p in processes.values(): |
| + p.terminate() |
| + for p in processes.values(): |
| + p.join() |
| + return |
| + if isinstance(result_item, int): |
| + # Clean shutdown of a worker using its PID |
| + # (avoids marking the executor broken) |
| + del processes[result_item] |
| + elif result_item is not None: |
| + work_item = pending_work_items.pop(result_item.work_id, None) |
| + # work_item can be None if another process terminated (see above) |
| + if work_item is not None: |
| + if result_item.exception: |
| + work_item.future.set_exception(result_item.exception) |
| + else: |
| + work_item.future.set_result(result_item.result) |
| + # Check whether we should start shutting down. |
| executor = executor_reference() |
| # No more work items can be added if: |
| # - The interpreter is shutting down OR |
| @@ -226,17 +261,11 @@ def _queue_management_worker(executor_re |
| # Since no new work items can be added, it is safe to shutdown |
| # this thread if there are no pending work items. |
| if not pending_work_items: |
| - while nb_shutdown_processes < len(processes): |
| - shutdown_one_process() |
| - # If .join() is not called on the created processes then |
| - # some multiprocessing.Queue methods may deadlock on Mac OS |
| - # X. |
| - for p in processes: |
| - p.join() |
| + shutdown_worker() |
| return |
| else: |
| # Start shutting down by telling a process it can exit. |
| - shutdown_one_process() |
| + call_queue.put(None) |
| del executor |
| _system_limits_checked = False |
| @@ -264,6 +293,14 @@ def _check_system_limits(): |
| _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max |
| raise NotImplementedError(_system_limited) |
| + |
| +class BrokenProcessPool(RuntimeError): |
| + """ |
|
bquinlan
2011/06/08 12:31:32
Could you start the doc comment with a single-line summary?
|
| + Raised when a process in a ProcessPoolExecutor terminated abruptly |
| + while a future was in the running state. |
| + """ |
| + |
| + |
| class ProcessPoolExecutor(_base.Executor): |
| def __init__(self, max_workers=None): |
| """Initializes a new ProcessPoolExecutor instance. |
| @@ -288,11 +325,13 @@ class ProcessPoolExecutor(_base.Executor |
| self._result_queue = SimpleQueue() |
| self._work_ids = queue.Queue() |
| self._queue_management_thread = None |
| - self._processes = set() |
| + # Map of pids to processes |
| + self._processes = {} |
| # Shutdown is a two-step process. |
| self._shutdown_thread = False |
| self._shutdown_lock = threading.Lock() |
| + self._broken = False |
| self._queue_count = 0 |
| self._pending_work_items = {} |
| @@ -302,6 +341,8 @@ class ProcessPoolExecutor(_base.Executor |
| def weakref_cb(_, q=self._result_queue): |
| q.put(None) |
| if self._queue_management_thread is None: |
| + # Start the processes so that their sentinels are known. |
| + self._adjust_process_count() |
| self._queue_management_thread = threading.Thread( |
| target=_queue_management_worker, |
| args=(weakref.ref(self, weakref_cb), |
| @@ -321,10 +362,13 @@ class ProcessPoolExecutor(_base.Executor |
| args=(self._call_queue, |
| self._result_queue)) |
| p.start() |
| - self._processes.add(p) |
| + self._processes[p.pid] = p |
| def submit(self, fn, *args, **kwargs): |
| with self._shutdown_lock: |
| + if self._broken: |
| + raise BrokenProcessPool('A child process terminated ' |
| + 'abruptly, the process pool is not usable anymore') |
| if self._shutdown_thread: |
| raise RuntimeError('cannot schedule new futures after shutdown') |
| @@ -338,7 +382,6 @@ class ProcessPoolExecutor(_base.Executor |
| self._result_queue.put(None) |
| self._start_queue_management_thread() |
| - self._adjust_process_count() |
| return f |
| submit.__doc__ = _base.Executor.submit.__doc__ |