import asyncio class TestError(Exception): pass class Engine(object): def __init__(self): self.event = asyncio.Event() async def job_error(self): await asyncio.sleep(1.0) try: raise TestError() finally: self.event.set() async def job_event(self): """ This will fail the normal exit procedure. """ while True: # do work try: await asyncio.wait_for(asyncio.shield(self.event.wait()), timeout=60.0) self.event.clear() except asyncio.TimeoutError: pass async def job_event2(self): """ This is an alternative implementation of .job_event() that works. """ f = asyncio.ensure_future(self.event.wait()) try: while True: # do work if f.done(): f = asyncio.ensure_future(self.event.wait()) d, _ = await asyncio.wait([f], timeout=60.0) if d: self.event.clear() finally: f.cancel() async def run(self): tasks = [asyncio.ensure_future(self.job_error()), asyncio.ensure_future(self.job_event())] try: done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) # propagate first work task error to the main thread for t in done: if e := t.exception(): raise e finally: for t in tasks: t.cancel() # Detect stuck state for demonstration instead of naively waiting for the tasks. x = 0 while not all(t.done() for t in tasks): if x > 0: raise Exception("A job is stuck by ignored cancellation") await asyncio.sleep(1.0) x += 1 # Gather tasks. await asyncio.gather(*tasks, return_exceptions=True) async def main(): try: await Engine().run() except TestError: print("Job exception handled properly") if __name__ == '__main__': asyncio.run(main())