def ensure_future_threadsafe(coro_or_future, *, loop=None): """Wrap a coroutine, an awaitable or a future in a concurrent.futures.Future. """ if isinstance(coro_or_future, futures.Future): if loop is not None and loop is not coro_or_future._loop: raise ValueError('loop argument must agree with Future') elif coroutines.iscoroutine(coro_or_future) or \ compat.PY35 and inspect.isawaitable(coro_or_future): if loop is None: loop = events.get_event_loop() else: raise TypeError('A Future, a coroutine or an awaitable is required') concurrent_future = concurrent.futures.Future() def callback(): try: future = ensure_future(coro_or_future, loop=loop) futures._chain_future(future, concurrent_future) except Exception as exc: if concurrent_future.set_running_or_notify_cancel(): concurrent_future.set_exception(exc) loop.call_soon_threadsafe(callback) return concurrent_future