diff -r c0224ff67cdd -r aa7e031d3dc8 Lib/concurrent/futures/_base.py --- a/Lib/concurrent/futures/_base.py Mon Oct 13 10:39:41 2014 +0100 +++ b/Lib/concurrent/futures/_base.py Tue Oct 14 11:07:15 2014 +0100 @@ -18,7 +18,7 @@ RUNNING = 'RUNNING' # The future was cancelled by the user... CANCELLED = 'CANCELLED' -# ...and _Waiter.add_cancelled() was called by a worker. +# ...and `set_running_or_notify_cancel` was called by a worker. CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED' FINISHED = 'FINISHED' @@ -106,11 +106,10 @@ self.event.set() class _AllCompletedWaiter(_Waiter): - """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).""" + """Used by wait(return_when=ALL_COMPLETED).""" - def __init__(self, num_pending_calls, stop_on_exception): + def __init__(self, num_pending_calls): self.num_pending_calls = num_pending_calls - self.stop_on_exception = stop_on_exception self.lock = threading.Lock() super().__init__() @@ -126,49 +125,42 @@ def add_exception(self, future): super().add_exception(future) - if self.stop_on_exception: - self.event.set() - else: - self._decrement_pending_calls() + self._decrement_pending_calls() def add_cancelled(self, future): super().add_cancelled(future) self._decrement_pending_calls() -class _AcquireFutures(object): - """A context manager that does an ordered acquire of Future conditions.""" +class _FirstExceptionWaiter(_AllCompletedWaiter): + """Used by wait(return_when=FIRST_EXCEPTION).""" - def __init__(self, futures): - self.futures = sorted(futures, key=id) + def add_exception(self, future): + super().add_exception(future) + self.event.set() + +class _Subscription(object): + def __init__(self, waiter, futures): + self._waiter = waiter + self._futures = futures + + def _done_callback(self, future): + if future.cancelled(): + self._waiter.add_cancelled(future) + else: + try: + future.result() + except Exception: + self._waiter.add_exception(future) + else: + self._waiter.add_result(future) def __enter__(self): - for future in self.futures: - future._condition.acquire() + for f in self._futures: + f.add_done_callback(self._done_callback) def __exit__(self, *args): - for future in self.futures: - future._condition.release() - -def _create_and_install_waiters(fs, return_when): - if return_when == _AS_COMPLETED: - waiter = _AsCompletedWaiter() - elif return_when == FIRST_COMPLETED: - waiter = _FirstCompletedWaiter() - else: - pending_count = sum( - f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs) - - if return_when == FIRST_EXCEPTION: - waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True) - elif return_when == ALL_COMPLETED: - waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False) - else: - raise ValueError("Invalid return condition: %r" % return_when) - - for f in fs: - f._waiters.append(waiter) - - return waiter + for f in self._futures: + f.remove_done_callback(self._done_callback) def as_completed(fs, timeout=None): """An iterator over the given futures that yields each as it completes. @@ -192,15 +184,17 @@ end_time = timeout + time.time() fs = set(fs) - with _AcquireFutures(fs): - finished = set( - f for f in fs - if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) - pending = fs - finished - waiter = _create_and_install_waiters(fs, _AS_COMPLETED) + pending = set(fs) + waiter = _AsCompletedWaiter() - try: - yield from finished + with _Subscription(waiter, fs): + with waiter.lock: + finished = waiter.finished_futures + waiter.finished_futures = [] + + for future in finished: + yield future + pending.remove(future) while pending: if timeout is None: @@ -223,11 +217,6 @@ yield future pending.remove(future) - finally: - for f in fs: - with f._condition: - f._waiters.remove(waiter) - DoneAndNotDoneFutures = collections.namedtuple( 'DoneAndNotDoneFutures', 'done not_done') def wait(fs, timeout=None, return_when=ALL_COMPLETED): @@ -254,30 +243,22 @@ completed. The second set, named 'not_done', contains uncompleted futures. """ - with _AcquireFutures(fs): - done = set(f for f in fs - if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) + if return_when == FIRST_COMPLETED: + waiter = _FirstCompletedWaiter() + elif return_when == FIRST_EXCEPTION: + waiter = _FirstExceptionWaiter(len(fs)) + elif return_when == ALL_COMPLETED: + waiter = _AllCompletedWaiter(len(fs)) + else: + raise ValueError("Invalid return condition: %r" % return_when) + + with _Subscription(waiter, fs): + waiter.event.wait(timeout) + + done = set(waiter.finished_futures) not_done = set(fs) - done - if (return_when == FIRST_COMPLETED) and done: - return DoneAndNotDoneFutures(done, not_done) - elif (return_when == FIRST_EXCEPTION) and done: - if any(f for f in done - if not f.cancelled() and f.exception() is not None): - return DoneAndNotDoneFutures(done, not_done) - - if len(done) == len(fs): - return DoneAndNotDoneFutures(done, not_done) - - waiter = _create_and_install_waiters(fs, return_when) - - waiter.event.wait(timeout) - for f in fs: - with f._condition: - f._waiters.remove(waiter) - - done.update(waiter.finished_futures) - return DoneAndNotDoneFutures(done, set(fs) - done) + return DoneAndNotDoneFutures(done, not_done) class Future(object): """Represents the result of an asynchronous computation.""" @@ -288,7 +269,6 @@ self._state = PENDING self._result = None self._exception = None - self._waiters = [] self._done_callbacks = [] def _invoke_callbacks(self): @@ -375,6 +355,12 @@ return fn(self) + def remove_done_callback(self, fn): + with self._condition: + self._done_callbacks = [ + callback for callback in self._done_callbacks + if callback is not fn] + def result(self, timeout=None): """Return the result of the call that the future represents. @@ -446,8 +432,7 @@ Should only be used by Executor implementations and unit tests. If the future has been cancelled (cancel() was called and returned - True) then any threads waiting on the future completing (though calls - to as_completed() or wait()) are notified and False is returned. + True) False is returned. If the future was not cancelled then it is put in the running state (future calls to running() will return True) and True is returned. @@ -466,10 +451,6 @@ with self._condition: if self._state == CANCELLED: self._state = CANCELLED_AND_NOTIFIED - for waiter in self._waiters: - waiter.add_cancelled(self) - # self._condition.notify_all() is not necessary because - # self.cancel() triggers a notification. return False elif self._state == PENDING: self._state = RUNNING @@ -488,8 +469,6 @@ with self._condition: self._result = result self._state = FINISHED - for waiter in self._waiters: - waiter.add_result(self) self._condition.notify_all() self._invoke_callbacks() @@ -501,8 +480,6 @@ with self._condition: self._exception = exception self._state = FINISHED - for waiter in self._waiters: - waiter.add_exception(self) self._condition.notify_all() self._invoke_callbacks() diff -r c0224ff67cdd -r aa7e031d3dc8 Lib/test/test_concurrent_futures.py --- a/Lib/test/test_concurrent_futures.py Mon Oct 13 10:39:41 2014 +0100 +++ b/Lib/test/test_concurrent_futures.py Tue Oct 14 11:07:15 2014 +0100 @@ -198,11 +198,11 @@ future2 = self.executor.submit(time.sleep, 1.5) done, not_done = futures.wait( - [CANCELLED_FUTURE, future1, future2], + [future1, future2], return_when=futures.FIRST_COMPLETED) self.assertEqual(set([future1]), done) - self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done) + self.assertEqual(set([future2]), not_done) def test_first_completed_some_already_completed(self): future1 = self.executor.submit(time.sleep, 1.5) @@ -240,9 +240,10 @@ return_when=futures.FIRST_EXCEPTION) self.assertEqual(set([SUCCESSFUL_FUTURE, + CANCELLED_FUTURE, CANCELLED_AND_NOTIFIED_FUTURE, future1]), finished) - self.assertEqual(set([CANCELLED_FUTURE, future2]), pending) + self.assertEqual(set([future2]), pending) def test_first_exception_one_already_failed(self): future1 = self.executor.submit(time.sleep, 2) @@ -513,6 +514,7 @@ f = Future() f.add_done_callback(fn) self.assertTrue(f.cancel()) + f.set_running_or_notify_cancel() self.assertTrue(was_cancelled) def test_done_callback_raises(self):