# Copyright 2009 Brian Quinlan. All Rights Reserved. # Licensed to PSF under a Contributor Agreement. """Implements ThreadPoolExecutor.""" from __future__ import with_statement import atexit import threading import weakref import sys import traceback from concurrent.futures import _base import Queue as queue __author__ = 'Brian Quinlan (brian@sweetapp.com)' # Workers are created as daemon threads. This is done to allow the interpreter # to exit when there are still idle threads in a ThreadPoolExecutor's thread # pool (i.e. shutdown() was not called). However, allowing workers to die with # the interpreter has two undesirable properties: # - The workers would still be running during interpretor shutdown, # meaning that they would fail in unpredictable ways. # - The workers could be killed while evaluating a work item, which could # be bad if the callable being evaluated has external side-effects e.g. # writing to a file. # # To work around this problem, an exit handler is installed which tells the # workers to exit when their work queues are empty and then waits until the # threads finish. _threads_queues = weakref.WeakKeyDictionary() _shutdown = False def _python_exit(): global _shutdown _shutdown = True items = list(_threads_queues.items()) for t, q in items: q.put(None) for t, q in items: t.join() atexit.register(_python_exit) class _WorkItem(object): def __init__(self, future, fn, args, kwargs): self.future = future self.fn = fn self.args = args self.kwargs = kwargs def run(self): if not self.future.set_running_or_notify_cancel(): return try: result = self.fn(*self.args, **self.kwargs) except BaseException: e, tb = sys.exc_info()[1:] self.future.set_exception_info(e, tb) else: self.future.set_result(result) class Worker(threading.Thread): def __init__(self, executor_reference, work_queue): self.executor_reference = executor_reference self.work_queue = work_queue self.runLock = threading.Lock() threading.Thread.__init__(self) def run(self): try: while True: work_item = self.work_queue.get(block=True) with self.runLock: if work_item is not None: work_item.run() continue executor = self.executor_reference() # Exit if: # - The interpreter is shutting down OR # - The executor that owns the worker has been collected OR # - The executor that owns the worker has been shutdown. if _shutdown or executor is None or executor._shutdown: # Notice other workers self.work_queue.put(None) return del executor except BaseException: _base.LOGGER.critical('Exception in worker', exc_info=True) class ThreadPoolExecutor(_base.Executor): def __init__(self, max_workers): """Initializes a new ThreadPoolExecutor instance. Args: max_workers: The maximum number of threads that can be used to execute the given calls. """ self._max_workers = max_workers self._work_queue = queue.Queue() self._threads = set() self._shutdown = False self._shutdown_lock = threading.Lock() def submit(self, fn, *args, **kwargs): with self._shutdown_lock: if self._shutdown: raise RuntimeError('cannot schedule new futures after shutdown') f = _base.Future() w = _WorkItem(f, fn, args, kwargs) self._work_queue.put(w) self._adjust_thread_count() return f # submit.__doc__ = _base.Executor.submit.__doc__ def _adjust_thread_count(self): # When the executor gets lost, the weakref callback will wake up # the worker threads. def weakref_cb(_, q=self._work_queue): q.put(None) # TODO(bquinlan): Should avoid creating new threads if there are more # idle threads than items in the work queue. if len(self._threads) < self._max_workers: t = Worker(weakref.ref(self, weakref_cb), self._work_queue) t.daemon = True t.start() self._threads.add(t) _threads_queues[t] = self._work_queue def shutdown(self, wait=True): with self._shutdown_lock: self._shutdown = True self._work_queue.put(None) if wait: for t in self._threads: t.join() # shutdown.__doc__ = _base.Executor.shutdown.__doc__