"""Experiment showing how asyncio task cancellation and its messages propagate.

A worker task is started, cancelled from a sibling coroutine after a delay,
and the script prints what each layer (the worker, ``gather`` and the task
object itself) observes.
"""

import asyncio


async def job(duration: float = 5.0) -> None:
    """Sleep for *duration* seconds, announcing start, cancellation and end.

    Args:
        duration: How long the simulated work lasts, in seconds.

    Raises:
        asyncio.CancelledError: When cancelled while sleeping. The incoming
            cancellation is deliberately replaced by a fresh
            ``CancelledError("TEST")`` so the caller can observe which
            message ends up being reported.
    """
    print("job(): START...")
    try:
        await asyncio.sleep(duration)
    except asyncio.CancelledError as e:
        print("job(): CANCELLED!", e)
        # NOTE: raise ValueError("TEST") here instead to see how a
        # non-cancellation error raised during cancellation is reported.
        raise asyncio.CancelledError("TEST")
    print("job(): DONE.")


async def cancel_task_after(task: asyncio.Task, time: float) -> None:
    """Wait *time* seconds, then request cancellation of *task*.

    The cancel request is sent even if this coroutine is itself cancelled or
    the wait fails: both situations are only reported on stdout.

    Args:
        task: The task (or future) to cancel; it receives the message
            ``"Hello!"``.
        time: Delay in seconds before the cancel request is sent.
    """
    try:
        await asyncio.sleep(time)
    except asyncio.CancelledError:
        # Swallowed on purpose so that the target is still cancelled below.
        print("cancel_task_after(): CANCELLED!")
    except Exception as e:
        print("cancel_task_after(): Exception!", e.__class__.__name__, e)
    task.cancel("Hello!")


def _task_outcome(task: asyncio.Task) -> "BaseException | None":
    """Return the exception stored on *task*, or the error raised while asking.

    ``Task.exception()`` raises ``CancelledError`` for a cancelled task and
    ``InvalidStateError`` for an unfinished one; both are returned rather
    than propagated so the caller can simply print them.
    """
    try:
        return task.exception()
    except (asyncio.CancelledError, asyncio.InvalidStateError) as exc:
        return exc


async def main(cancel_after: float = 1.0, job_duration: float = 5.0) -> None:
    """Run ``job`` alongside a delayed canceller and print what is observed.

    Args:
        cancel_after: Seconds to wait before cancelling the job task.
        job_duration: Seconds the job would run if left alone.
    """
    task = asyncio.create_task(job(job_duration))

    # RUN/CANCEL task.
    try:
        await asyncio.gather(task, cancel_task_after(task, cancel_after))
    except asyncio.CancelledError as e:
        # NOTE: this also catches a cancellation aimed at main() itself.
        task_exc = _task_outcome(task)
        print(
            "In running task, we encountered a cancellation! "
            "Exception message is: ",
            e,
        )
        print(" ^ Task exc is:", task_exc.__class__.__name__, task_exc)
    except Exception as e:
        print(
            "In running task, we got a generic Exception:",
            e.__class__.__name__,
            e,
        )

    # GET result.
    try:
        result = task.result()
    except asyncio.CancelledError as e:
        print("Task has been cancelled, exception message is: ", e)
    except Exception as e:
        task_exc = _task_outcome(task)
        print("Task raised generic exception", e.__class__.__name__, e)
        print(" ^ Task exc is:", task_exc.__class__.__name__, task_exc)
    else:
        print("Task result is: ", result)


if __name__ == "__main__":
    asyncio.run(main())