import asyncio import concurrent.futures import functools import logging import random import time import threading class EventLoopThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) self.event = threading.Event() self.loop = None def run(self): policy = asyncio.get_event_loop_policy() self.loop = policy.new_event_loop() policy.set_event_loop(self.loop) self.loop.call_soon(self.event.set) self.loop.run_forever() self.loop.close() def slow_func(): delay = random.randint(0, 10) / 1000 time.sleep(delay) return 'ok' class WorkerThread(threading.Thread): def __init__(self, loop): threading.Thread.__init__(self) self.executor = concurrent.futures.ThreadPoolExecutor(5) self.loop = loop self.event = threading.Event() self.future = None @asyncio.coroutine def task(self): try: self.future = self.loop.run_in_executor(self.executor, slow_func) try: yield from self.future except asyncio.CancelledError: print("-cancelled-") except Exception as err: print("OOOPS", type(err), err) finally: self.event.set() def run(self): for run in range(1000): self.loop.call_soon_threadsafe(functools.partial(asyncio.async, self.task(), loop=self.loop)) delay = random.randint(0, 15) / 1000 time.sleep(delay) if self.future is not None: self.future.cancel() self.event.wait() self.event.clear() print("run #%s" % run) self.loop.call_soon_threadsafe(self.loop.stop) def main(): logging.basicConfig() el = EventLoopThread() el.start() el.event.wait() worker = WorkerThread(el.loop) worker.start() el.join() worker.join() main()