diff -r b116489d31ff Doc/library/multiprocessing.rst
--- a/Doc/library/multiprocessing.rst	Thu Aug 21 19:16:17 2014 -0400
+++ b/Doc/library/multiprocessing.rst	Sun Aug 24 15:32:15 2014 -0400
@@ -1885,6 +1885,12 @@
       .. versionadded:: 3.4
          *context*
 
+   .. versionchanged:: 3.5
+      When one of the worker processes terminates abruptly, a
+      :exc:`BrokenProcessPool` error is now raised.  Previously,
+      behaviour was undefined and operations on the :class:`Pool` or
+      its workers would often freeze or deadlock.
+
    .. note::
 
       Worker processes within a :class:`Pool` typically live for the complete
@@ -1997,6 +2003,14 @@
       Wait for the worker processes to exit.  One must call :meth:`close` or
       :meth:`terminate` before using :meth:`join`.
 
+   .. exception:: BrokenProcessPool
+
+      Derived from :exc:`RuntimeError`, this exception class is raised
+      when one of the workers of a :class:`Pool` has terminated in a
+      non-clean fashion (for example, if it was killed from the outside).
+
+      .. versionadded:: 3.5
+
    .. versionadded:: 3.3
       Pool objects now support the context manager protocol -- see
       :ref:`typecontextmanager`.  :meth:`~contextmanager.__enter__` returns the
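To make the documented change concrete, here is a minimal sketch of the new
user-visible behaviour, assuming the patch below is applied.  The waiting()
function and the peek at the private _pool attribute are illustrative only
(they mirror the test added at the end of this patch):

    import os
    import signal
    import time
    import multiprocessing
    from multiprocessing.pool import BrokenProcessPool

    def waiting(x):
        # Block long enough that a worker is guaranteed to be mid-task
        # when it is killed.
        time.sleep(60)

    if __name__ == '__main__':
        pool = multiprocessing.Pool(2)
        result = pool.map_async(waiting, range(3))
        # Simulate an abrupt, external worker death (uses a private
        # attribute purely for demonstration).
        os.kill(pool._pool[0].pid, signal.SIGTERM)
        try:
            result.get()
        except BrokenProcessPool:
            # With the patch, the pool fails loudly instead of deadlocking.
            print('pool is broken; outstanding results are lost')
        pool.join()

Before this change the same sequence would typically hang forever inside
result.get().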
+ """ cleaned = False for i in reversed(range(len(self._pool))): worker = self._pool[i] + if worker.pid == wait_pid: + worker.join() if worker.exitcode is not None: # worker exited util.debug('cleaning up worker %d' % i) @@ -233,17 +256,22 @@ w.start() util.debug('added worker') - def _maintain_pool(self): - """Clean up any exited workers and start replacements for them. + def _maintain_pool(self, wait_pid=None): + """ Clean up any exited workers and start replacements for them. + + If pid is provided, explicitly join() the Process with that pid, + since it will be exiting very soon. + """ - if self._join_exited_workers(): - self._repopulate_pool() + if self._join_exited_workers(wait_pid=wait_pid): + if self._state == RUN or (self._cache and self._state != TERMINATE): + self._repopulate_pool() def _setup_queues(self): self._inqueue = self._ctx.SimpleQueue() self._outqueue = self._ctx.SimpleQueue() self._quick_put = self._inqueue._writer.send - self._quick_get = self._outqueue._reader.recv + self._quick_get = self._outqueue._reader def apply(self, func, args=(), kwds={}): ''' @@ -357,16 +385,16 @@ return result @staticmethod - def _handle_workers(pool): + def _handle_workers(pool, repopulate_lock): thread = threading.current_thread() # Keep maintaining workers until the cache gets drained, unless the pool # is terminated. while thread._state == RUN or (pool._cache and thread._state != TERMINATE): - pool._maintain_pool() time.sleep(0.1) # send sentinel to stop workers - pool._taskqueue.put(None) + with repopulate_lock: + pool._taskqueue.put(None) util.debug('worker handler exiting') @staticmethod @@ -412,10 +440,45 @@ util.debug('task handler exiting') @staticmethod - def _handle_results(outqueue, get, cache): + def _handle_broken_pool(reader, pool, cache): + """ Handle case where a process has unexpectedly exited. + + If a Process has exited without alerting the result_handler + thread first, we need to close/terminate the pool and abort + all running tasks. + + """ + sentinels = [p.sentinel for p in pool._pool] + assert sentinels + ready = wait([reader] + sentinels) + if reader not in ready: + pool.close() + for i, cache_ent in list(cache.items()): + err = BrokenProcessPool('A child process ' + 'terminated abruptly while the worker ' + 'was still executing.') + cache_ent._set(i, (False, err)) + # Mark handler as broken. We need this for terminate + # to succeed. + thread = threading.current_thread() + thread._state = BROKEN + pool.terminate() + return True + return False + + @classmethod + def _handle_results(cls, outqueue, reader, cache, repopulate_lock, pool): thread = threading.current_thread() + # ProcessPool passes _reader, ThreadPool passes a get method. + get = reader.recv if hasattr(reader, 'recv') else reader + while 1: + # Determine if the Pool has been broken. + if cls._handle_broken_pool(reader, pool, cache): + util.debug("pool is broken") + break + try: task = get() except (OSError, EOFError): @@ -431,6 +494,13 @@ util.debug('result handler got sentinel') break + if isinstance(task, int): + # A worker is exiting due to maxtasksperchild. Wait + # for that process to actually exit before continuing. 
diff -r b116489d31ff Lib/test/_test_multiprocessing.py
--- a/Lib/test/_test_multiprocessing.py	Thu Aug 21 19:16:17 2014 -0400
+++ b/Lib/test/_test_multiprocessing.py	Sun Aug 24 15:32:15 2014 -0400
@@ -1827,6 +1827,10 @@
 def unpickleable_result():
     return lambda: 42
 
+def waiting(x):
+    while True:
+        time.sleep(10)
+
 class _TestPoolWorkerErrors(BaseTestCase):
     ALLOWED_TYPES = ('processes', )
 
@@ -1845,6 +1849,17 @@
         p.close()
         p.join()
 
+    def test_broken_process_pool(self):
+        from multiprocessing.pool import BrokenProcessPool
+        p = multiprocessing.Pool(2)
+        res = p.map_async(waiting, range(3))
+        # Kill one of the pool workers.
+        pid = p._pool[0].pid
+        os.kill(pid, signal.SIGTERM)
+        self.assertRaises(BrokenProcessPool, res.get)
+        p.close()
+        p.join()
+
     def test_unpickleable_result(self):
         from multiprocessing.pool import MaybeEncodingError
         p = multiprocessing.Pool(2)
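A possible hardening of test_broken_process_pool, not included in the patch
above: give res.get() a timeout so that a regression turns into a test failure
rather than a hung test run.  Hypothetical variant:

    def test_broken_process_pool_does_not_hang(self):
        from multiprocessing.pool import BrokenProcessPool
        p = multiprocessing.Pool(2)
        res = p.map_async(waiting, range(3))
        os.kill(p._pool[0].pid, signal.SIGTERM)
        # A 30-second ceiling: if the pool deadlocks again, get() raises
        # multiprocessing.TimeoutError and the test fails instead of hanging.
        self.assertRaises(BrokenProcessPool, res.get, 30)
        p.close()
        p.join()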