This feature enhancement issue is based on the following python-ideas thread: https://mail.python.org/archives/list/python-ideas@python.org/thread/ZSUIFN5GTDO56H6LLOPXOLQK7EQQZJHJ/
In summary, the main suggestion was to implement a new parameter called "cancel" (some bikeshedding over the name is welcome, I was thinking "cancel_futures" might be another option) for Executor.shutdown(), that would be added to both ThreadPoolExecutor and ProcessPoolExecutor. When set to True, this parameter would cancel all pending futures that were scheduled to the executor just after setting self._shutdown.
In order to build some experience in working with the internals of the executors (particularly for implementing native pools in asyncio in the future), I plan on working on this issue; assuming Antoine and/or Brian are +1 on it. Guido seemed to approve of the idea.
The implementation in ThreadPoolExecutor should be fairly straightforward, as it would use much of the same logic that's in the private method _initializer_failed() (https://github.com/python/cpython/blob/fad8b5674c66d9e00bb788e30adddb0c256c787b/Lib/concurrent/futures/thread.py#L205-L216). Minus the setting of self._broken, and cancelling each of the work_items (pending futures) instead of setting the BrokenThreadPool exception.
For ProcessPoolExecutor, I'll likely have to spend some more time looking into the implementation details of it to figure out how the cancellation will work. IIUC, it would involve adding some additional logic in _queue_management_worker(), the function which is used by the queue management thread to communicate between the main process and the worker processes spawned by ProcessPoolExecutor.
Specifically, in the "if shutting_down()" block (https://github.com/python/cpython/blob/fad8b5674c66d9e00bb788e30adddb0c256c787b/Lib/concurrent/futures/process.py#L432-L446), I think we could add an additional conditional check to see if self._cancel_pending_work_items is true (new internal flag set during executor.shutdown() if *cancel* is true, just after setting "self._shutdown_thread = True"). In this block, it would iterate through the pending work items, and cancel their associated future. Here's a rough example of what I have in mind:
```
if shutting_down():
try:
# Flag the executor as shutting down as early as possible if it
# is not gc-ed yet.
if executor is not None:
executor._shutdown_thread = True
+ if executor._cancel_pending_work_items:
+ # We only care about the values in the dict, which are
+ # the actual work items.
+ for work_item in pending_work_items.values():
+ work_item.future.cancel()
# Since no new work items can be added, it is safe to shutdown
# this thread if there are no pending work items.
if not pending_work_items:
shutdown_worker()
return
except Full:
# This is not a problem: we will eventually be woken up (in
# result_queue.get()) and be able to send a sentinel again.
pass
```
Would something along the lines of the above be a potentially viable method of implementing the *cancel* parameter for ProcessPoolExecutor.shutdown()? The main downside to this implementation is that it can't cancel anything that is already running (pushed from pending_work_items to call_queue). But from my understanding, there isn't a viable means of cancelling anything in the call queue; at that point it's too late.
Anyways, I'll work on the ThreadPoolExecutor implementation in the meantime. As mentioned previously, that one should be more straightforward. After getting it working, I'll open a PR for just ThreadPoolExecutor, and then work on ProcessPoolExecutor in another PR after receiving some feedback on the above idea.
|