diff -r a7560c8f38ee Doc/library/concurrent.futures.rst --- a/Doc/library/concurrent.futures.rst Wed May 07 18:08:08 2014 +0100 +++ b/Doc/library/concurrent.futures.rst Thu May 08 01:09:14 2014 +0200 @@ -115,11 +115,14 @@ executor.submit(wait_on_future) -.. class:: ThreadPoolExecutor(max_workers) +.. class:: ThreadPoolExecutor(max_workers, initializer=None, initargs=()) An :class:`Executor` subclass that uses a pool of at most *max_workers* threads to execute calls asynchronously. + *initializer*: an optional callable used to initialize new worker threads. + *initargs*: a tuple of arguments passed to the initializer. + .. _threadpoolexecutor-example: @@ -170,12 +173,15 @@ Calling :class:`Executor` or :class:`Future` methods from a callable submitted to a :class:`ProcessPoolExecutor` will result in deadlock. -.. class:: ProcessPoolExecutor(max_workers=None) +.. class:: ProcessPoolExecutor(max_workers=None, initializer=None, initargs=()) An :class:`Executor` subclass that executes calls asynchronously using a pool of at most *max_workers* processes. If *max_workers* is ``None`` or not given, it will default to the number of processors on the machine. + *initializer*: an optional callable used to initialize new worker processes. + *initargs*: a tuple of arguments passed to the initializer. + .. versionchanged:: 3.3 When one of the worker processes terminates abruptly, a :exc:`BrokenProcessPool` error is now raised. Previously, behaviour diff -r a7560c8f38ee Lib/concurrent/futures/process.py --- a/Lib/concurrent/futures/process.py Wed May 07 18:08:08 2014 +0100 +++ b/Lib/concurrent/futures/process.py Thu May 08 01:09:14 2014 +0200 @@ -108,7 +108,7 @@ self.args = args self.kwargs = kwargs -def _process_worker(call_queue, result_queue): +def _process_worker(call_queue, result_queue, initializer, initargs): """Evaluates calls from call_queue and places the results in result_queue. This worker is run in a separate process. @@ -121,6 +121,11 @@ shutdown: A multiprocessing.Event that will be set as a signal to the worker that it should exit when call_queue is empty. """ + if initializer is not None: + try: + initializer(*initargs) + except BaseException: + return # FIXME? while True: call_item = call_queue.get(block=True) if call_item is None: @@ -321,13 +326,15 @@ class ProcessPoolExecutor(_base.Executor): - def __init__(self, max_workers=None): + def __init__(self, max_workers=None, initializer=None, initargs=()): """Initializes a new ProcessPoolExecutor instance. Args: max_workers: The maximum number of processes that can be used to execute the given calls. If None or not given then as many worker processes will be created as the machine has processors. + initializer: a callable used to initialize new worker processes. + initargs: a tuple of arguments passed to the initializer. """ _check_system_limits() @@ -335,6 +342,8 @@ self._max_workers = os.cpu_count() or 1 else: self._max_workers = max_workers + self._initializer = initializer + self._initargs = initargs # Make the call queue slightly larger than the number of processes to # prevent the worker processes from idling. But don't make it too big @@ -383,7 +392,9 @@ p = multiprocessing.Process( target=_process_worker, args=(self._call_queue, - self._result_queue)) + self._result_queue, + self._initializer, + self._initargs)) p.start() self._processes[p.pid] = p diff -r a7560c8f38ee Lib/concurrent/futures/thread.py --- a/Lib/concurrent/futures/thread.py Wed May 07 18:08:08 2014 +0100 +++ b/Lib/concurrent/futures/thread.py Thu May 08 01:09:14 2014 +0200 @@ -57,7 +57,12 @@ else: self.future.set_result(result) -def _worker(executor_reference, work_queue): +def _worker(executor_reference, work_queue, initializer, initargs): + if initializer is not None: + try: + initializer(*initargs) + except BaseException: + _base.LOGGER.critical('Exception in initializer', exc_info=True) try: while True: work_item = work_queue.get(block=True) @@ -80,14 +85,18 @@ _base.LOGGER.critical('Exception in worker', exc_info=True) class ThreadPoolExecutor(_base.Executor): - def __init__(self, max_workers): + def __init__(self, max_workers, initializer=None, initargs=()): """Initializes a new ThreadPoolExecutor instance. Args: max_workers: The maximum number of threads that can be used to execute the given calls. + initializer: a callable used to initialize new worker threads. + initargs: a tuple of arguments passed to the initializer. """ self._max_workers = max_workers + self._initializer = initializer + self._initargs = initargs self._work_queue = queue.Queue() self._threads = set() self._shutdown = False @@ -116,7 +125,9 @@ if len(self._threads) < self._max_workers: t = threading.Thread(target=_worker, args=(weakref.ref(self, weakref_cb), - self._work_queue)) + self._work_queue, + self._initializer, + self._initargs)) t.daemon = True t.start() self._threads.add(t)