This issue tracker has been migrated to GitHub, and is currently read-only.
For more information, see the GitHub FAQs in the Python's Developer Guide.

classification
Title: Add "cancel_futures" parameter to concurrent.futures.Executor.shutdown()
Type: enhancement Stage: resolved
Components: Library (Lib) Versions: Python 3.9
process
Status: closed Resolution: fixed
Dependencies: Superseder:
Assigned To: aeros Nosy List: aeros, bquinlan, hauntsaninja, pitrou
Priority: normal Keywords: patch

Created on 2020-01-16 04:08 by aeros, last changed 2022-04-11 14:59 by admin. This issue is now closed.

Pull Requests
URL Status Linked Edit
PR 18057 merged aeros, 2020-01-19 04:10
PR 22023 merged hauntsaninja, 2020-08-31 06:07
PR 22048 merged hauntsaninja, 2020-09-01 21:25
Messages (12)
msg360093 - (view) Author: Kyle Stanley (aeros) * (Python committer) Date: 2020-01-16 04:08
This feature enhancement issue is based on the following python-ideas thread: https://mail.python.org/archives/list/python-ideas@python.org/thread/ZSUIFN5GTDO56H6LLOPXOLQK7EQQZJHJ/

In summary, the main suggestion was to implement a new parameter called "cancel" (some bikeshedding over the name is welcome, I was thinking "cancel_futures" might be another option) for Executor.shutdown(), that would be added to both ThreadPoolExecutor and ProcessPoolExecutor. When set to True, this parameter would cancel all pending futures that were scheduled to the executor just after setting self._shutdown.

In order to build some experience in working with the internals of the executors (particularly for implementing native pools in asyncio in the future), I plan on working on this issue; assuming Antoine and/or Brian are +1 on it. Guido seemed to approve of the idea.

The implementation in ThreadPoolExecutor should be fairly straightforward, as it would use much of the same logic that's in the private method _initializer_failed() (https://github.com/python/cpython/blob/fad8b5674c66d9e00bb788e30adddb0c256c787b/Lib/concurrent/futures/thread.py#L205-L216). Minus the setting of self._broken, and cancelling each of the work_items (pending futures) instead of setting the BrokenThreadPool exception.

For ProcessPoolExecutor, I'll likely have to spend some more time looking into the implementation details of it to figure out how the cancellation will work. IIUC, it would involve adding some additional logic in _queue_management_worker(), the function which is used by the queue management thread to communicate between the main process and the worker processes spawned by ProcessPoolExecutor.

