import sys


class MyException(Exception):
    """Error that the demo scenario injects through a future."""


class InvalidStateError(Exception):
    """Raised when a result or exception is set on an already-finished future."""


_PENDING = 'PENDING'
_FINISHED = 'FINISHED'


class _StopError(BaseException):
    """Private signal raised from a callback to break out of run_forever().

    Derives from BaseException so that ``except Exception`` handlers in
    tasks do not swallow it.
    """


def _raise_stop_error(*args):
    """Callback that stops the running loop; any arguments are ignored."""
    raise _StopError


def _resolve_loop(explicit):
    """Return *explicit* if given, otherwise the module-level ``loop``.

    The global is looked up at call time, so futures may be created without
    a loop as long as ``loop`` exists by the time callbacks are scheduled.
    """
    if explicit is not None:
        return explicit
    return loop


class EventLoop:
    """A minimal FIFO callback scheduler."""

    def __init__(self):
        self._closed = False
        # Pending (callback, args) pairs, run in insertion order.
        self._ready = []

    def run_forever(self):
        """Run batches of ready callbacks until one raises _StopError."""
        while True:
            try:
                self._run_once()
            except _StopError:
                break

    def run_until_complete(self, future):
        """Wrap the generator *future* in a MyTask and run it to completion.

        Returns the task's result, or re-raises the exception stored on it.
        Raises RuntimeError if the loop was stopped before the task finished.
        """
        future = MyTask(future, loop=self)
        future.add_done_callback(_raise_stop_error)
        self.run_forever()
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')
        return future.result()

    def stop(self):
        """Schedule a callback that makes run_forever() return."""
        self.call_soon(_raise_stop_error)

    def close(self):
        """Mark the loop closed and drop pending callbacks (idempotent)."""
        if self._closed:
            return
        self._closed = True
        self._ready.clear()

    def call_soon(self, callback, *args):
        """Queue ``callback(*args)``; returns the ``(callback, args)`` handle."""
        return self._call_soon(callback, args)

    def _call_soon(self, callback, args):
        handle = (callback, args)
        self._ready.append(handle)
        return handle

    def call_exception_handler(self, context):
        """Report an error context by printing it."""
        print(context)

    def _run_once(self):
        """Run the callbacks that are ready right now.

        The queue is snapshotted and cleared first, so callbacks scheduled
        during this pass run on the next pass.
        """
        callbacks = list(self._ready)
        self._ready.clear()
        for callback, args in callbacks:
            callback(*args)


class TaskFuture:
    """A future holding either a result or an exception.

    *loop* is the EventLoop on which done-callbacks are scheduled. Passing
    None falls back to the module-level ``loop``.
    """

    def __init__(self, loop):
        self._loop = loop
        self._state = _PENDING
        self._result = None
        self._exception = None
        self._callbacks = []

    def _get_loop(self):
        """Return the loop this future schedules its callbacks on."""
        return _resolve_loop(self._loop)

    def _schedule_callbacks(self):
        """Queue every registered callback on the loop and clear the list."""
        callbacks = self._callbacks[:]
        if not callbacks:
            return
        self._callbacks[:] = []
        target = self._get_loop()
        for callback in callbacks:
            target.call_soon(callback, self)

    def done(self):
        """Return True once a result or exception has been set."""
        return self._state != _PENDING

    def result(self):
        """Return the stored result, or raise the stored exception."""
        if self._exception is not None:
            raise self._exception
        return self._result

    def exception(self):
        """Return the stored exception (None if there is none)."""
        return self._exception

    def add_done_callback(self, fn):
        """Arrange for ``fn(self)`` to be called via the loop when done.

        If the future is already done the call is scheduled immediately.
        """
        if self._state != _PENDING:
            self._get_loop().call_soon(fn, self)
        else:
            self._callbacks.append(fn)

    def set_result(self, result):
        """Mark the future done with *result*.

        Raises InvalidStateError if the future is already done.
        """
        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        self._result = result
        self._state = _FINISHED
        self._schedule_callbacks()

    def set_exception(self, exception):
        """Mark the future done with *exception*.

        An exception class is instantiated with no arguments first.
        Raises InvalidStateError if the future is already done.
        """
        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        if isinstance(exception, type):
            exception = exception()
        self._exception = exception
        self._state = _FINISHED
        self._schedule_callbacks()

    def __iter__(self):
        """Support ``yield from future``: suspend until done, then resolve."""
        if not self.done():
            yield self
        return self.result()


