class _GatheringFuture(futures.Future): """Helper for gather(). This overrides cancel() to cancel all the children and act more like Task.cancel(), which doesn't immediately mark itself as cancelled. """ def __init__(self, children, *, loop=None): super().__init__(loop=loop) self._children = children self._cancel_requested = False def cancel(self, msg=None): if self.done(): return False ret = False for child in self._children: if child.cancel(msg=msg): ret = True if ret: # If any child tasks were actually cancelled, we should # propagate the cancellation request regardless of # *return_exceptions* argument. See issue 32684. self._cancel_requested = True self._cancel_message = msg return ret def gather(*coros_or_futures, return_exceptions=False): """Return a future aggregating results from the given coroutines/futures. Coroutines will be wrapped in a future and scheduled in the event loop. They will not necessarily be scheduled in the same order as passed in. All futures must share the same event loop. If all the tasks are done successfully, the returned future's result is the list of results (in the order of the original sequence, not necessarily the order of results arrival). If *return_exceptions* is True, exceptions in the tasks are treated the same as successful results, and gathered in the result list; otherwise, the first raised exception will be immediately propagated to the returned future. Cancellation: if the outer Future is cancelled, all children (that have not completed yet) are also cancelled. If any child is cancelled, this is treated as if it raised CancelledError -- the outer Future is *not* cancelled in this case. (This is to prevent the cancellation of one child to cause other children to be cancelled.) If *return_exceptions* is False, cancelling gather() after it has been marked done won't cancel any submitted awaitables. For instance, gather can be marked done after propagating an exception to the caller, therefore, calling ``gather.cancel()`` after catching an exception (raised by one of the awaitables) from gather won't cancel any other awaitables. """ if not coros_or_futures: loop = events.get_event_loop() outer = loop.create_future() outer.set_result([]) return outer def _done_callback(fut): nonlocal nfinished nfinished += 1 if outer.done(): if not fut.cancelled(): # Mark exception retrieved. fut.exception() return if not return_exceptions: if fut.cancelled(): # Check if 'fut' is cancelled first, as # 'fut.exception()' will *raise* a CancelledError # instead of returning it. if outer._cancel_requested: super(_GatheringFuture, outer).cancel(outer._cancel_message) else: exc = fut._make_cancelled_error() outer.set_exception(exc) return else: exc = fut.exception() if exc is not None: outer.set_exception(exc) return if nfinished == nfuts: # All futures are done; create a list of results # and set it to the 'outer' future. results = [] for fut in children: if fut.cancelled(): # Check if 'fut' is cancelled first, as 'fut.exception()' # will *raise* a CancelledError instead of returning it. # Also, since we're adding the exception return value # to 'results' instead of raising it, don't bother # setting __context__. This also lets us preserve # calling '_make_cancelled_error()' at most once. res = exceptions.CancelledError( '' if fut._cancel_message is None else fut._cancel_message) else: res = fut.exception() if res is None: res = fut.result() results.append(res) if outer._cancel_requested: # If gather is being cancelled we must propagate the # cancellation regardless of *return_exceptions* argument. # See issue 32684. super(_GatheringFuture, outer).cancel(outer._cancel_message) else: outer.set_result(results) arg_to_fut = {} children = [] nfuts = 0 nfinished = 0 loop = None for arg in coros_or_futures: if arg not in arg_to_fut: fut = ensure_future(arg, loop=loop) if loop is None: loop = futures._get_loop(fut) if fut is not arg: # 'arg' was not a Future, therefore, 'fut' is a new # Future created specifically for 'arg'. Since the caller # can't control it, disable the "destroy pending task" # warning. fut._log_destroy_pending = False nfuts += 1 arg_to_fut[arg] = fut fut.add_done_callback(_done_callback) else: # There's a duplicate Future object in coros_or_futures. fut = arg_to_fut[arg] children.append(fut) outer = _GatheringFuture(children, loop=loop) return outer