diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -66,28 +66,17 @@ import weakref # workers to exit when their work queues are empty and then waits until the # threads/processes finish. -_thread_references = set() +_threads_queues = weakref.WeakKeyDictionary() _shutdown = False def _python_exit(): global _shutdown _shutdown = True - for thread_reference in _thread_references: - thread = thread_reference() - if thread is not None: - thread.join() - -def _remove_dead_thread_references(): - """Remove inactive threads from _thread_references. - - Should be called periodically to prevent memory leaks in scenarios such as: - >>> while True: - >>> ... t = ThreadPoolExecutor(max_workers=5) - >>> ... t.map(int, ['1', '2', '3', '4', '5']) - """ - for thread_reference in set(_thread_references): - if thread_reference() is None: - _thread_references.discard(thread_reference) + items = list(_threads_queues.items()) + for t, q in items: + q.put(None) + for t, q in items: + t.join() # Controls how many more calls than processes will be queued in the call queue. # A smaller number will mean that processes spend more time idle waiting for @@ -130,11 +119,15 @@ def _process_worker(call_queue, result_q """ while True: try: - call_item = call_queue.get(block=True, timeout=0.1) + call_item = call_queue.get(block=True) except queue.Empty: if shutdown.is_set(): return else: + if call_item is None: + # Wake up queue management thread + result_queue.put(None) + return try: r = call_item.fn(*call_item.args, **call_item.kwargs) except BaseException as e: @@ -209,40 +202,56 @@ def _queue_manangement_worker(executor_r process workers that they should exit when their work queue is empty. """ + nb_shutdown_processes = 0 + def shutdown_one_process(): + """Tell a worker to terminate, which will in turn wake us again""" + nonlocal nb_shutdown_processes + call_queue.put(None) + nb_shutdown_processes += 1 while True: _add_call_item_to_queue(pending_work_items, work_ids_queue, call_queue) try: - result_item = result_queue.get(block=True, timeout=0.1) + result_item = result_queue.get(block=True) except queue.Empty: - executor = executor_reference() - # No more work items can be added if: - # - The interpreter is shutting down OR - # - The executor that owns this worker has been collected OR - # - The executor that owns this worker has been shutdown. - if _shutdown or executor is None or executor._shutdown_thread: - # Since no new work items can be added, it is safe to shutdown - # this thread if there are no pending work items. - if not pending_work_items: - shutdown_process_event.set() + pass + else: + if result_item is not None: + work_item = pending_work_items[result_item.work_id] + del pending_work_items[result_item.work_id] - # If .join() is not called on the created processes then - # some multiprocessing.Queue methods may deadlock on Mac OS - # X. - for p in processes: - p.join() - return - del executor - else: - work_item = pending_work_items[result_item.work_id] - del pending_work_items[result_item.work_id] + if result_item.exception: + work_item.future.set_exception(result_item.exception) + else: + work_item.future.set_result(result_item.result) + continue + # If we come here, we either got a timeout or were explicitly woken up. + # In either case, check whether we should start shutting down. + executor = executor_reference() + # No more work items can be added if: + # - The interpreter is shutting down OR + # - The executor that owns this worker has been collected OR + # - The executor that owns this worker has been shutdown. + if _shutdown or executor is None or executor._shutdown_thread: + # Since no new work items can be added, it is safe to shutdown + # this thread if there are no pending work items. + if not pending_work_items: + shutdown_process_event.set() - if result_item.exception: - work_item.future.set_exception(result_item.exception) + while nb_shutdown_processes < len(processes): + shutdown_one_process() + # If .join() is not called on the created processes then + # some multiprocessing.Queue methods may deadlock on Mac OS + # X. + for p in processes: + p.join() + return else: - work_item.future.set_result(result_item.result) + # Start shutting down by telling a process it can exit. + shutdown_one_process() + del executor _system_limits_checked = False _system_limited = None @@ -279,7 +288,6 @@ class ProcessPoolExecutor(_base.Executor worker processes will be created as the machine has processors. """ _check_system_limits() - _remove_dead_thread_references() if max_workers is None: self._max_workers = multiprocessing.cpu_count() @@ -304,10 +312,14 @@ class ProcessPoolExecutor(_base.Executor self._pending_work_items = {} def _start_queue_management_thread(self): + # When the executor gets lost, the weakref callback will wake up + # the queue management thread. + def weakref_cb(_, q=self._result_queue): + q.put(None) if self._queue_management_thread is None: self._queue_management_thread = threading.Thread( target=_queue_manangement_worker, - args=(weakref.ref(self), + args=(weakref.ref(self, weakref_cb), self._processes, self._pending_work_items, self._work_ids, @@ -316,7 +328,7 @@ class ProcessPoolExecutor(_base.Executor self._shutdown_process_event)) self._queue_management_thread.daemon = True self._queue_management_thread.start() - _thread_references.add(weakref.ref(self._queue_management_thread)) + _threads_queues[self._queue_management_thread] = self._result_queue def _adjust_process_count(self): for _ in range(len(self._processes), self._max_workers): @@ -339,6 +351,8 @@ class ProcessPoolExecutor(_base.Executor self._pending_work_items[self._queue_count] = w self._work_ids.put(self._queue_count) self._queue_count += 1 + # Wake up queue management thread + self._result_queue.put(None) self._start_queue_management_thread() self._adjust_process_count() @@ -348,8 +362,10 @@ class ProcessPoolExecutor(_base.Executor def shutdown(self, wait=True): with self._shutdown_lock: self._shutdown_thread = True - if wait: - if self._queue_management_thread: + if self._queue_management_thread: + # Wake up queue management thread + self._result_queue.put(None) + if wait: self._queue_management_thread.join() # To reduce the risk of openning too many files, remove references to # objects that use file descriptors. diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -25,28 +25,17 @@ import weakref # workers to exit when their work queues are empty and then waits until the # threads finish. -_thread_references = set() +_threads_queues = weakref.WeakKeyDictionary() _shutdown = False def _python_exit(): global _shutdown _shutdown = True - for thread_reference in _thread_references: - thread = thread_reference() - if thread is not None: - thread.join() - -def _remove_dead_thread_references(): - """Remove inactive threads from _thread_references. - - Should be called periodically to prevent memory leaks in scenarios such as: - >>> while True: - ... t = ThreadPoolExecutor(max_workers=5) - ... t.map(int, ['1', '2', '3', '4', '5']) - """ - for thread_reference in set(_thread_references): - if thread_reference() is None: - _thread_references.discard(thread_reference) + items = list(_threads_queues.items()) + for t, q in items: + q.put(None) + for t, q in items: + t.join() atexit.register(_python_exit) @@ -72,18 +61,23 @@ def _worker(executor_reference, work_que try: while True: try: - work_item = work_queue.get(block=True, timeout=0.1) + work_item = work_queue.get(block=True) except queue.Empty: - executor = executor_reference() - # Exit if: - # - The interpreter is shutting down OR - # - The executor that owns the worker has been collected OR - # - The executor that owns the worker has been shutdown. - if _shutdown or executor is None or executor._shutdown: - return - del executor + pass else: - work_item.run() + if work_item is not None: + work_item.run() + continue + executor = executor_reference() + # Exit if: + # - The interpreter is shutting down OR + # - The executor that owns the worker has been collected OR + # - The executor that owns the worker has been shutdown. + if _shutdown or executor is None or executor._shutdown: + # Notice other workers + work_queue.put(None) + return + del executor except BaseException as e: _base.LOGGER.critical('Exception in worker', exc_info=True) @@ -95,8 +89,6 @@ class ThreadPoolExecutor(_base.Executor) max_workers: The maximum number of threads that can be used to execute the given calls. """ - _remove_dead_thread_references() - self._max_workers = max_workers self._work_queue = queue.Queue() self._threads = set() @@ -117,19 +109,25 @@ class ThreadPoolExecutor(_base.Executor) submit.__doc__ = _base.Executor.submit.__doc__ def _adjust_thread_count(self): + # When the executor gets lost, the weakref callback will wake up + # the worker threads. + def weakref_cb(_, q=self._work_queue): + q.put(None) # TODO(bquinlan): Should avoid creating new threads if there are more # idle threads than items in the work queue. if len(self._threads) < self._max_workers: t = threading.Thread(target=_worker, - args=(weakref.ref(self), self._work_queue)) + args=(weakref.ref(self, weakref_cb), + self._work_queue)) t.daemon = True t.start() self._threads.add(t) - _thread_references.add(weakref.ref(t)) + _threads_queues[t] = self._work_queue def shutdown(self, wait=True): with self._shutdown_lock: self._shutdown = True + self._work_queue.put(None) if wait: for t in self._threads: t.join()