--- thread.py 2014-09-06 22:02:09.000000000 +0800 +++ thread_new.py 2015-02-28 14:19:24.000000000 +0800 @@ -65,25 +65,36 @@ else: self.future.set_result(result) -def _worker(executor_reference, work_queue): - try: - while True: - work_item = work_queue.get(block=True) - if work_item is not None: - work_item.run() - continue - executor = executor_reference() - # Exit if: - # - The interpreter is shutting down OR - # - The executor that owns the worker has been collected OR - # - The executor that owns the worker has been shutdown. - if _shutdown or executor is None or executor._shutdown: - # Notice other workers - work_queue.put(None) - return - del executor - except BaseException: - _base.LOGGER.critical('Exception in worker', exc_info=True) +class Worker(threading.Thread): + def __init__(self, executor_reference, work_queue): + self.executor_reference = executor_reference + self.work_queue = work_queue + self.runLock = threading.Lock() + threading.Thread.__init__(self) + + def run(self): + try: + while True: + work_item = self.work_queue.get(block=True) + if work_item is not None: + work_item.run() + continue + + executor = self.executor_reference() + + # Never contended, but necessary for visibility + with self.runLock: + # Exit if: + # - The interpreter is shutting down OR + # - The executor that owns the worker has been collected OR + # - The executor that owns the worker has been shutdown. + if _shutdown or executor is None or executor._shutdown: + # Notice other workers + self.work_queue.put(None) + return + del executor + except BaseException: + _base.LOGGER.critical('Exception in worker', exc_info=True) class ThreadPoolExecutor(_base.Executor): def __init__(self, max_workers): @@ -120,9 +131,7 @@ # TODO(bquinlan): Should avoid creating new threads if there are more # idle threads than items in the work queue. if len(self._threads) < self._max_workers: - t = threading.Thread(target=_worker, - args=(weakref.ref(self, weakref_cb), - self._work_queue)) + t = Worker(weakref.ref(self, weakref_cb), self._work_queue) t.daemon = True t.start() self._threads.add(t)