Issue 39349
Created on 2020-01-16 04:08 by aeros, last changed 2020-09-02 19:31 by aeros. This issue is now closed.
Pull Requests

URL | Status | Linked
---|---|---
PR 18057 | merged | aeros, 2020-01-19 04:10
PR 22023 | merged | hauntsaninja, 2020-08-31 06:07
PR 22048 | merged | hauntsaninja, 2020-09-01 21:25
Messages (12)
msg360093 | Author: Kyle Stanley (aeros)
Date: 2020-01-16 04:08
This feature enhancement issue is based on the following python-ideas thread: https://mail.python.org/archives/list/python-ideas@python.org/thread/ZSUIFN5GTDO56H6LLOPXOLQK7EQQZJHJ/

In summary, the main suggestion was to implement a new parameter called "cancel" for Executor.shutdown() (some bikeshedding over the name is welcome; I was thinking "cancel_futures" might be another option), which would be added to both ThreadPoolExecutor and ProcessPoolExecutor. When set to True, this parameter would cancel all pending futures that were scheduled to the executor, just after setting self._shutdown.

In order to build some experience working with the internals of the executors (particularly for implementing native pools in asyncio in the future), I plan on working on this issue, assuming Antoine and/or Brian are +1 on it. Guido seemed to approve of the idea.

The implementation in ThreadPoolExecutor should be fairly straightforward, as it would use much of the same logic that's in the private method _initializer_failed() (https://github.com/python/cpython/blob/fad8b5674c66d9e00bb788e30adddb0c256c787b/Lib/concurrent/futures/thread.py#L205-L216), minus the setting of self._broken, and cancelling each of the work_items (pending futures) instead of setting the BrokenThreadPool exception.

For ProcessPoolExecutor, I'll likely have to spend some more time looking into its implementation details to figure out how the cancellation will work. IIUC, it would involve adding some additional logic in _queue_management_worker(), the function used by the queue management thread to communicate between the main process and the worker processes spawned by ProcessPoolExecutor.
Specifically, in the "if shutting_down()" block (https://github.com/python/cpython/blob/fad8b5674c66d9e00bb788e30adddb0c256c787b/Lib/concurrent/futures/process.py#L432-L446), I think we could add an additional conditional check to see if self._cancel_pending_work_items is true (a new internal flag set during executor.shutdown() if *cancel* is true, just after setting "self._shutdown_thread = True"). In this block, it would iterate through the pending work items and cancel their associated futures. Here's a rough example of what I have in mind:

```
 if shutting_down():
     try:
         # Flag the executor as shutting down as early as possible if it
         # is not gc-ed yet.
         if executor is not None:
             executor._shutdown_thread = True
+            if executor._cancel_pending_work_items:
+                # We only care about the values in the dict, which are
+                # the actual work items.
+                for work_item in pending_work_items.values():
+                    work_item.future.cancel()
         # Since no new work items can be added, it is safe to shutdown
         # this thread if there are no pending work items.
         if not pending_work_items:
             shutdown_worker()
             return
     except Full:
         # This is not a problem: we will eventually be woken up (in
         # result_queue.get()) and be able to send a sentinel again.
         pass
```

Would something along the lines of the above be a viable method of implementing the *cancel* parameter for ProcessPoolExecutor.shutdown()? The main downside to this implementation is that it can't cancel anything that is already running (pushed from pending_work_items to call_queue). But from my understanding, there isn't a viable means of cancelling anything in the call queue; at that point it's too late.

Anyway, I'll work on the ThreadPoolExecutor implementation in the meantime. As mentioned previously, that one should be more straightforward. After getting it working, I'll open a PR for just ThreadPoolExecutor, and then work on ProcessPoolExecutor in another PR after receiving some feedback on the above idea.
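A minimal sketch of the "cancel what you can" idea, using only the public Future API: Future.cancel() returns False once a future is running or done, which is why work that has already left pending_work_items can't be cancelled. The cancel_pending() helper and the plain work_id -> Future dict are hypothetical simplifications (the rough example above stores work items whose .future attribute holds the Future); this is not the code that was eventually merged.

```python
from concurrent.futures import Future


def cancel_pending(pending):
    """Cancel each future in ``pending`` (a hypothetical dict mapping
    work_id -> Future) and return a new dict holding only the futures
    that refused cancellation because they had already started running."""
    still_running = {}
    for work_id, fut in pending.items():
        # Future.cancel() returns False once the future is RUNNING or done.
        if not fut.cancel():
            still_running[work_id] = fut
    return still_running


if __name__ == "__main__":
    pending = {i: Future() for i in range(3)}
    # Simulate work item 0 having already been handed off to a worker.
    pending[0].set_running_or_notify_cancel()
    remaining = cancel_pending(pending)
    print(sorted(remaining))                           # [0]
    print([pending[i].cancelled() for i in range(3)])  # [False, True, True]
```

Returning the still-running subset, rather than leaving every entry in place, is one possible design: it keeps a record of only the work that will still produce a result.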
msg360160 | Author: Kyle Stanley (aeros)
Date: 2020-01-17 05:01
As of now, I have the implementation for ThreadPoolExecutor working correctly, and a unit test added to ensure its behavior. It was a bit more involved than I originally anticipated, as I had to make a minor change in the _worker() function to allow the new parameter to be compatible with wait (which is important, as it prevents dangling threads).

With my initial implementation, using "wait=True" and "cancel_futures=True" resulted in any threads that called work_queue.get(block=True) hanging indefinitely. In order to remedy this, I changed it to use work_queue.get_nowait(). If a queue.Empty exception occurs, it checks a global constant _cancelling_futures (set to True in shutdown just before it starts draining the work queue). If it's true, the while True loop is broken; otherwise, it continues to the next iteration. This effectively has the same behavior as before. I experimented with a few different possible solutions, and the above was the only one that worked while still maintaining the current behavior as much as possible. From what I can tell, this was the only viable means of implementing the new parameter without making it entirely incompatible with "wait=True".

At this point, I believe the only remaining steps for the ThreadPoolExecutor implementation are to update the documentation and decide on the name. After working with it, I'm leaning more towards *cancel_futures*, as I think it more accurately describes what it does compared to just *cancel* (which is a bit vague IMO).
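For reference, here is a sketch of how the parameter, under its final name, is used from the caller's side. It assumes Python 3.9 or later, where Executor.shutdown() accepts the keyword-only cancel_futures argument; run_demo(), blocker(), started, and gate are illustrative names. A single worker thread is held busy so that the remaining submissions are guaranteed to still be queued when shutdown() is called.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def run_demo(n_pending=5):
    """Return (result of the running task, cancelled() flags of queued tasks).

    Requires Python 3.9+, where Executor.shutdown() accepts cancel_futures.
    """
    started = threading.Event()
    gate = threading.Event()

    def blocker():
        started.set()  # tell the main thread this task is now running
        gate.wait()    # hold the only worker thread until released
        return "finished"

    executor = ThreadPoolExecutor(max_workers=1)
    running = executor.submit(blocker)
    started.wait()  # once running, this future can no longer be cancelled

    # With the single worker busy, these stay queued (pending).
    queued = [executor.submit(pow, 2, i) for i in range(n_pending)]

    # Cancel everything still queued; don't block on the running task yet.
    executor.shutdown(wait=False, cancel_futures=True)

    gate.set()  # let the running task complete normally
    return running.result(), [f.cancelled() for f in queued]


if __name__ == "__main__":
    print(run_demo())  # ('finished', [True, True, True, True, True])
```

The running task is unaffected and completes normally; only the queued ones are cancelled. shutdown() is called with wait=False here because the running task is blocked until the gate is set; with wait=True, the call would instead block until that task finished.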
msg360162 | Author: Kyle Stanley (aeros)
Date: 2020-01-17 07:54
> It was a bit more involved than I originally anticipated, as I had to make a minor change in the _worker() function to allow the new parameter to be compatible with wait (which is important, as it prevents dangling threads).

Never mind, I just realized that I could simply add work_queue.put(None) at the end of the queue drain to unblock the indefinitely hanging thread. So the global constant and the change in logic for _worker() aren't needed.
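A toy model of the fix described above, with a hypothetical WorkItem class, worker() loop, and cancel_queued() helper standing in for the executor's internals (simplified illustrations, not CPython's actual code). The worker blocks in work_queue.get(block=True); cancel_queued() drains and cancels whatever is still queued, then puts a None sentinel so the worker can wake up and exit instead of hanging.

```python
import queue
import threading
from concurrent.futures import Future


class WorkItem:
    """Hypothetical, simplified stand-in for a queued call."""

    def __init__(self, future, fn, *args):
        self.future, self.fn, self.args = future, fn, args

    def run(self):
        # Skip the call entirely if the future was cancelled while queued.
        if not self.future.set_running_or_notify_cancel():
            return
        try:
            self.future.set_result(self.fn(*self.args))
        except BaseException as exc:
            self.future.set_exception(exc)


def worker(work_queue):
    while True:
        item = work_queue.get(block=True)  # blocks while the queue is empty
        if item is None:                   # sentinel: exit the loop
            return
        item.run()


def cancel_queued(work_queue):
    """Drain the queue, cancel each drained future, then wake the worker."""
    cancelled = 0
    while True:
        try:
            item = work_queue.get_nowait()
        except queue.Empty:
            break
        if item is not None and item.future.cancel():
            cancelled += 1
    # Without this sentinel, a worker that finishes its current item would
    # block in get() forever, and a later join() would hang.
    work_queue.put(None)
    return cancelled


def demo():
    work_queue = queue.Queue()
    thread = threading.Thread(target=worker, args=(work_queue,))
    thread.start()

    started, gate = threading.Event(), threading.Event()

    def blocker():
        started.set()
        gate.wait()
        return "done"

    first = Future()
    work_queue.put(WorkItem(first, blocker))
    started.wait()  # the worker is now busy with `first`

    queued = [Future() for _ in range(3)]
    for i, fut in enumerate(queued):
        work_queue.put(WorkItem(fut, pow, 2, i))

    n = cancel_queued(work_queue)
    gate.set()
    thread.join(timeout=5)  # returns promptly thanks to the sentinel
    return first.result(), n, thread.is_alive()


if __name__ == "__main__":
    print(demo())  # ('done', 3, False)
```

If the work_queue.put(None) line is removed, thread.join(timeout=5) times out and thread.is_alive() returns True, which is the hang the message describes.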
msg360253 | Author: Kyle Stanley (aeros)
Date: 2020-01-19 03:14
I now have a working implementation for both ThreadPoolExecutor and ProcessPoolExecutor. I've also ensured that the tests I added are not vulnerable to race conditions with the following:

```
[aeros:~/repos/aeros-cpython]$ ./python -m test test_concurrent_futures --match test_cancel_futures -j200 -v -F
[snip]
Ran 4 tests in 2.865s

OK
0:03:24 load avg: 143.25 [1018] test_concurrent_futures passed -- running: test_concurrent_futures (2 min 36 sec), test_concurrent_futures (35.8 sec)
test_cancel_futures (test.test_concurrent_futures.ProcessPoolForkProcessPoolShutdownTest) ... 0.57s ok
test_cancel_futures (test.test_concurrent_futures.ProcessPoolForkserverProcessPoolShutdownTest) ... 0.80s ok
test_cancel_futures (test.test_concurrent_futures.ProcessPoolSpawnProcessPoolShutdownTest) ... 0.53s ok
test_cancel_futures (test.test_concurrent_futures.ThreadPoolShutdownTest) ... 0.20s ok
```

I'll attach a PR to the issue once I finish writing the documentation and "What's New" entry.

Note: I was originally going to do this in two separate PRs, one for each executor, but it seemed to make more sense to have a single cohesive PR, since Executor.shutdown() shares the same documentation for both executors.
msg361231 | Author: Antoine Pitrou (pitrou)
Date: 2020-02-02 12:49
New changeset 339fd46cb764277cbbdc3e78dcc5b45b156bb6ae by Kyle Stanley in branch 'master':
bpo-39349: Add *cancel_futures* to Executor.shutdown() (GH-18057)
https://github.com/python/cpython/commit/339fd46cb764277cbbdc3e78dcc5b45b156bb6ae
msg361232 | Author: Antoine Pitrou (pitrou)
Date: 2020-02-02 12:50
Thank you very much, Kyle!
msg361260 | Author: Kyle Stanley (aeros)
Date: 2020-02-03 00:01
> Thank you very much, Kyle!

No problem. Thanks for reviewing and merging the PR, Antoine!
msg376133 | Author: Shantanu (hauntsaninja)
Date: 2020-08-30 22:24
I was working on updating typeshed stubs to reflect this change. It looks like the parameter wasn't actually added to the base class (https://github.com/python/cpython/blob/c3a651ad2544d7d1be389b63e9a4a58a92a31623/Lib/concurrent/futures/_base.py#L608), even though it's documented as having the new parameter (https://docs.python.org/3.9/library/concurrent.futures.html#concurrent.futures.Executor.shutdown). Is this intentional? If not, I'd be happy to submit a PR adding the parameter to the base class.
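A sketch of why the base-class signature matters: code written against the documented Executor API may call shutdown(cancel_futures=True) on any executor, including a custom subclass that simply inherits shutdown(). InlineExecutor and close() below are hypothetical, and the sketch assumes an interpreter whose concurrent.futures base class already includes this fix. Without it, the inherited Executor.shutdown() accepted only wait, so the call on InlineExecutor would raise TypeError.

```python
import inspect
from concurrent.futures import Executor, Future, ThreadPoolExecutor


class InlineExecutor(Executor):
    """Hypothetical executor that runs each call synchronously in submit().

    It does not override shutdown(), so it inherits Executor.shutdown().
    """

    def submit(self, fn, /, *args, **kwargs):
        fut = Future()
        fut.set_running_or_notify_cancel()
        try:
            fut.set_result(fn(*args, **kwargs))
        except BaseException as exc:
            fut.set_exception(exc)
        return fut


def close(executor: Executor) -> None:
    # Written against the documented Executor API; for subclasses that
    # inherit shutdown(), this only works if the base class accepts the keyword.
    executor.shutdown(wait=True, cancel_futures=True)


if __name__ == "__main__":
    print("cancel_futures" in inspect.signature(Executor.shutdown).parameters)
    for ex in (ThreadPoolExecutor(max_workers=2), InlineExecutor()):
        print(ex.submit(sum, [1, 2, 3]).result())  # 6
        close(ex)
```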
msg376139 | Author: Kyle Stanley (aeros)
Date: 2020-08-31 01:41
Good catch, Shantanu. It was not intentional on my part, and it would make sense to include *cancel_futures* in the base Executor class as documented. If you'd like to submit a PR to add it (attaching it to this issue), I should be able to review it within a reasonable period.
msg376211 | Author: Kyle Stanley (aeros)
Date: 2020-09-01 21:18
New changeset 17dc1b789ecc33b4a254eb3b799085f4b3624ca5 by Shantanu in branch 'master':
bpo-39349: Add cancel_futures to Executor.shutdown base class (GH-22023)
https://github.com/python/cpython/commit/17dc1b789ecc33b4a254eb3b799085f4b3624ca5
msg376260 | Author: Kyle Stanley (aeros)
Date: 2020-09-02 19:28
New changeset a763ee3c583e6a2dfe1b1ac0600a48e8a978ed50 by Shantanu in branch '3.9':
[3.9] bpo-39349: Add cancel_futures to Executor.shutdown base class (GH-22023) (GH-22048)
https://github.com/python/cpython/commit/a763ee3c583e6a2dfe1b1ac0600a48e8a978ed50
msg376261 | Author: Kyle Stanley (aeros)
Date: 2020-09-02 19:31
Thanks for bringing attention to cancel_futures being missing in the base Executor class and for the PR, Shantanu!
History

Date | User | Action | Args
---|---|---|---
2020-09-02 19:31:13 | aeros | set | messages: + msg376261
2020-09-02 19:28:37 | aeros | set | messages: + msg376260
2020-09-01 21:25:39 | hauntsaninja | set | pull_requests: + pull_request21145
2020-09-01 21:18:11 | aeros | set | messages: + msg376211
2020-08-31 06:07:30 | hauntsaninja | set | pull_requests: + pull_request21123
2020-08-31 01:41:32 | aeros | set | messages: + msg376139
2020-08-30 22:24:46 | hauntsaninja | set | nosy: + hauntsaninja; messages: + msg376133
2020-02-03 00:01:22 | aeros | set | messages: + msg361260
2020-02-02 12:50:29 | pitrou | set | status: open -> closed; resolution: fixed; messages: + msg361232; stage: patch review -> resolved
2020-02-02 12:49:04 | pitrou | set | messages: + msg361231
2020-01-19 04:10:56 | aeros | set | keywords: + patch; stage: needs patch -> patch review; pull_requests: + pull_request17451
2020-01-19 03:14:05 | aeros | set | messages: + msg360253
2020-01-17 07:54:57 | aeros | set | messages: + msg360162
2020-01-17 05:01:38 | aeros | set | messages: + msg360160; title: Add "cancel" parameter to concurrent.futures.Executor.shutdown() -> Add "cancel_futures" parameter to concurrent.futures.Executor.shutdown()
2020-01-16 09:18:12 | aeros | set | components: + Library (Lib)
2020-01-16 04:08:34 | aeros | create |