diff -r 8667c26e2bec Lib/concurrent/futures/_base.py --- a/Lib/concurrent/futures/_base.py Mon Jun 15 09:11:37 2015 -0700 +++ b/Lib/concurrent/futures/_base.py Mon Jun 15 11:41:51 2015 -0700 @@ -282,7 +282,7 @@ class Future(object): """Represents the result of an asynchronous computation.""" - def __init__(self): + def __init__(self, state_change_callbacks=None): """Initializes the future. Should not be called by clients.""" self._condition = threading.Condition() self._state = PENDING @@ -290,6 +290,9 @@ self._exception = None self._waiters = [] self._done_callbacks = [] + self._state_change_callbacks = [] + if state_change_callbacks: + self._state_change_callbacks.extend(state_change_callbacks) def _invoke_callbacks(self): for callback in self._done_callbacks: @@ -298,6 +301,16 @@ except Exception: LOGGER.exception('exception calling callback for %r', self) + def _change_state_invoke_callbacks(self, new_state): + old_state = self._state + self._state = new_state + for callback in self._state_change_callbacks: + try: + callback(self, old_state, new_state) + except Exception: + LOGGER.exception('exception calling state change callback' + ' for %r', self) + def __repr__(self): with self._condition: if self._state == FINISHED: @@ -331,7 +344,7 @@ if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: return True - self._state = CANCELLED + self._change_state_invoke_callbacks(CANCELLED) self._condition.notify_all() self._invoke_callbacks() @@ -465,14 +478,14 @@ """ with self._condition: if self._state == CANCELLED: - self._state = CANCELLED_AND_NOTIFIED + self._change_state_invoke_callbacks(CANCELLED_AND_NOTIFIED) for waiter in self._waiters: waiter.add_cancelled(self) # self._condition.notify_all() is not necessary because # self.cancel() triggers a notification. return False elif self._state == PENDING: - self._state = RUNNING + self._change_state_invoke_callbacks(RUNNING) return True else: LOGGER.critical('Future %s in unexpected state: %s', @@ -487,7 +500,7 @@ """ with self._condition: self._result = result - self._state = FINISHED + self._change_state_invoke_callbacks(FINISHED) for waiter in self._waiters: waiter.add_result(self) self._condition.notify_all() @@ -500,7 +513,7 @@ """ with self._condition: self._exception = exception - self._state = FINISHED + self._change_state_invoke_callbacks(FINISHED) for waiter in self._waiters: waiter.add_exception(self) self._condition.notify_all() @@ -509,6 +522,9 @@ class Executor(object): """This is an abstract base class for concrete asynchronous executors.""" + def __init__(self, state_change_callbacks=None): + self._state_change_callbacks = state_change_callbacks + def submit(self, fn, *args, **kwargs): """Submits a callable to be executed with the given arguments. diff -r 8667c26e2bec Lib/concurrent/futures/process.py --- a/Lib/concurrent/futures/process.py Mon Jun 15 09:11:37 2015 -0700 +++ b/Lib/concurrent/futures/process.py Mon Jun 15 11:41:51 2015 -0700 @@ -365,7 +365,7 @@ class ProcessPoolExecutor(_base.Executor): - def __init__(self, max_workers=None): + def __init__(self, max_workers=None, state_change_callbacks=None): """Initializes a new ProcessPoolExecutor instance. Args: @@ -373,6 +373,9 @@ execute the given calls. If None or not given then as many worker processes will be created as the machine has processors. """ + super(ProcessPoolExecutor, self).__init__( + state_change_callbacks=state_change_callbacks) + _check_system_limits() if max_workers is None: @@ -442,7 +445,8 @@ if self._shutdown_thread: raise RuntimeError('cannot schedule new futures after shutdown') - f = _base.Future() + f = _base.Future( + state_change_callbacks=self._state_change_callbacks) w = _WorkItem(f, fn, args, kwargs) self._pending_work_items[self._queue_count] = w diff -r 8667c26e2bec Lib/concurrent/futures/thread.py --- a/Lib/concurrent/futures/thread.py Mon Jun 15 09:11:37 2015 -0700 +++ b/Lib/concurrent/futures/thread.py Mon Jun 15 11:41:51 2015 -0700 @@ -81,13 +81,15 @@ _base.LOGGER.critical('Exception in worker', exc_info=True) class ThreadPoolExecutor(_base.Executor): - def __init__(self, max_workers=None): + def __init__(self, max_workers=None, state_change_callbacks=None): """Initializes a new ThreadPoolExecutor instance. Args: max_workers: The maximum number of threads that can be used to execute the given calls. """ + super(ThreadPoolExecutor, self).__init__( + state_change_callbacks=state_change_callbacks) if max_workers is None: # Use this number because ThreadPoolExecutor is often # used to overlap I/O instead of CPU work. @@ -106,7 +108,8 @@ if self._shutdown: raise RuntimeError('cannot schedule new futures after shutdown') - f = _base.Future() + f = _base.Future( + state_change_callbacks=self._state_change_callbacks) w = _WorkItem(f, fn, args, kwargs) self._work_queue.put(w)