diff -r bda68f973228 Doc/library/concurrent.futures.rst
--- a/Doc/library/concurrent.futures.rst	Sun Aug 10 10:33:28 2014 -0700
+++ b/Doc/library/concurrent.futures.rst	Mon Aug 11 01:42:44 2014 -0400
@@ -115,11 +115,21 @@
    executor.submit(wait_on_future)
 
 
-.. class:: ThreadPoolExecutor(max_workers)
+.. class:: ThreadPoolExecutor(max_workers, initializer=None, initargs=())
 
    An :class:`Executor` subclass that uses a pool of at most *max_workers*
    threads to execute calls asynchronously.
 
+   *initializer*: an optional callable used to initialize new worker threads.
+   *initargs*: a tuple of arguments passed to the initializer.
+
+   Should *initializer* raise an exception, all currently pending jobs will
+   raise a :exc:`RuntimeError`, as will any attempt to submit more jobs to
+   the pool.
+
+   .. versionchanged:: 3.5
+      Added *initializer* and *initargs* keyword arguments.
+
 
 .. _threadpoolexecutor-example:
 
@@ -170,7 +180,8 @@
    Calling :class:`Executor` or :class:`Future` methods from a callable
    submitted to a :class:`ProcessPoolExecutor` will result in deadlock.
 
-.. class:: ProcessPoolExecutor(max_workers=None)
+.. class:: ProcessPoolExecutor(max_workers=None, initializer=None, initargs=())
+
 
    An :class:`Executor` subclass that executes calls asynchronously using a pool
    of at most *max_workers* processes.  If *max_workers* is ``None`` or not
@@ -178,12 +189,22 @@
    If *max_workers* is lower or equal to ``0``, then a :exc:`ValueError`
    will be raised.
 
+   *initializer*: an optional callable used to initialize new worker processes.
+   *initargs*: a tuple of arguments passed to the initializer.
+
+   Should *initializer* raise an exception, all currently pending jobs will
+   raise a :exc:`RuntimeError`, as will any attempt to submit more jobs to
+   the pool.
+
    .. versionchanged:: 3.3
       When one of the worker processes terminates abruptly, a
      :exc:`BrokenProcessPool` error is now raised.  Previously, behaviour
      was undefined but operations on the executor or its futures would often
      freeze or deadlock.
 
+   .. versionchanged:: 3.5
+      Added *initializer* and *initargs* keyword arguments.
+
 .. _processpoolexecutor-example:
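Not part of the patch: to make the documented *initializer*/*initargs* semantics concrete, a minimal usage sketch. The init_worker and work names and the logging setup are illustrative only, and the snippet assumes an interpreter with this patch applied.

import concurrent.futures
import logging

def init_worker(level):
    # Hypothetical per-worker setup: runs once in every worker process
    # before it starts executing submitted calls.
    logging.basicConfig(
        level=level, format='%(process)d %(levelname)s %(message)s')

def work(n):
    logging.info('processing %d', n)
    return n * n

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(
            max_workers=2,
            initializer=init_worker,
            initargs=(logging.INFO,)) as executor:
        print(list(executor.map(work, range(5))))   # [0, 1, 4, 9, 16]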
""" + if initializer is not None: + try: + initializer(*initargs) + except Exception: + _base.LOGGER.critical('Exception in initializer:', exc_info=True) + result_queue.put(_InitializerFailed(os.getpid())) + return + while True: call_item = call_queue.get(block=True) if call_item is None: @@ -214,6 +226,14 @@ for p in processes.values(): p.join() + def handle_process_shutdown(result_item): + p = processes.pop(result_item.pid) + p.join() + if not processes: + shutdown_worker() + return True + return False + reader = result_queue._reader while True: @@ -254,10 +274,28 @@ # Clean shutdown of a worker using its PID # (avoids marking the executor broken) assert shutting_down() - p = processes.pop(result_item) - p.join() - if not processes: - shutdown_worker() + if handle_process_shutdown(result_item): + return + elif isinstance(result_item, _InitializerFailed): + # Set the initializer_failed state on the pool so + # that submits fail. + executor = executor_reference() + if executor is not None: + executor._shutdown_thread = True + executor._initializer_failed = True + executor = None + # All futures in flight must be marked failed + for work_id, work_item in pending_work_items.items(): + work_item.future.set_exception( + RuntimeError( + "The future was aborted because the pool's " + "initializer function failed." + )) + # Delete references to object. See issue16284 + del work_item + pending_work_items.clear() + # shutdown the worker + if handle_process_shutdown(result_item): return elif result_item is not None: work_item = pending_work_items.pop(result_item.work_id, None) @@ -321,13 +359,15 @@ class ProcessPoolExecutor(_base.Executor): - def __init__(self, max_workers=None): + def __init__(self, max_workers=None, initializer=None, initargs=()): """Initializes a new ProcessPoolExecutor instance. Args: max_workers: The maximum number of processes that can be used to execute the given calls. If None or not given then as many worker processes will be created as the machine has processors. + initializer: a callable used to initialize new worker processes. + initargs: a tuple of arguments passed to the initializer. """ _check_system_limits() @@ -338,6 +378,11 @@ raise ValueError("max_workers must be greater than 0") self._max_workers = max_workers + self._initializer = initializer + self._initargs = initargs + + if initializer is not None and not callable(initializer): + raise TypeError('initializer must be a callable') # Make the call queue slightly larger than the number of processes to # prevent the worker processes from idling. But don't make it too big @@ -358,6 +403,7 @@ self._shutdown_thread = False self._shutdown_lock = threading.Lock() self._broken = False + self._initializer_failed = False self._queue_count = 0 self._pending_work_items = {} @@ -386,7 +432,9 @@ p = multiprocessing.Process( target=_process_worker, args=(self._call_queue, - self._result_queue)) + self._result_queue, + self._initializer, + self._initargs)) p.start() self._processes[p.pid] = p @@ -395,6 +443,11 @@ if self._broken: raise BrokenProcessPool('A child process terminated ' 'abruptly, the process pool is not usable anymore') + if self._initializer_failed: + raise RuntimeError('Cannot schedule new futures after ' + 'initializer fails. 
diff -r bda68f973228 Lib/concurrent/futures/thread.py
--- a/Lib/concurrent/futures/thread.py	Sun Aug 10 10:33:28 2014 -0700
+++ b/Lib/concurrent/futures/thread.py	Mon Aug 11 01:42:44 2014 -0400
@@ -57,12 +57,30 @@
         else:
             self.future.set_result(result)
 
-def _worker(executor_reference, work_queue):
+def _worker(executor_reference, work_queue, initializer, initargs):
+    _initializer_failed = False
+    if initializer is not None:
+        try:
+            initializer(*initargs)
+        except Exception:
+            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
+            _initializer_failed = True
+            executor = executor_reference()
+            if executor is not None:
+                executor._initializer_failed = True
+            del executor
     try:
         while True:
             work_item = work_queue.get(block=True)
             if work_item is not None:
-                work_item.run()
+                if _initializer_failed:
+                    work_item.future.set_exception(
+                        RuntimeError(
+                            "The future was aborted because the pool's "
+                            "initializer function failed."
+                        ))
+                else:
+                    work_item.run()
                 # Delete references to object. See issue16284
                 del work_item
                 continue
@@ -80,12 +98,14 @@
         _base.LOGGER.critical('Exception in worker', exc_info=True)
 
 class ThreadPoolExecutor(_base.Executor):
-    def __init__(self, max_workers):
+    def __init__(self, max_workers, initializer=None, initargs=()):
         """Initializes a new ThreadPoolExecutor instance.
 
         Args:
             max_workers: The maximum number of threads that can be used to
                 execute the given calls.
+            initializer: a callable used to initialize new worker threads.
+            initargs: a tuple of arguments passed to the initializer.
         """
         if max_workers <= 0:
             raise ValueError("max_workers must be greater than 0")
@@ -95,11 +115,22 @@
         self._threads = set()
         self._shutdown = False
         self._shutdown_lock = threading.Lock()
+        self._initializer_failed = False
+        self._initializer = initializer
+        self._initargs = initargs
+
+        if initializer is not None and not callable(initializer):
+            raise TypeError('initializer must be a callable')
 
     def submit(self, fn, *args, **kwargs):
         with self._shutdown_lock:
             if self._shutdown:
-                raise RuntimeError('cannot schedule new futures after shutdown')
+                raise RuntimeError('Cannot schedule new futures after shutdown.')
+
+            if self._initializer_failed:
+                raise RuntimeError('Cannot schedule new futures after '
+                                   'initializer fails. The pool is not '
+                                   'usable anymore.')
 
             f = _base.Future()
             w = _WorkItem(f, fn, args, kwargs)
@@ -119,7 +150,9 @@
         if len(self._threads) < self._max_workers:
             t = threading.Thread(target=_worker,
                                  args=(weakref.ref(self, weakref_cb),
-                                       self._work_queue))
+                                       self._work_queue,
+                                       self._initializer,
+                                       self._initargs))
             t.daemon = True
             t.start()
             self._threads.add(t)
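Not part of the patch: for the thread-pool variant, a typical use of the new arguments is per-thread setup, for example populating a threading.local before any work items run. The _local, init_thread and who_am_i names below are illustrative only.

import concurrent.futures
import threading

_local = threading.local()

def init_thread(prefix):
    # Hypothetical per-thread setup: runs once in each worker thread
    # before it starts executing submitted callables.
    _local.name = '%s-%d' % (prefix, threading.get_ident())

def who_am_i():
    return _local.name

if __name__ == '__main__':
    with concurrent.futures.ThreadPoolExecutor(
            max_workers=3, initializer=init_thread,
            initargs=('worker',)) as executor:
        names = {executor.submit(who_am_i).result() for _ in range(10)}
        print(names)  # at most three distinct per-thread names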
diff -r bda68f973228 Lib/test/test_concurrent_futures.py
--- a/Lib/test/test_concurrent_futures.py	Sun Aug 10 10:33:28 2014 -0700
+++ b/Lib/test/test_concurrent_futures.py	Mon Aug 11 01:42:44 2014 -0400
@@ -16,6 +16,7 @@
 import time
 import unittest
 import weakref
+from unittest.mock import patch
 
 from concurrent import futures
 from concurrent.futures._base import (
@@ -38,6 +39,7 @@
 EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError())
 SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
 
+INITIALIZER_STATUS = 'uninitialized'
 
 def mul(x, y):
     return x * y
@@ -52,6 +54,15 @@
     print(msg)
     sys.stdout.flush()
 
+def init(x):
+    global INITIALIZER_STATUS
+    INITIALIZER_STATUS = x
+
+def get_status():
+    return INITIALIZER_STATUS
+
+def fail():
+    raise ValueError('error in initializer')
 
 class MyObject(object):
     def my_method(self):
@@ -81,7 +92,6 @@
         # tests. This should reduce the probability of timeouts in the tests.
         futures = [self.executor.submit(time.sleep, 0.1)
                    for _ in range(self.worker_count)]
-
         for f in futures:
             f.result()
 
@@ -94,6 +104,59 @@
     executor_type = futures.ProcessPoolExecutor
 
 
+class ExecutorInitializerMixin(ExecutorMixin):
+    def setUp(self):
+        self.t1 = time.time()
+        try:
+            self.executor = self.executor_type(max_workers=self.worker_count,
+                    initializer=init, initargs=('initialized',))
+        except NotImplementedError as e:
+            self.skipTest(str(e))
+        self._prime_executor()
+
+    def test_initializer(self):
+        futures = [self.executor.submit(get_status)
+                   for _ in range(self.worker_count)]
+
+        for f in futures:
+            self.assertEqual(f.result(), 'initialized')
+
+
+class FailingInitializerMixin(ExecutorMixin):
+    def setUp(self):
+        self.t1 = time.time()
+        try:
+            self.executor = self.executor_type(max_workers=self.worker_count,
+                    initializer=fail, initargs=())
+        except NotImplementedError as e:
+            self.skipTest(str(e))
+
+    def test_initializer(self):
+        with patch('concurrent.futures._base.LOGGER'):
+            self.assertRaises((RuntimeError, BrokenProcessPool),
+                              self._prime_executor)
+
+
+class ThreadPoolInitializerTest(ExecutorInitializerMixin,
+                                ThreadPoolMixin, unittest.TestCase):
+    pass
+
+
+class ProcessPoolInitializerTest(ExecutorInitializerMixin,
+                                 ProcessPoolMixin, unittest.TestCase):
+    pass
+
+
+class ThreadPoolFailingInitializerTest(FailingInitializerMixin,
+                                       ThreadPoolMixin, unittest.TestCase):
+    pass
+
+
+class ProcessPoolFailingInitializerTest(FailingInitializerMixin,
+                                        ProcessPoolMixin, unittest.TestCase):
+    pass
+
+
 class ExecutorShutdownTest:
     def test_run_after_shutdown(self):
         self.executor.shutdown()
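Not part of the patch: assuming a build with the patched standard library on the path, the new test classes can be run on their own through unittest's loader; importing test.test_concurrent_futures may raise unittest.SkipTest on platforms without working multiprocessing support.

import unittest
from test import test_concurrent_futures

suite = unittest.defaultTestLoader.loadTestsFromNames(
    ['ThreadPoolInitializerTest',
     'ProcessPoolInitializerTest',
     'ThreadPoolFailingInitializerTest',
     'ProcessPoolFailingInitializerTest'],
    module=test_concurrent_futures)
unittest.TextTestRunner(verbosity=2).run(suite)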