class MyTask(TaskFuture):
    """Drive the generator *coro* step by step on an event loop.

    The first step is scheduled from the constructor. With ``loop=None``
    the module-level ``loop`` is used.
    """

    def __init__(self, coro, *, loop=None):
        super().__init__(loop=loop)
        self._coro = iter(coro)
        self._fut_waiter = None
        self._get_loop().call_soon(self._step)

    def _step(self, value=None, exc=None):
        """Advance the generator once and react to what it yields.

        *exc*, if given, is thrown into the generator; otherwise a non-None
        *value* is sent, and with neither the generator is simply advanced.
        """
        coro = self._coro
        self._fut_waiter = None
        try:
            if exc is not None:
                result = coro.throw(exc)
            elif value is not None:
                result = coro.send(value)
            else:
                result = next(coro)
        except StopIteration as exc:
            self.set_result(exc.value)
        except Exception as exc:
            self.set_exception(exc)
        except BaseException as exc:
            # Record it on the task, but let it keep propagating.
            self.set_exception(exc)
            raise
        else:
            if isinstance(result,
                          (TaskFuture, MyFutureException, MyFutureResult)):
                # Resume this task when the yielded future completes.
                result.add_done_callback(self._wakeup)
                self._fut_waiter = result
            elif result is None:
                # Bare yield: just run the next step on a later pass.
                self._get_loop().call_soon(self._step)
            else:
                self._get_loop().call_soon(
                    self._step, None,
                    RuntimeError(
                        'Task got bad yield: {!r}'.format(result)))
        finally:
            # Drop this frame's reference to the task.
            self = None

    def _wakeup(self, future):
        """Resume the task with the outcome of the future it waited on."""
        try:
            value = future.result()
        except Exception as exc:
            self._step(None, exc)
        else:
            self._step(value, None)
        # Drop this frame's reference to the task.
        self = None


class MyFutureResult:
    """A bare-bones future that can only carry a result.

    A result of None is indistinguishable from "not done yet".
    *loop* optionally names the EventLoop for callbacks; by default the
    module-level ``loop`` is used.
    """

    def __init__(self, loop=None):
        self._loop = loop
        self._callbacks = []
        self._result = None

    def _schedule_callbacks(self):
        """Queue every registered callback on the loop and clear the list."""
        callbacks = self._callbacks[:]
        if not callbacks:
            return
        self._callbacks[:] = []
        target = _resolve_loop(self._loop)
        for callback in callbacks:
            target.call_soon(callback, self)

    def done(self):
        """Return True once a non-None result has been set."""
        return self._result is not None

    def result(self):
        """Return the stored result (None if none has been set)."""
        return self._result

    def add_done_callback(self, fn):
        """Register ``fn(self)`` to be scheduled when the result is set."""
        self._callbacks.append(fn)

    def set_result(self, result):
        """Store *result* and schedule the registered callbacks."""
        self._result = result
        self._schedule_callbacks()

    def __iter__(self):
        """Support ``yield from future``: always suspends once."""
        yield self
        return self.result()


class MyFutureException:
    """A bare-bones future that can only carry an exception.

    *loop* optionally names the EventLoop for callbacks; by default the
    module-level ``loop`` is used.
    """

    def __init__(self, loop=None):
        self._loop = loop
        self._exception = None
        self._callbacks = []

    def _schedule_callbacks(self):
        """Internal: Ask the event loop to call all callbacks.

        The callbacks are scheduled to be called as soon as possible.
        Also clears the callback list.
        """
        callbacks = self._callbacks[:]
        if not callbacks:
            return
        self._callbacks[:] = []
        target = _resolve_loop(self._loop)
        for callback in callbacks:
            target.call_soon(callback, self)

    def done(self):
        """Return True once an exception has been set."""
        return self._exception is not None

    def result(self):
        """Raise the stored exception; this future never returns a value."""
        raise self._exception

    def add_done_callback(self, fn):
        """Register ``fn(self)`` to be scheduled when the exception is set."""
        self._callbacks.append(fn)

    def set_exception(self, exception):
        """Mark the future done and set an exception.

        An exception class is instantiated with no arguments first. No
        check is made for the future already being done; a second call
        overwrites the stored exception.
        """
        if isinstance(exception, type):
            exception = exception()
        self._exception = exception
        self._schedule_callbacks()

    def __iter__(self):
        """Support ``yield from future``: always suspends once."""
        yield self
        return self.result()


def wait_for_fut(fut):
    """Generator that waits on *fut* and returns its result."""
    return (yield from fut)


def _make_subprocess_transport():
    """Fail via a future, then wait on a second future inside the handler.

    The generator is suspended while the exception is being handled, and
    the original exception is re-raised after it resumes.
    """
    waiter = MyFutureException()
    loop.call_soon(waiter.set_exception, MyException)
    try:
        yield from waiter
    except:  # deliberately bare: always re-raised below
        fut = MyFutureResult()
        loop.call_soon(fut.set_result, 9)
        yield from wait_for_fut(fut)
        raise
    return 3


def subprocess_exec():
    """Delegate to _make_subprocess_transport() and return 3."""
    coro = _make_subprocess_transport()
    yield from coro
    return 3


def create_subprocess_exec():
    """Delegate to subprocess_exec() and return 4."""
    coro = subprocess_exec()
    yield from coro
    return 4


def cancel_make_transport():
    """Run the failing chain in its own task and swallow MyException.

    Prints sys.exc_info() after the handler has finished.
    """
    coro = create_subprocess_exec()
    task = MyTask(coro, loop=loop)
    try:
        yield from task
    except MyException:
        pass
    print("exc_info after except", sys.exc_info())
    task = None


loop = EventLoop()
loop.run_until_complete(cancel_make_transport())
loop.close()
print("exc_info at exit", sys.exc_info())