diff -r 854d05c13a8e Lib/concurrent/futures/_base.py --- a/Lib/concurrent/futures/_base.py Tue Feb 04 18:18:27 2014 +0100 +++ b/Lib/concurrent/futures/_base.py Tue Feb 04 21:43:31 2014 -0500 @@ -90,86 +90,77 @@ super(_AsCompletedWaiter, self).add_cancelled(future) self.event.set() -class _FirstCompletedWaiter(_Waiter): - """Used by wait(return_when=FIRST_COMPLETED).""" +class _TrackCompletedFutures(object): + """ An internal class to track the progress of futures. """ + def __init__(self, futures, timeout=None): + self.fs = set(futures) + self.timeout = timeout + self.wait = True + if timeout is not None: + self.end_time = timeout + time.time() - def add_result(self, future): - super().add_result(future) - self.event.set() + def _wait_on_waiter(self, pending_count): + if self.timeout is None: + wait_timeout = None + else: + wait_timeout = self.end_time - time.time() + if wait_timeout < 0: + raise TimeoutError( + '%d (of %d) futures unfinished' % ( + pending_count, len(self.fs))) + self.waiter.event.wait(wait_timeout) - def add_exception(self, future): - super().add_exception(future) - self.event.set() + def _get_finished_futures(self): + waiter = self.waiter + with waiter.lock: + finished = waiter.finished_futures + waiter.finished_futures = [] + waiter.event.clear() + return finished - def add_cancelled(self, future): - super().add_cancelled(future) - self.event.set() + def as_completed(self): + self.waiter = waiter = _AsCompletedWaiter() + pending = set() + for f in self.fs: + with f._condition: # Lock the Future; yield if completed or add our Waiter + if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]: + yield f + else: + pending.add(f) + f._waiters.append(waiter) + # Each future in the pending set has our Waiter attached + try: + count = len(pending) + while count > 0: + if self.wait: + self._wait_on_waiter(count) # may raise TimeoutError + else: + yield from self._get_finished_futures() + raise StopIteration() + finished = self._get_finished_futures() + yield from finished + count -= len(finished) + finally: + self._cleanup(pending) -class _AllCompletedWaiter(_Waiter): - """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).""" + def _cleanup(self, installed_waiters): + waiter = self.waiter + for f in installed_waiters: + with f._condition: + f._waiters.remove(waiter) - def __init__(self, num_pending_calls, stop_on_exception): - self.num_pending_calls = num_pending_calls - self.stop_on_exception = stop_on_exception - self.lock = threading.Lock() - super().__init__() - - def _decrement_pending_calls(self): - with self.lock: - self.num_pending_calls -= 1 - if not self.num_pending_calls: - self.event.set() - - def add_result(self, future): - super().add_result(future) - self._decrement_pending_calls() - - def add_exception(self, future): - super().add_exception(future) - if self.stop_on_exception: - self.event.set() - else: - self._decrement_pending_calls() - - def add_cancelled(self, future): - super().add_cancelled(future) - self._decrement_pending_calls() - -class _AcquireFutures(object): - """A context manager that does an ordered acquire of Future conditions.""" - - def __init__(self, futures): - self.futures = sorted(futures, key=id) + # Controls whether as_completed() will block. + def set_wait(self, wait_on_waiter=True): + self.wait = wait_on_waiter def __enter__(self): - for future in self.futures: - future._condition.acquire() + return self - def __exit__(self, *args): - for future in self.futures: - future._condition.release() + # When used as a context manager, consume any TimeoutError + def __exit__(self, type, value, traceback): + return isinstance(value, TimeoutError) -def _create_and_install_waiters(fs, return_when): - if return_when == _AS_COMPLETED: - waiter = _AsCompletedWaiter() - elif return_when == FIRST_COMPLETED: - waiter = _FirstCompletedWaiter() - else: - pending_count = sum( - f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs) - - if return_when == FIRST_EXCEPTION: - waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True) - elif return_when == ALL_COMPLETED: - waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False) - else: - raise ValueError("Invalid return condition: %r" % return_when) - - for f in fs: - f._waiters.append(waiter) - - return waiter - +# Visible methods def as_completed(fs, timeout=None): """An iterator over the given futures that yields each as it completes. @@ -181,55 +172,14 @@ Returns: An iterator that yields the given Futures as they complete (finished or - cancelled). If any given Futures are duplicated, they will be returned - once. + cancelled). Raises: TimeoutError: If the entire result iterator could not be generated before the given timeout. """ - if timeout is not None: - end_time = timeout + time.time() + yield from _TrackCompletedFutures(fs, timeout).as_completed() - fs = set(fs) - with _AcquireFutures(fs): - finished = set( - f for f in fs - if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) - pending = fs - finished - waiter = _create_and_install_waiters(fs, _AS_COMPLETED) - - try: - yield from finished - - while pending: - if timeout is None: - wait_timeout = None - else: - wait_timeout = end_time - time.time() - if wait_timeout < 0: - raise TimeoutError( - '%d (of %d) futures unfinished' % ( - len(pending), len(fs))) - - waiter.event.wait(wait_timeout) - - with waiter.lock: - finished = waiter.finished_futures - waiter.finished_futures = [] - waiter.event.clear() - - for future in finished: - yield future - pending.remove(future) - - finally: - for f in fs: - with f._condition: - f._waiters.remove(waiter) - -DoneAndNotDoneFutures = collections.namedtuple( - 'DoneAndNotDoneFutures', 'done not_done') def wait(fs, timeout=None, return_when=ALL_COMPLETED): """Wait for the futures in the given sequence to complete. @@ -254,30 +204,40 @@ completed. The second set, named 'not_done', contains uncompleted futures. """ - with _AcquireFutures(fs): - done = set(f for f in fs - if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) - not_done = set(fs) - done + if return_when == FIRST_COMPLETED: + return _wait_first_completed(fs, timeout) + elif return_when == FIRST_EXCEPTION: + return _wait_first_exception(fs, timeout) + else: + return _wait_all_completed(fs, timeout) - if (return_when == FIRST_COMPLETED) and done: - return DoneAndNotDoneFutures(done, not_done) - elif (return_when == FIRST_EXCEPTION) and done: - if any(f for f in done - if not f.cancelled() and f.exception() is not None): - return DoneAndNotDoneFutures(done, not_done) +# Internal support methods for variants of wait() +def _wait_first_completed(fs, timeout=None): + done = set() + with _TrackCompletedFutures(fs, timeout) as tracker: + for f in tracker.as_completed(): + done.add(f) + tracker.set_wait(wait_on_waiter=False) # do not wait for subsequent Futures to finish + return DoneAndNotDoneFutures( done, set(fs) - done) - if len(done) == len(fs): - return DoneAndNotDoneFutures(done, not_done) +def _wait_first_exception(fs, timeout=None): + done = set() + with _TrackCompletedFutures(fs, timeout) as tracker: + for f in tracker.as_completed(): + done.add(f) + with f._condition: + if f._exception is not None and (f._state not in [CANCELLED, CANCELLED_AND_NOTIFIED]): + tracker.set_wait(wait_on_waiter=False) + return DoneAndNotDoneFutures( done, set(fs) - done) - waiter = _create_and_install_waiters(fs, return_when) +def _wait_all_completed(fs, timeout=None): + done = set() + with _TrackCompletedFutures(fs, timeout) as tracker: + done.update((f for f in tracker.as_completed())) + return DoneAndNotDoneFutures( done, set(fs) - done) - waiter.event.wait(timeout) - for f in fs: - with f._condition: - f._waiters.remove(waiter) - - done.update(waiter.finished_futures) - return DoneAndNotDoneFutures(done, set(fs) - done) +DoneAndNotDoneFutures = collections.namedtuple( + 'DoneAndNotDoneFutures', 'done not_done') class Future(object): """Represents the result of an asynchronous computation."""