New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add "cancel_futures" parameter to concurrent.futures.Executor.shutdown() #83530
Comments
This feature enhancement issue is based on the following python-ideas thread: https://mail.python.org/archives/list/python-ideas@python.org/thread/ZSUIFN5GTDO56H6LLOPXOLQK7EQQZJHJ/ In summary, the main suggestion was to implement a new parameter called "cancel" (some bikeshedding over the name is welcome, I was thinking "cancel_futures" might be another option) for Executor.shutdown(), that would be added to both ThreadPoolExecutor and ProcessPoolExecutor. When set to True, this parameter would cancel all pending futures that were scheduled to the executor just after setting self._shutdown. In order to build some experience in working with the internals of the executors (particularly for implementing native pools in asyncio in the future), I plan on working on this issue; assuming Antoine and/or Brian are +1 on it. Guido seemed to approve of the idea. The implementation in ThreadPoolExecutor should be fairly straightforward, as it would use much of the same logic that's in the private method _initializer_failed() ( cpython/Lib/concurrent/futures/thread.py Lines 205 to 216 in fad8b56
For ProcessPoolExecutor, I'll likely have to spend some more time looking into the implementation details of it to figure out how the cancellation will work. IIUC, it would involve adding some additional logic in _queue_management_worker(), the function which is used by the queue management thread to communicate between the main process and the worker processes spawned by ProcessPoolExecutor. Specifically, in the "if shutting_down()" block ( cpython/Lib/concurrent/futures/process.py Lines 432 to 446 in fad8b56
Would something along the lines of the above be a potentially viable method of implementing the *cancel* parameter for ProcessPoolExecutor.shutdown()? The main downside to this implementation is that it can't cancel anything that is already running (pushed from pending_work_items to call_queue). But from my understanding, there isn't a viable means of cancelling anything in the call queue; at that point it's too late. Anyways, I'll work on the ThreadPoolExecutor implementation in the meantime. As mentioned previously, that one should be more straightforward. After getting it working, I'll open a PR for just ThreadPoolExecutor, and then work on ProcessPoolExecutor in another PR after receiving some feedback on the above idea. |
As of now, I have the implementation for ThreadPoolExecutor working correctly, and a unit test added to ensure its behavior. It was a bit more involved than I originally anticipated, as I had to make a minor change in the _worker() function to allow the new parameter to be compatible with wait (which is important, as it prevents dangling threads). With my initial implementation, using "wait=True" and "cancel_futures=True" was resulting in any threads that called work_queue.get(block=True) to hang indefinitely. In order to remedy this, I changed it to use work_queue.get_nowait(). If a queue.Empty exception occurs, it checks for a global constant _cancelling_futures (set to True just in shutdown before it starts draining the work queue). If it's true, the while True loop is broken, otherwise it continues to the next iteration. This effectively has the same behavior as before. I experimented with a few different possible solutions, and the above was the only one that worked while still maintaining the current behavior as much as possible. From what I can tell, this was the only viable means of implementing the new parameter without making it entirely incompatible with "wait=True". At this point, I believe the only remaining step for the ThreadPoolExecutor implementation is to update the documentation and decide on the name. After working with it, I'm leaning more towards *cancel_futures*, as I think this more accurately describes what it does compared to just *cancel* (which is a bit vague IMO). |
Never mind, I just realized that I could simply add work_queue.put(None) at the end of the queue drain to unblock the indefinitely hanging thread. So, the global constant and change in logic for _worker() isn't needed. |
I now have a working implementation, for both ThreadPoolExecutor and ProcessPoolExecutor. I've also ensured that the tests I added are not vulnerable to race conditions with the following:
I'll attach a PR to the issue once I finish writing the documentation and "What's New" entry. Note: I was originally going to do this in two separate PRs, one for each executor, but it seemed to make more sense to just have it as a single cohesive PR since Executor.shutdown() shares the same documentation for both executors. |
Thank you very muck, Kyle! |
No problem. Thanks for reviewing and merging the PR, Antoine! |
I was working on updating typeshed stubs to reflect this change. It looks like the parameter wasn't actually added to the base class ( cpython/Lib/concurrent/futures/_base.py Line 608 in c3a651a
Is this intentional? If not, I'd be happy to submit a PR adding the parameter to the base class. |
Good catch, Shantanu. It was not intentional on my part, and it would make sense to include *cancel_futures* in the base Executor class as documented. If you'd like to submit a PR to add it (attaching it to this issue), I should be able to review it within a reasonable period. |
Thanks for bringing attention to cancel_futures being missing in the base Executor class and for the PR, Shantanu! |
Note: these values reflect the state of the issue at the time it was migrated and might not reflect the current state.
Show more details
GitHub fields:
bugs.python.org fields:
The text was updated successfully, but these errors were encountered: