diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -104,7 +104,7 @@ class _CallItem(object): self.args = args self.kwargs = kwargs -def _process_worker(call_queue, result_queue, shutdown): +def _process_worker(call_queue, result_queue): """Evaluates calls from call_queue and places the results in result_queue. This worker is run in a separate process. @@ -118,24 +118,19 @@ def _process_worker(call_queue, result_q worker that it should exit when call_queue is empty. """ while True: + call_item = call_queue.get(block=True) + if call_item is None: + # Wake up queue management thread + result_queue.put(None) + return try: - call_item = call_queue.get(block=True) - except queue.Empty: - if shutdown.is_set(): - return + r = call_item.fn(*call_item.args, **call_item.kwargs) + except BaseException as e: + result_queue.put(_ResultItem(call_item.work_id, + exception=e)) else: - if call_item is None: - # Wake up queue management thread - result_queue.put(None) - return - try: - r = call_item.fn(*call_item.args, **call_item.kwargs) - except BaseException as e: - result_queue.put(_ResultItem(call_item.work_id, - exception=e)) - else: - result_queue.put(_ResultItem(call_item.work_id, - result=r)) + result_queue.put(_ResultItem(call_item.work_id, + result=r)) def _add_call_item_to_queue(pending_work_items, work_ids, @@ -179,8 +174,7 @@ def _queue_manangement_worker(executor_r pending_work_items, work_ids_queue, call_queue, - result_queue, - shutdown_process_event): + result_queue): """Manages the communication between this process and the worker processes. This function is run in a local thread. @@ -198,9 +192,6 @@ def _queue_manangement_worker(executor_r derived from _WorkItems for processing by the process workers. result_queue: A multiprocessing.Queue of _ResultItems generated by the process workers. - shutdown_process_event: A multiprocessing.Event used to signal the - process workers that they should exit when their work queue is - empty. """ nb_shutdown_processes = 0 def shutdown_one_process(): @@ -213,20 +204,16 @@ def _queue_manangement_worker(executor_r work_ids_queue, call_queue) - try: - result_item = result_queue.get(block=True) - except queue.Empty: - pass - else: - if result_item is not None: - work_item = pending_work_items[result_item.work_id] - del pending_work_items[result_item.work_id] + result_item = result_queue.get(block=True) + if result_item is not None: + work_item = pending_work_items[result_item.work_id] + del pending_work_items[result_item.work_id] - if result_item.exception: - work_item.future.set_exception(result_item.exception) - else: - work_item.future.set_result(result_item.result) - continue + if result_item.exception: + work_item.future.set_exception(result_item.exception) + else: + work_item.future.set_result(result_item.result) + continue # If we come here, we either got a timeout or were explicitly woken up. # In either case, check whether we should start shutting down. executor = executor_reference() @@ -238,8 +225,6 @@ def _queue_manangement_worker(executor_r # Since no new work items can be added, it is safe to shutdown # this thread if there are no pending work items. if not pending_work_items: - shutdown_process_event.set() - while nb_shutdown_processes < len(processes): shutdown_one_process() # If .join() is not called on the created processes then @@ -306,7 +291,6 @@ class ProcessPoolExecutor(_base.Executor # Shutdown is a two-step process. self._shutdown_thread = False - self._shutdown_process_event = multiprocessing.Event() self._shutdown_lock = threading.Lock() self._queue_count = 0 self._pending_work_items = {} @@ -324,8 +308,7 @@ class ProcessPoolExecutor(_base.Executor self._pending_work_items, self._work_ids, self._call_queue, - self._result_queue, - self._shutdown_process_event)) + self._result_queue)) self._queue_management_thread.daemon = True self._queue_management_thread.start() _threads_queues[self._queue_management_thread] = self._result_queue @@ -335,8 +318,7 @@ class ProcessPoolExecutor(_base.Executor p = multiprocessing.Process( target=_process_worker, args=(self._call_queue, - self._result_queue, - self._shutdown_process_event)) + self._result_queue)) p.start() self._processes.add(p) @@ -372,7 +354,6 @@ class ProcessPoolExecutor(_base.Executor self._queue_management_thread = None self._call_queue = None self._result_queue = None - self._shutdown_process_event = None self._processes = None shutdown.__doc__ = _base.Executor.shutdown.__doc__ diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -60,14 +60,10 @@ class _WorkItem(object): def _worker(executor_reference, work_queue): try: while True: - try: - work_item = work_queue.get(block=True) - except queue.Empty: - pass - else: - if work_item is not None: - work_item.run() - continue + work_item = work_queue.get(block=True) + if work_item is not None: + work_item.run() + continue executor = executor_reference() # Exit if: # - The interpreter is shutting down OR