Index: Lib/concurrent/futures/_base.py =================================================================== --- Lib/concurrent/futures/_base.py (revision 86476) +++ Lib/concurrent/futures/_base.py (working copy) @@ -12,6 +12,7 @@ FIRST_COMPLETED = 'FIRST_COMPLETED' FIRST_EXCEPTION = 'FIRST_EXCEPTION' ALL_COMPLETED = 'ALL_COMPLETED' +_AS_COMPLETED = '_AS_COMPLETED' # Possible future states (for internal use by the futures package). PENDING = 'PENDING' @@ -70,8 +71,30 @@ def add_cancelled(self, future): self.finished_futures.append(future) +class _AsCompletedWaiter(_Waiter): + """Used by as_completed().""" + + def __init__(self): + super(_AsCompletedWaiter, self).__init__() + self.lock = threading.Lock() + + def add_result(self, future): + with self.lock: + super(_AsCompletedWaiter, self).add_result(future) + self.event.set() + + def add_exception(self, future): + with self.lock: + super(_AsCompletedWaiter, self).add_exception(future) + self.event.set() + + def add_cancelled(self, future): + with self.lock: + super(_AsCompletedWaiter, self).add_cancelled(future) + self.event.set() + class _FirstCompletedWaiter(_Waiter): - """Used by wait(return_when=FIRST_COMPLETED) and as_completed().""" + """Used by wait(return_when=FIRST_COMPLETED).""" def add_result(self, future): super().add_result(future) @@ -128,7 +151,9 @@ future._condition.release() def _create_and_install_waiters(fs, return_when): - if return_when == FIRST_COMPLETED: + if return_when == _AS_COMPLETED: + waiter = _AsCompletedWaiter() + elif return_when == FIRST_COMPLETED: waiter = _FirstCompletedWaiter() else: pending_count = sum( @@ -187,11 +212,15 @@ '%d (of %d) futures unfinished' % ( len(pending), len(fs))) - waiter.event.wait(timeout) + waiter.event.wait(wait_timeout) - for future in waiter.finished_futures[:]: + with waiter.lock: + finished = waiter.finished_futures + waiter.finished_futures = [] + waiter.event.clear() + + for future in finished: yield future - waiter.finished_futures.remove(future) pending.remove(future) finally: