def as_completed(fs, timeout=None):
    """An iterator over the given futures that yields each as it completes.

    Args:
        fs: An iterable of Futures (possibly created by different
            Executors) to iterate over. It is consumed exactly once, so
            one-shot iterables such as generators are accepted. A future
            that appears more than once in fs is yielded only once.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.

    Returns:
        An iterator that yields the given Futures as they complete (finished
        or cancelled). Futures that are already complete when iteration
        starts are yielded first, in the order they appear in fs.

    Raises:
        TimeoutError: If the entire result iterator could not be generated
            before the given timeout.
    """
    # Prefer a monotonic clock so that wall-clock adjustments cannot stretch
    # or shrink the deadline; fall back to time.time() where it is missing.
    now = getattr(time, 'monotonic', time.time)
    if timeout is not None:
        end_time = timeout + now()

    # Materialize fs once, dropping duplicates but keeping first-seen order.
    # This lets the timeout message report a total without calling len() on
    # the caller's iterable, and guarantees the waiter is installed on each
    # future at most once (so the single remove() in cleanup is sufficient
    # and a future can never be removed from `pending` twice).
    seen = set()
    unique_futures = []
    for f in fs:
        if f not in seen:
            seen.add(f)
            unique_futures.append(f)
    total_futures = len(unique_futures)

    waiter = _AsCompletedWaiter()
    already_done = []
    pending = set()
    for f in unique_futures:
        # Classify under the future's lock, but never yield while holding
        # it: the consumer must not run with f._condition acquired.
        with f._condition:
            if f._state in (CANCELLED_AND_NOTIFIED, FINISHED):
                already_done.append(f)
            else:
                pending.add(f)
                f._waiters.append(waiter)
    installed_waiters = frozenset(pending)

    # Every yield lives inside the try so that closing the generator early
    # (at any point) still uninstalls the waiter from all futures.
    try:
        for f in already_done:
            yield f

        while pending:
            if timeout is None:
                wait_timeout = None
            else:
                wait_timeout = end_time - now()
                if wait_timeout < 0:
                    raise TimeoutError(
                            '%d (of %d) futures unfinished' % (
                            len(pending), total_futures))

            waiter.event.wait(wait_timeout)

            # Atomically take the batch collected so far and re-arm the
            # event for the next wait.
            with waiter.lock:
                finished = waiter.finished_futures
                waiter.finished_futures = []
                waiter.event.clear()

            for future in finished:
                yield future
                pending.remove(future)
    finally:
        for f in installed_waiters:
            with f._condition:
                f._waiters.remove(waiter)