Add "cancel_futures" parameter to concurrent.futures.Executor.shutdown() #83530

Closed
aeros opened this issue Jan 16, 2020 · 12 comments
Assignees: aeros
Labels: 3.9, stdlib, type-feature

Comments

@aeros
Contributor

aeros commented Jan 16, 2020

BPO 39349
Nosy @brianquinlan, @pitrou, @aeros, @hauntsaninja
PRs
  • bpo-39349: Add *cancel_futures* to Executor.shutdown() #18057
  • bpo-39349: Add cancel_futures to Executor.shutdown base class #22023
  • [3.9] bpo-39349: Add cancel_futures to Executor.shutdown base class (GH-22023) #22048
Note: these values reflect the state of the issue at the time it was migrated and might not reflect the current state.

    GitHub fields:

    assignee = 'https://github.com/aeros'
    closed_at = <Date 2020-02-02.12:50:29.568>
    created_at = <Date 2020-01-16.04:08:34.206>
    labels = ['type-feature', 'library', '3.9']
    title = 'Add "cancel_futures" parameter to concurrent.futures.Executor.shutdown()'
    updated_at = <Date 2020-09-02.19:31:13.581>
    user = 'https://github.com/aeros'

    bugs.python.org fields:

    activity = <Date 2020-09-02.19:31:13.581>
    actor = 'aeros'
    assignee = 'aeros'
    closed = True
    closed_date = <Date 2020-02-02.12:50:29.568>
    closer = 'pitrou'
    components = ['Library (Lib)']
    creation = <Date 2020-01-16.04:08:34.206>
    creator = 'aeros'
    dependencies = []
    files = []
    hgrepos = []
    issue_num = 39349
    keywords = ['patch']
    message_count = 12.0
    messages = ['360093', '360160', '360162', '360253', '361231', '361232', '361260', '376133', '376139', '376211', '376260', '376261']
    nosy_count = 4.0
    nosy_names = ['bquinlan', 'pitrou', 'aeros', 'hauntsaninja']
    pr_nums = ['18057', '22023', '22048']
    priority = 'normal'
    resolution = 'fixed'
    stage = 'resolved'
    status = 'closed'
    superseder = None
    type = 'enhancement'
    url = 'https://bugs.python.org/issue39349'
    versions = ['Python 3.9']

    @aeros
    Contributor Author

    aeros commented Jan 16, 2020

    This feature enhancement issue is based on the following python-ideas thread: https://mail.python.org/archives/list/python-ideas@python.org/thread/ZSUIFN5GTDO56H6LLOPXOLQK7EQQZJHJ/

    In summary, the main suggestion was to implement a new parameter called "cancel" (some bikeshedding over the name is welcome; I was thinking "cancel_futures" might be another option) for Executor.shutdown(), which would be added to both ThreadPoolExecutor and ProcessPoolExecutor. When set to True, this parameter would cancel all pending futures scheduled on the executor, immediately after self._shutdown is set.

    In order to build some experience in working with the internals of the executors (particularly for implementing native pools in asyncio in the future), I plan on working on this issue, assuming Antoine and/or Brian are +1 on it. Guido seemed to approve of the idea.

    The implementation in ThreadPoolExecutor should be fairly straightforward, as it would use much of the same logic that's in the private method _initializer_failed() (

    def _initializer_failed(self):
        with self._shutdown_lock:
            self._broken = ('A thread initializer failed, the thread pool '
                            'is not usable anymore')
            # Drain work queue and mark pending futures failed
            while True:
                try:
                    work_item = self._work_queue.get_nowait()
                except queue.Empty:
                    break
                if work_item is not None:
                    work_item.future.set_exception(BrokenThreadPool(self._broken))
    ). The differences would be omitting the setting of self._broken, and cancelling each of the work_items (pending futures) instead of setting the BrokenThreadPool exception on them.
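The drain-and-cancel idea described above can be sketched outside the executor. This is only an illustration, not the code that was merged: `WorkItem` and `drain_and_cancel` are hypothetical names, and a plain `queue.SimpleQueue` stands in for the executor's private work queue.

```python
import queue
from concurrent.futures import Future


class WorkItem:
    """Hypothetical stand-in for the executor's private work item type."""

    def __init__(self, future):
        self.future = future


def drain_and_cancel(work_queue):
    """Empty the queue and cancel the future of every pending work item.

    Returns the number of futures that were actually cancelled.
    """
    cancelled = 0
    while True:
        try:
            work_item = work_queue.get_nowait()
        except queue.Empty:
            break
        # None is treated as a wake-up sentinel rather than a work item.
        if work_item is not None and work_item.future.cancel():
            cancelled += 1
    return cancelled


# Three futures that no worker has picked up yet.
pending = queue.SimpleQueue()
futures = [Future() for _ in range(3)]
for fut in futures:
    pending.put(WorkItem(fut))

cancelled_count = drain_and_cancel(pending)
```

Unlike _initializer_failed(), nothing here marks the pool as broken; the futures simply end up in the cancelled state.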

    For ProcessPoolExecutor, I'll likely have to spend some more time looking into the implementation details of it to figure out how the cancellation will work. IIUC, it would involve adding some additional logic in _queue_management_worker(), the function which is used by the queue management thread to communicate between the main process and the worker processes spawned by ProcessPoolExecutor.

    Specifically, in the "if shutting_down()" block (

    if shutting_down():
        try:
            # Flag the executor as shutting down as early as possible if it
            # is not gc-ed yet.
            if executor is not None:
                executor._shutdown_thread = True
            # Since no new work items can be added, it is safe to shutdown
            # this thread if there are no pending work items.
            if not pending_work_items:
                shutdown_worker()
                return
        except Full:
            # This is not a problem: we will eventually be woken up (in
            # result_queue.get()) and be able to send a sentinel again.
            pass
    ), I think we could add an additional conditional check to see if self._cancel_pending_work_items is true (new internal flag set during executor.shutdown() if *cancel* is true, just after setting "self._shutdown_thread = True"). In this block, it would iterate through the pending work items, and cancel their associated future. Here's a rough example of what I have in mind:

            if shutting_down():
                try:
                    # Flag the executor as shutting down as early as possible if it
                    # is not gc-ed yet.
                    if executor is not None:
                        executor._shutdown_thread = True
    +                   if executor._cancel_pending_work_items:
    +                       # We only care about the values in the dict, which are
    +                       # the actual work items.
    +                       for work_item in pending_work_items.values():
    +                           work_item.future.cancel()
                    # Since no new work items can be added, it is safe to shutdown
                    # this thread if there are no pending work items.
                    if not pending_work_items:
                        shutdown_worker()
                        return
                except Full:
                    # This is not a problem: we will eventually be woken up (in
                    # result_queue.get()) and be able to send a sentinel again.
                    pass
    

    Would something along the lines of the above be a potentially viable method of implementing the *cancel* parameter for ProcessPoolExecutor.shutdown()? The main downside to this implementation is that it can't cancel anything that is already running (pushed from pending_work_items to call_queue). But from my understanding, there isn't a viable means of cancelling anything in the call queue; at that point it's too late.
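The limitation mentioned above, that work which has already started cannot be cancelled, follows from how Future.cancel() behaves. A minimal sketch using bare Future objects (no executor involved; the variable names are illustrative only):

```python
from concurrent.futures import Future

pending_future = Future()
running_future = Future()

# Move the second future from "pending" to "running". Executors call this
# method themselves before running a call; it is called by hand here only
# to put the future into that state for the demonstration.
running_future.set_running_or_notify_cancel()

# cancel() succeeds only while a future is still pending.
pending_cancelled = pending_future.cancel()
running_cancelled = running_future.cancel()
```

Here `pending_cancelled` is True and `running_cancelled` is False, so a loop that calls cancel() on every work item leaves already-started items untouched.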

    Anyways, I'll work on the ThreadPoolExecutor implementation in the meantime. As mentioned previously, that one should be more straightforward. After getting it working, I'll open a PR for just ThreadPoolExecutor, and then work on ProcessPoolExecutor in another PR after receiving some feedback on the above idea.

    @aeros added the 3.9, type-feature, and stdlib labels Jan 16, 2020
    @aeros self-assigned this Jan 16, 2020
    @aeros
    Contributor Author

    aeros commented Jan 17, 2020

    As of now, I have the implementation for ThreadPoolExecutor working correctly, and a unit test added to ensure its behavior. It was a bit more involved than I originally anticipated, as I had to make a minor change in the _worker() function to allow the new parameter to be compatible with wait (which is important, as it prevents dangling threads).

    With my initial implementation, using "wait=True" and "cancel_futures=True" caused any thread that called work_queue.get(block=True) to hang indefinitely. To remedy this, I changed it to use work_queue.get_nowait(). If a queue.Empty exception occurs, it checks a global flag, _cancelling_futures (set to True in shutdown() just before it starts draining the work queue). If it's true, the "while True" loop is broken; otherwise it continues to the next iteration. This effectively has the same behavior as before.

    I experimented with a few different possible solutions, and the above was the only one that worked while still maintaining the current behavior as much as possible. From what I can tell, this was the only viable means of implementing the new parameter without making it entirely incompatible with "wait=True".

    At this point, I believe the only remaining step for the ThreadPoolExecutor implementation is to update the documentation and decide on the name. After working with it, I'm leaning more towards *cancel_futures*, as I think this more accurately describes what it does compared to just *cancel* (which is a bit vague IMO).

    @aeros changed the title from 'Add "cancel" parameter to concurrent.futures.Executor.shutdown()' to 'Add "cancel_futures" parameter to concurrent.futures.Executor.shutdown()' Jan 17, 2020
    @aeros
    Contributor Author

    aeros commented Jan 17, 2020

    > It was a bit more involved than I originally anticipated, as I had to make a minor change in the _worker() function to allow the new parameter to be compatible with wait (which is important, as it prevents dangling threads).

    Never mind, I just realized that I could simply add work_queue.put(None) at the end of the queue drain to unblock the indefinitely hanging thread. So, the global constant and change in logic for _worker() isn't needed.
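The sentinel approach can be illustrated with a simplified worker loop. This is a sketch under assumptions, not the actual _worker() function: `worker`, `work_queue`, and `results` are illustrative names, and jobs are plain callables rather than work items.

```python
import queue
import threading


def worker(work_queue, results):
    """Simplified worker loop: run callables until a None sentinel arrives."""
    while True:
        job = work_queue.get(block=True)  # blocks while the queue is empty
        if job is None:
            # Sentinel: stop instead of waiting forever for more work.
            break
        results.append(job())


work_queue = queue.SimpleQueue()
results = []
thread = threading.Thread(target=worker, args=(work_queue, results))
thread.start()

work_queue.put(lambda: 2 + 2)  # ordinary job
work_queue.put(None)           # sentinel that unblocks the waiting worker

thread.join(timeout=5)
worker_exited = not thread.is_alive()
```

Without the final put(None), the worker would stay blocked in get() after the queue is emptied, and joining it would never return.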

    @aeros
    Contributor Author

    aeros commented Jan 19, 2020

    I now have a working implementation, for both ThreadPoolExecutor and ProcessPoolExecutor. I've also ensured that the tests I added are not vulnerable to race conditions with the following:

    [aeros:~/repos/aeros-cpython]$ ./python -m test test_concurrent_futures --match test_cancel_futures -j200 -v -F
    
    [snip]
    Ran 4 tests in 2.865s
    
    OK
    0:03:24 load avg: 143.25 [1018] test_concurrent_futures passed -- running: test_concurrent_futures (2 min 36 sec), test_concurrent_futures (35.8 sec)
    test_cancel_futures (test.test_concurrent_futures.ProcessPoolForkProcessPoolShutdownTest) ... 0.57s ok
    test_cancel_futures (test.test_concurrent_futures.ProcessPoolForkserverProcessPoolShutdownTest) ... 0.80s ok
    test_cancel_futures (test.test_concurrent_futures.ProcessPoolSpawnProcessPoolShutdownTest) ... 0.53s ok
    test_cancel_futures (test.test_concurrent_futures.ThreadPoolShutdownTest) ... 0.20s ok
    

    I'll attach a PR to the issue once I finish writing the documentation and "What's New" entry.

    Note: I was originally going to do this in two separate PRs, one for each executor, but it seemed to make more sense to just have it as a single cohesive PR since Executor.shutdown() shares the same documentation for both executors.
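From the caller's side, the parameter might be used as in the sketch below. It assumes Python 3.9 or later, where *cancel_futures* is available; the task, the event names, and the single-worker pool are illustrative choices made so that the outcome is deterministic.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
release = threading.Event()


def blocking_task():
    """Occupy the only worker thread until the main thread releases it."""
    started.set()
    release.wait(timeout=10)
    return "done"


executor = ThreadPoolExecutor(max_workers=1)
running = executor.submit(blocking_task)
started.wait(timeout=10)  # make sure the first task is really running

# With the single worker busy, these stay pending in the work queue.
queued = [executor.submit(pow, 2, n) for n in range(3)]

# Cancel everything that has not started; the running task is unaffected.
executor.shutdown(wait=False, cancel_futures=True)

release.set()
running_result = running.result(timeout=10)
queued_cancelled = [f.cancelled() for f in queued]
```

wait=False is used here only because the running task is held open by the main thread; with wait=True, shutdown() would additionally block until that task finished.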

    @pitrou
    Member

    pitrou commented Feb 2, 2020

    New changeset 339fd46 by Kyle Stanley in branch 'master':
    bpo-39349: Add *cancel_futures* to Executor.shutdown() (GH-18057)
    339fd46

    @pitrou
    Member

    pitrou commented Feb 2, 2020

    Thank you very much, Kyle!

    @pitrou pitrou closed this as completed Feb 2, 2020
    @aeros
    Contributor Author

    aeros commented Feb 3, 2020

    > Thank you very much, Kyle!

    No problem. Thanks for reviewing and merging the PR, Antoine!

    @hauntsaninja
    Contributor

    I was working on updating typeshed stubs to reflect this change. It looks like the parameter wasn't actually added to the base class (

    def shutdown(self, wait=True):
    ), even though it's documented as having the new parameter (https://docs.python.org/3.9/library/concurrent.futures.html#concurrent.futures.Executor.shutdown).

    Is this intentional? If not, I'd be happy to submit a PR adding the parameter to the base class.
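One way to check which classes accept the parameter is to inspect the method signatures. This is a small sketch; `accepts_cancel_futures` is a hypothetical helper, and the result depends on the Python version in use (it is expected to be True for both classes on releases that include the base class fix).

```python
import inspect
from concurrent.futures import Executor, ThreadPoolExecutor


def accepts_cancel_futures(executor_cls):
    """Return True if executor_cls.shutdown() has a cancel_futures parameter."""
    params = inspect.signature(executor_cls.shutdown).parameters
    return "cancel_futures" in params


support = {
    cls.__name__: accepts_cancel_futures(cls)
    for cls in (Executor, ThreadPoolExecutor)
}
```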

    @aeros
    Contributor Author

    aeros commented Aug 31, 2020

    Good catch, Shantanu. It was not intentional on my part, and it would make sense to include *cancel_futures* in the base Executor class as documented. If you'd like to submit a PR to add it (attaching it to this issue), I should be able to review it within a reasonable period.

    @aeros
    Contributor Author

    aeros commented Sep 1, 2020

    New changeset 17dc1b7 by Shantanu in branch 'master':
    bpo-39349: Add cancel_futures to Executor.shutdown base class (GH-22023)
    17dc1b7

    @aeros
    Contributor Author

    aeros commented Sep 2, 2020

    New changeset a763ee3 by Shantanu in branch '3.9':
    [3.9] bpo-39349: Add cancel_futures to Executor.shutdown base class (GH-22023) (GH-22048)
    a763ee3

    @aeros
    Contributor Author

    aeros commented Sep 2, 2020

    Thanks for bringing attention to cancel_futures being missing in the base Executor class and for the PR, Shantanu!

    @ezio-melotti ezio-melotti transferred this issue from another repository Apr 10, 2022