This issue tracker has been migrated to GitHub, and is currently read-only.
For more information, see the GitHub FAQs in the Python Developer's Guide.

classification
Title: Make threading._register_atexit public?
Components: Library (Lib)
Versions: Python 3.9

process
Status: open
Nosy List: Ben.Darnell, aeros, eric.snow, pitrou, sa, vstinner
Priority: normal

Created on 2020-10-07 01:09 by Ben.Darnell, last changed 2022-04-11 14:59 by admin.

Messages (10)
msg378144 - Author: Ben Darnell (Ben.Darnell) Date: 2020-10-07 01:09
I'm dealing with a subtle deadlock involving concurrent.futures.ThreadPoolExecutor, and my solution that worked in Python 3.8 broke with 3.9. I'm running some long-running (possibly infinite) tasks in the thread pool, and I cancel them in an `atexit` callback so that everything can shut down cleanly (before ThreadPoolExecutor joins all worker threads in its own `atexit` hook).

Python 3.9 broke this due to https://bugs.python.org/issue39812. That change introduced a new atexit-like mechanism to the threading module and uses it where Python 3.8 used regular atexit. This means that ThreadPoolExecutor's atexit runs before mine, and since I never get a chance to cancel my tasks, it deadlocks.

One way I can solve this is to move my own atexit function to `threading._register_atexit`, so my strawman proposal here is to make that function public and documented. 
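
For illustration, the workaround is essentially the following (a minimal sketch; threading._register_atexit is a private API as of 3.9, and cancel_my_tasks is a hypothetical stand-in for the library's cleanup logic):

import threading
from concurrent.futures import ThreadPoolExecutor  # registers the executor's own hook

def cancel_my_tasks():
    ...  # hypothetical: signal the long-running tasks to finish

# threading atexit callbacks run in reverse registration order (as with
# atexit), so registering after the import above ensures this runs before
# ThreadPoolExecutor's hook joins its worker threads.
threading._register_atexit(cancel_my_tasks)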

On the other hand, even without the change in Python 3.9, my use of `atexit` smells like an abuse of implementation details in ThreadPoolExecutor (getting the atexit callbacks called in the right order was tricky when the concurrent.futures module started using lazy loading in Python 3.7). So I would welcome other suggestions about how to handle long-running but cancelable operations in a ThreadPoolExecutor at shutdown. 

One clean solution is to do the cancellation at the end of the main module instead of in an atexit hook. However, I'm doing this in a library, so I don't have any way but atexit to ensure that this happens. Another option is to forgo ThreadPoolExecutor entirely and manage the threads myself.

My code in question is in a not-yet-released branch of Tornado: https://github.com/tornadoweb/tornado/blob/5913aa43ecfdaa76876fc57867062227b907b1dd/tornado/platform/asyncio.py#L57-L73

With the master branch of Tornado, Python 3.9, and Windows, `python -c "from tornado.httpclient import HTTPClient; c = HTTPClient()"` reliably deadlocks at interpreter shutdown.
msg385589 - Author: Kyle Stanley (aeros) (Python committer) Date: 2021-01-24 19:30
> I'm dealing with a subtle deadlock involving concurrent.futures.ThreadPoolExecutor, and my solution that worked in Python 3.8 broke with 3.9. I'm running some long-running (possibly infinite) tasks in the thread pool, and I cancel them in an `atexit` callback so that everything can shut down cleanly (before ThreadPoolExecutor joins all worker threads in its own `atexit` hook).

IMO, a better practice would be to give those potentially infinite tasks a direct method of escape, and to invoke it before calling executor.shutdown(); that would be a more reliable approach. But perhaps there is some convenience utility in being able to provide custom atexit hooks. It might also help the user separate the shutdown logic from the rest of the program.
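
For example, the escape hatch could be a threading.Event that the task checks on each iteration (a minimal sketch; do_unit_of_work is hypothetical):

import threading
from concurrent.futures import ThreadPoolExecutor

stop = threading.Event()

def long_running_task():
    while not stop.is_set():    # the direct method of escape
        do_unit_of_work()       # hypothetical unit of work

executor = ThreadPoolExecutor()
executor.submit(long_running_task)

# ... later, at shutdown time:
stop.set()                      # ask the task to finish first...
executor.shutdown(wait=True)    # ...so the worker can be joined cleanly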

Since you worked with me on adding threading._register_atexit(): do you have any thoughts, Antoine? I would personally not be opposed to making it public, assuming there's real utility in doing so.

My only concern is that it might be a potential foot-gun. If the user submits an atexit hook that deadlocks, it might prevent threads from shutting down safely prior to interpreter finalization. I'm presently undecided whether explicitly mentioning that risk in the docs would be a sufficient warning.
msg385598 - Author: Ben Darnell (Ben.Darnell) Date: 2021-01-25 03:09
> IMO, a better practice would be providing those potentially infinite running tasks a direct method of escape and invoking it before calling executor.shutdown(), it would be a more reliable approach.

Agreed, but the problem is that I'm in a library (so I don't control the main module), and the library's interface does not mandate any sort of explicit shutdown method. There is a shutdown method, but almost no one calls it, and it never caused a problem until Python 3.9 changed things so that it deadlocks.

> My only concern is that it might be a potential foot-gun. If the user submits an atexit hook that deadlocks, it might prevent threads from shutting down safely prior to interpreter finalization. 

Yes, and that is exactly the problem. concurrent.futures submits an atexit hook whose behavior depends on application code, and through that I have inadvertently caused a deadlock.
msg385688 - Author: Ben Darnell (Ben.Darnell) Date: 2021-01-26 01:50
I have resolved my issue here by moving from ThreadPoolExecutor to a plain threading.Thread that I manage by hand (https://github.com/tornadoweb/tornado/commit/15832bc423c33c9280564770046dd6918f3a31b4). Therefore I no longer need this for myself and I leave it up to you to decide whether there's anything worth doing at this point.
msg412438 - Author: Simon Arlott (sa) Date: 2022-02-03 11:44
Another way to do this is to call threading.main_thread().join() in another thread and do the shutdown cleanup when it returns.

The main thread is stopped at shutdown just before the threading._threading_atexits are called.
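
A sketch of that approach (cancel_my_tasks is again a hypothetical cleanup callable):

import threading

def watch_main_thread():
    # join() returns once the main thread is stopped at interpreter
    # shutdown, which (in 3.9+) happens just before the threading atexit
    # callbacks run, and thus before ThreadPoolExecutor joins its workers.
    threading.main_thread().join()
    cancel_my_tasks()  # hypothetical cleanup

# Non-daemon, so shutdown waits for the cleanup to complete.
threading.Thread(target=watch_main_thread).start()
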
msg412470 - Author: Eric Snow (eric.snow) (Python committer) Date: 2022-02-03 21:19
> I'm running some long-running (possibly infinite) tasks in the thread pool,
> and I cancel them in an `atexit` callback

To be clear, by "cancel" you are not talking about Future.cancel().  Rather, your handler causes all running tasks to finish (by sending a special message on the socket corresponding to each running task).  Is that right?

Some other things I inferred from your atexit handler:

* it does not make sure the task associated with the socket finishes (no way of knowing?)
* so if a task hangs while trying to stop then the running thread in the ThreadPoolExecutor would block shutdown forever
* similarly, if a task is stuck handling a request then it will never receive the special message on the socket, either blocking the send() in your handler or causing ThreadPoolExecutor shutdown/atexit to wait forever
* it vaguely implies a 1-to-1 relationship between sockets and *running* tasks
* likewise that pending (queued) tasks do not have an associated socket (until started)
* so once your handler finishes, any tasks pending in the ThreadPoolExecutor queue will eventually get started but never get stopped by your handler; thus you're back to the deadlock situation

Does all that sound right?  Most of that is relevant to some possible solutions I have in mind.
msg412471 - Author: Eric Snow (eric.snow) (Python committer) Date: 2022-02-03 21:21
> I'm running some long-running (possibly infinite) tasks in the thread pool,
> and I cancel them in an `atexit` callback

Alternately, perhaps ThreadPoolExecutor isn't the right fit here, as implied by the route you ended up going.  It seems like it's not well-suited for long-running (or infinite) tasks.  In that case, perhaps the concurrent.futures docs could be more clear about when ThreadPoolExecutor is not a good fit and what the alternatives are.
msg412472 - Author: Eric Snow (eric.snow) (Python committer) Date: 2022-02-03 21:27
FWIW, here's a brain dump about ThreadPoolExecutor and its atexit handler after having looked at the code.

----

First, the relationship between the objects involved:

* work item -> Future
* work item -> task (e.g. function)
* queue -> [work item]
* worker -> executor
* worker -> queue
* worker -> currently running work item
* thread -> worker
* ThreadPoolExecutor -> [thread]
* ThreadPoolExecutor -> queue
* global state -> {thread: queue}

Observations:

* a work item's future and task are not aware of each other, and operations on either have no effect on the other

----

Next, let's look at the relevant ways the objects can be used (a short usage sketch follows the list):

* publicly
   * ThreadPoolExecutor.submit(task) -> Future
   * ThreadPoolExecutor.shutdown()
   * Future.result() and Future.exception()
   * Future.cancel()
   * Future.add_done_callback()
* internally
   * queue.pop() -> work item
   * <work item>.run()
   * thread.join()
   * Future.set_running_or_notify_cancel()
   * Future.set_result() and Future.set_exception()
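
For orientation, the public pieces compose like this (a minimal sketch):

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=1) as executor:   # __exit__ calls shutdown()
    future = executor.submit(pow, 2, 10)
    future.add_done_callback(lambda f: print("done:", f.result()))
    print(future.result())    # 1024
    print(future.cancel())    # False: the task already ran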

Observations:

* nothing interacts with a worker directly; it is waited on via its thread and it receives work (or None) via the queue it was given
* once a worker pops the next work item off the queue, nothing else has access to that work item (the original ThreadPoolExecutor().submit() caller has the Future, but that's it)
* there is no cancellation mechanism for tasks (see the demonstration after this list)
* there is no simple way to interact with the queued tasks
* realistically, there is no way to interact with the currently running task
* there is no reliable way to "kill" a thread
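
In particular, Future.cancel() only keeps a queued work item from starting; it cannot stop one that is already running (a small demonstration):

import time
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=1) as executor:
    running = executor.submit(time.sleep, 1)
    queued = executor.submit(time.sleep, 1)
    time.sleep(0.1)             # give the first task time to start
    print(running.cancel())     # False: already running
    print(queued.cancel())      # True: removed before it ever starts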

----

Regarding how tasks run:

* ThreadPoolExecutor.submit(task) -> Future
* ThreadPoolExecutor.submit(task) -> work item (Future, task) -> queue
* ThreadPoolExecutor.submit(task) -> thread (worker)
* thread -> worker -> ( queue -> work item -> task )

Observations:

* the worker loop exits if the next item in the queue is None (and the executor is shutting down); the loop is paraphrased below
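
Paraphrasing the worker loop from Lib/concurrent/futures/thread.py (simplified; the initializer and exception handling are omitted):

def _worker(executor_reference, work_queue):
    while True:
        work_item = work_queue.get(block=True)
        if work_item is not None:
            work_item.run()
            del work_item
            continue
        # Woke up on the None sentinel: decide whether to exit.
        executor = executor_reference()
        if _shutdown or executor is None or executor._shutdown:
            if executor is not None:
                executor._shutdown = True
            work_queue.put(None)  # propagate the sentinel to the next worker
            return
        del executor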

----

Now let's look more closely at the atexit handler.

* as you noted, since 3.9 it is registered with threading._register_atexit() instead of atexit.register()
* the threading atexit handlers run before the regular atexit handlers
* the ThreadPoolExecutor handler does not actually interact with ThreadPoolExecutor instances directly
* it only deals with a module-global list of (thread, [work item]) pairs, to which ThreadPoolExecutor instances add items as they go

The handler does the following (its code is sketched after these lists):

1. disables ThreadPoolExecutor.submit() (for all instances)
2. (indirectly) tells each worker to stop ASAP
3. lets every pending task run (and lets every running task keep running)
4. waits until all tasks have finished

It does not:

* call any ThreadPoolExecutor.shutdown()
* otherwise deal with the ThreadPoolExecutor instances directly
* call Future.cancel() for any of the tasks
* use any timeout in step 4, so it may block forever
* notify tasks that they should finish
* deal well with any long-running (or infinite) task
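
For reference, the 3.9 handler itself is roughly the following (paraphrased from Lib/concurrent/futures/thread.py):

def _python_exit():
    global _shutdown
    with _global_shutdown_lock:
        _shutdown = True              # step 1: submit() now raises
    items = list(_threads_queues.items())
    for t, q in items:
        q.put(None)                   # step 2: the sentinel lands behind
                                      # any queued work items (hence step 3)
    for t, q in items:
        t.join()                      # step 4: no timeout, may block forever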

ThreadPoolExecutor.shutdown() basically does the same thing, except that it only does the steps above for its own tasks. It also optionally calls Future.cancel() for each queued task (right before step 2), though all that does is keep queued-but-not-running tasks from starting. Finally, it lets you optionally skip step 4.
msg412475 - Author: Eric Snow (eric.snow) (Python committer) Date: 2022-02-03 21:55
> This means that ThreadPoolExecutor's atexit runs before mine,
> and since I never get a chance to cancel my tasks, it deadlocks.

(assuming we want to support long-running tasks here)

With all the above in mind, there are a few things that may help.

The first that comes to mind is to have the atexit handler call ThreadPoolExecutor.shutdown() for each instance.

So something like this:


def _python_exit():
    global _shutdown
    with _global_shutdown_lock:
        _shutdown = True
    # _executors would be a new module-global registry of live instances;
    # it does not exist today (hence the refactoring mentioned below).
    for executor in list(_executors):
        executor.shutdown()


That would require a little refactoring to make it work.  However, the change is simpler if each executor has its own atexit handler:


class ThreadPoolExecutor(_base.Executor):

    def __init__(self, ...):
        ...
        # Register the bound method itself, not the result of calling it.
        threading._register_atexit(self._atexit)

    def _atexit(self):
        global _shutdown
        _shutdown = True
        self.shutdown()


The value of either approach is that you can then subclass ThreadPoolExecutor to get what you want:


class MyExecutor(ThreadPoolExecutor):
    def shutdown(self, *args, **kwargs):
        stop_my_tasks()
        super().shutdown(*args, **kwargs)


----

One thing I thought about was supporting a per-task finalizer instead, since that aligns more closely with the way ThreadPoolExecutor works; it would apply only to the task in question. So, something like one of the following (a rough emulation with today's API is sketched after the list):

* ThreadPoolExecutor(finalize_task=<callable>)
* ThreadPoolExecutor.submit(finalize=<callable>)
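
Pending such an API, a rough emulation with today's private hook (the names are illustrative, and a real version would unregister finalizers once their tasks complete):

import threading

_task_finalizers = []

def submit_with_finalizer(executor, fn, finalizer, *args, **kwargs):
    _task_finalizers.append(finalizer)
    return executor.submit(fn, *args, **kwargs)

def _finalize_tasks():
    for finalizer in _task_finalizers:
        finalizer()  # e.g. signal the task to stop

threading._register_atexit(_finalize_tasks)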

----

Other things that could be helpful:

* always cancel all the pending tasks during shutdown (and maybe let users opt out)
* use a timeout during shutdown

----

FWIW, adding support for some sort of sub-atexit handler isn't so appealing.  I'm talking about something like one of the following:

* ThreadPoolExecutor(onshutdown=<callable>)
* ThreadPoolExecutor.register_atexit(<callable>)
* (classmethod) ThreadPoolExecutor.register_atexit(<callable>)
* concurrent.futures.register_atexit(<callable>)

(It would probably make sense to pass the list of currently running tasks to the callable.)
msg412526 - Author: Ben Darnell (Ben.Darnell) Date: 2022-02-04 19:07
> To be clear, by "cancel" you are not talking about Future.cancel().  Rather, your handler causes all running tasks to finish (by sending a special message on the socket corresponding to each running task).  Is that right?

Correct. My tasks here are calls to functions from the `select` module (one select call per executor task), and cancelling them means writing a byte to a pipe set up for this purpose. 
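
The mechanism is the classic self-pipe (here, self-socketpair) trick; a minimal sketch (Tornado's actual code is linked in msg378144):

import select
import socket

rsock, wsock = socket.socketpair()   # wakeup pair shared with the cancel side

def selector_task(sockets):
    # Blocks until a socket is ready or a cancellation byte arrives.
    readable, _, _ = select.select(list(sockets) + [rsock], [], [])
    if rsock in readable:
        rsock.recv(1)     # drain the wakeup byte; treat as cancellation
        return None
    return readable

def cancel_selector_task():
    wsock.send(b"\0")     # wakes the blocked select() call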

The select calls could be given a timeout so there is never an infinite task, but that's not ideal: a timeout that's too low has a performance cost as calls time out and restart even when the system is "at rest", and a too-long timeout will still be perceived as a hanging application.

> * it does not make sure the task associated with the socket finishes (no way of knowing?)
> * so if a task hangs while trying to stop then the running thread in the ThreadPoolExecutor would block shutdown forever
> * similarly, if a task is stuck handling a request then it will never receive the special message on the socket, either blocking the send() in your handler or causing ThreadPoolExecutor shutdown/atexit to wait forever

Correct. If the task were buggy it could still cause a deadlock. In my case the task is simple enough (a single selector call) that this is not a risk. 

> * it vaguely implies a 1-to-1 relationship between sockets and *running* tasks
> * likewise that pending (queued) tasks do not have an associated socket (until started)

Each task is associated with a selector object (managing a set of sockets), not a single socket. There is only ever one task at a time; a task is enqueued only after the previous one finishes. (This thread pool is not used for any other purpose.)

> * so once your handler finishes, any tasks pending in the ThreadPoolExecutor queue will eventually get started but never get stopped by your handler; thus you're back to the deadlock situation

In my case this one-at-a-time rule means that the queue is always empty. But yes, in a more general solution you'd need some sort of interlock between cancelling existing tasks and starting new ones. 

> Alternately, perhaps ThreadPoolExecutor isn't the right fit here, as implied by the route you ended up going. 

Yes, this is my conclusion as well. I filed this issue because I was frustrated that Python 3.9 broke previously-working code, but I'm willing to chalk this up to Hyrum's law and I'm not sure that this is something that ThreadPoolExecutor should be modified to support.
History
Date                 User         Action  Args
2022-04-11 14:59:36  admin        set     github: 86128
2022-02-04 19:07:30  Ben.Darnell  set     messages: + msg412526
2022-02-03 21:55:07  eric.snow    set     messages: + msg412475
2022-02-03 21:27:24  eric.snow    set     messages: + msg412472
2022-02-03 21:21:56  eric.snow    set     messages: + msg412471
2022-02-03 21:19:02  eric.snow    set     nosy: + eric.snow; messages: + msg412470
2022-02-03 11:44:05  sa           set     nosy: + sa; messages: + msg412438
2021-01-26 01:50:38  Ben.Darnell  set     messages: + msg385688
2021-01-25 03:09:07  Ben.Darnell  set     messages: + msg385598
2021-01-24 19:30:22  aeros        set     nosy: + pitrou; messages: + msg385589
2021-01-24 16:33:48  iritkatriel  set     nosy: + vstinner, aeros; components: + Library (Lib)
2020-10-07 01:09:44  Ben.Darnell  create