Specifically, in the "if shutting_down()" block (https://github.com/python/cpython/blob/fad8b5674c66d9e00bb788e30adddb0c256c787b/Lib/concurrent/futures/process.py#L432-L446), I think we could add an additional conditional check to see if self._cancel_pending_work_items is true (new internal flag set during executor.shutdown() if *cancel* is true, just after setting "self._shutdown_thread = True"). In this block, it would iterate through the pending work items, and cancel their associated future. Here's a rough example of what I have in mind:

```
        if shutting_down():
            try:
                # Flag the executor as shutting down as early as possible if it
                # is not gc-ed yet.
                if executor is not None:
                    executor._shutdown_thread = True
+                  if executor._cancel_pending_work_items:
+                      # We only care about the values in the dict, which are
+                      # the actual work items.
+                      for work_item in pending_work_items.values():
+                          work_item.future.cancel()
                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not pending_work_items:
                    shutdown_worker()
                    return
            except Full:
                # This is not a problem: we will eventually be woken up (in
                # result_queue.get()) and be able to send a sentinel again.
                pass
```

Would something along the lines of the above be a potentially viable method of implementing the *cancel* parameter for ProcessPoolExecutor.shutdown()? The main downside to this implementation is that it can't cancel anything that is already running (pushed from pending_work_items to call_queue). But from my understanding, there isn't a viable means of cancelling anything in the call queue; at that point it's too late.

Anyways, I'll work on the ThreadPoolExecutor implementation in the meantime. As mentioned previously, that one should be more straightforward. After getting it working, I'll open a PR for just ThreadPoolExecutor, and then work on ProcessPoolExecutor in another PR after receiving some feedback on the above idea.
msg360160 - (view) Author: Kyle Stanley (aeros) * (Python committer) Date: 2020-01-17 05:01
As of now, I have the implementation for ThreadPoolExecutor working correctly, and a unit test added to ensure its behavior. It was a bit more involved than I originally anticipated, as I had to make a minor change in the _worker() function to allow the new parameter to be compatible with wait (which is important, as it prevents dangling threads).

With my initial implementation, using "wait=True" and "cancel_futures=True" was resulting in any threads that called work_queue.get(block=True) to hang indefinitely. In order to remedy this, I changed it to use work_queue.get_nowait(). If a queue.Empty exception occurs, it checks for a global constant _cancelling_futures (set to True just in shutdown before it starts draining the work queue). If it's true, the while True loop is broken, otherwise it continues to the next iteration. This effectively has the same behavior as before. 

I experimented with a few different possible solutions, and the above was the only one that worked while still maintaining the current behavior as much as possible. From what I can tell, this was the only viable means of implementing the new parameter without making it entirely incompatible with "wait=True".

At this point, I believe the only remaining step for the ThreadPoolExecutor implementation is to update the documentation and decide on the name. After working with it, I'm leaning more towards *cancel_futures*, as I think this more accurately describes what it does compared to just *cancel* (which is a bit vague IMO).
msg360162 - (view) Author: Kyle Stanley (aeros) * (Python committer) Date: 2020-01-17 07:54
> It was a bit more involved than I originally anticipated, as I had to make a minor change in the _worker() function to allow the new parameter to be compatible with wait (which is important, as it prevents dangling threads).

Never mind, I just realized that I could simply add work_queue.put(None) at the end of the queue drain to unblock the indefinitely hanging thread. So, the global constant and change in logic for _worker() isn't needed.
msg360253 - (view) Author: Kyle Stanley (aeros) * (Python committer) Date: 2020-01-19 03:14
I now have a working implementation, for both ThreadPoolExecutor and ProcessPoolExecutor. I've also ensured that the tests I added are not vulnerable to race conditions with the following:

```
[aeros:~/repos/aeros-cpython]$ ./python -m test test_concurrent_futures --match test_cancel_futures -j200 -v -F

[snip]
Ran 4 tests in 2.865s

OK
0:03:24 load avg: 143.25 [1018] test_concurrent_futures passed -- running: test_concurrent_futures (2 min 36 sec), test_concurrent_futures (35.8 sec)
test_cancel_futures (test.test_concurrent_futures.ProcessPoolForkProcessPoolShutdownTest) ... 0.57s ok
test_cancel_futures (test.test_concurrent_futures.ProcessPoolForkserverProcessPoolShutdownTest) ... 0.80s ok
test_cancel_futures (test.test_concurrent_futures.ProcessPoolSpawnProcessPoolShutdownTest) ... 0.53s ok
test_cancel_futures (test.test_concurrent_futures.ThreadPoolShutdownTest) ... 0.20s ok
```

I'll attach a PR to the issue once I finish writing the documentation and "What's New" entry.

Note: I was originally going to do this in two separate PRs, one for each executor, but it seemed to make more sense to just have it as a single cohesive PR since Executor.shutdown() shares the same documentation for both executors.
msg361231 - (view) Author: Antoine Pitrou (pitrou) * (Python committer) Date: 2020-02-02 12:49
New changeset 339fd46cb764277cbbdc3e78dcc5b45b156bb6ae by Kyle Stanley in branch 'master':
bpo-39349: Add *cancel_futures* to Executor.shutdown() (GH-18057)
https://github.com/python/cpython/commit/339fd46cb764277cbbdc3e78dcc5b45b156bb6ae
msg361232 - (view) Author: Antoine Pitrou (pitrou) * (Python committer) Date: 2020-02-02 12:50
Thank you very muck, Kyle!
msg361260 - (view) Author: Kyle Stanley (aeros) * (Python committer) Date: 2020-02-03 00:01
> Thank you very muck, Kyle!

No problem. Thanks for reviewing and merging the PR, Antoine!
msg376133 - (view) Author: Shantanu (hauntsaninja) * Date: 2020-08-30 22:24
I was working on updating typeshed stubs to reflect this change. It looks like the parameter wasn't actually added to the base class (https://github.com/python/cpython/blob/c3a651ad2544d7d1be389b63e9a4a58a92a31623/Lib/concurrent/futures/_base.py#L608), even though it's documented as having the new parameter (https://docs.python.org/3.9/library/concurrent.futures.html#concurrent.futures.Executor.shutdown).

Is this intentional? If not, I'd be happy to submit a PR adding the parameter to the base class.
msg376139 - (view) Author: Kyle Stanley (aeros) * (Python committer) Date: 2020-08-31 01:41
Good catch, Shantanu. It was not intentional on my part, and it would make sense to include *cancel_futures* in the base Executor class as documented. If you'd like to submit a PR to add it (attaching it to this issue), I should be able to review it within a reasonable period.
msg376211 - (view) Author: Kyle Stanley (aeros) * (Python committer) Date: 2020-09-01 21:18
New changeset 17dc1b789ecc33b4a254eb3b799085f4b3624ca5 by Shantanu in branch 'master':
bpo-39349: Add cancel_futures to Executor.shutdown base class (GH-22023)
https://github.com/python/cpython/commit/17dc1b789ecc33b4a254eb3b799085f4b3624ca5
msg376260 - (view) Author: Kyle Stanley (aeros) * (Python committer) Date: 2020-09-02 19:28
New changeset a763ee3c583e6a2dfe1b1ac0600a48e8a978ed50 by Shantanu in branch '3.9':
[3.9] bpo-39349: Add cancel_futures to Executor.shutdown base class (GH-22023) (GH-22048)
https://github.com/python/cpython/commit/a763ee3c583e6a2dfe1b1ac0600a48e8a978ed50
msg376261 - (view) Author: Kyle Stanley (aeros) * (Python committer) Date: 2020-09-02 19:31
Thanks for bringing attention to cancel_futures being missing in the base Executor class and for the PR, Shantanu!
History
Date User Action Args
2022-04-11 14:59:25adminsetgithub: 83530
2020-09-02 19:31:13aerossetmessages: + msg376261
2020-09-02 19:28:37aerossetmessages: + msg376260
2020-09-01 21:25:39hauntsaninjasetpull_requests: + pull_request21145
2020-09-01 21:18:11aerossetmessages: + msg376211
2020-08-31 06:07:30hauntsaninjasetpull_requests: + pull_request21123
2020-08-31 01:41:32aerossetmessages: + msg376139
2020-08-30 22:24:46hauntsaninjasetnosy: + hauntsaninja
messages: + msg376133
2020-02-03 00:01:22aerossetmessages: + msg361260
2020-02-02 12:50:29pitrousetstatus: open -> closed
resolution: fixed
messages: + msg361232

stage: patch review -> resolved
2020-02-02 12:49:04pitrousetmessages: + msg361231
2020-01-19 04:10:56aerossetkeywords: + patch
stage: needs patch -> patch review
pull_requests: + pull_request17451
2020-01-19 03:14:05aerossetmessages: + msg360253
2020-01-17 07:54:57aerossetmessages: + msg360162
2020-01-17 05:01:38aerossetmessages: + msg360160
title: Add "cancel" parameter to concurrent.futures.Executor.shutdown() -> Add "cancel_futures" parameter to concurrent.futures.Executor.shutdown()
2020-01-16 09:18:12aerossetcomponents: + Library (Lib)
2020-01-16 04:08:34aeroscreate