"""Show how an exception's ``__init__`` signature affects process pools.

An exception raised inside a ``ProcessPoolExecutor`` worker is pickled so it
can be sent back to the parent process.  By default an exception reduces to
``(cls, self.args)``, so the parent rebuilds it by calling ``cls(*args)``.
That call only succeeds when ``__init__`` accepts whatever was forwarded to
``super().__init__()`` as positional arguments:

* ``Works1`` -- ``args == ("test",)`` and ``bar`` may be passed positionally.
* ``Works2`` -- ``args == ()``, so the keyword-only ``bar`` is never involved.
* ``Breaks`` -- ``args == ("test",)`` but ``bar`` is keyword-only, so
  ``Breaks("test")`` raises ``TypeError`` while unpickling in the parent.
"""
import asyncio
from concurrent.futures import ProcessPoolExecutor
import traceback


class Works1(Exception):
    """Survives pickling: ``"test"`` can be bound positionally to ``bar``."""

    def __init__(self, bar=None):
        super().__init__("test")


class Works2(Exception):
    """Survives pickling: no positional args are stored, so none are replayed."""

    def __init__(self, *, bar=None):
        super().__init__()


class Breaks(Exception):
    """Fails to unpickle: stores ``"test"`` in ``args`` but takes no positionals.

    This class is intentionally left broken -- it is the failing case this
    script exists to demonstrate.  A real exception with this signature would
    need its own ``__reduce__`` to be safely sent between processes.
    """

    def __init__(self, *, bar=None):
        super().__init__("test")


def works1() -> None:
    """Raise ``Works1`` (runs inside a worker process)."""
    raise Works1()


def works2() -> None:
    """Raise ``Works2`` (runs inside a worker process)."""
    raise Works2()


def breaks() -> None:
    """Raise ``Breaks`` (runs inside a worker process)."""
    raise Breaks()


async def main() -> None:
    """Run each raising function in a process pool and report what comes back.

    For every function the exception observed in the parent process is
    printed; for ``breaks`` the full traceback is printed as well, since what
    arrives is expected to be an unpickling/pool failure rather than a
    ``Breaks`` instance (NOTE: the exact exception type depends on the Python
    version).
    """
    loop = asyncio.get_running_loop()
    # The context manager guarantees the pool's worker processes are shut
    # down even if reporting fails part-way through.
    with ProcessPoolExecutor() as executor:
        for f in (works1, works2, breaks):
            try:
                await loop.run_in_executor(executor, f)
            except Exception as ex:
                # Broad on purpose: the point of the demo is to display
                # whatever exception reaches the parent.
                print(f"{f} raised {repr(ex)}\n")
                if f is breaks:
                    traceback.print_exc()


# Worker processes may re-import this module (spawn/forkserver start methods);
# the guard keeps them from recursively starting their own pools.
if __name__ == "__main__":
    asyncio.run(main())