This issue tracker has been migrated to GitHub, and is currently read-only.
For more information, see the GitHub FAQs in the Python's Developer Guide.

classification
Title: concurrent.future.ThreadPoolExecutor should parameterize class used for threads
Type: enhancement Stage:
Components: Library (Lib) Versions: Python 3.11
process
Status: open Resolution:
Dependencies: Superseder:
Assigned To: Nosy List: Sergei Maertens, asvetlov, bquinlan, erickpeirson, pitrou, serhiy.storchaka, yselivanov
Priority: normal Keywords:

Created on 2021-10-01 10:37 by erickpeirson, last changed 2022-04-11 14:59 by admin.

Pull Requests
URL Status Linked Edit
PR 28640 open erickpeirson, 2021-10-01 10:37
Messages (8)
msg403007 - (view) Author: Erick (erickpeirson) * Date: 2021-10-01 10:37
Currently the only way to use a class other than `threading.Thread` with `concurrent.futures.ThreadPoolExecutor` is to extend `ThreadPoolExecutor` and override the private method `_adjust_thread_count()`. 

For example, suppose I have a class that applies some custom logic when running code in a new thread:

```
class MyThread(threading.Thread):
    def run(self):
        with some_important_context():
            super().run()
```

Using this class with `ThreadPoolExecutor` requires me to write the following code:

```
class MyThreadPoolExecutor(ThreadPoolExecutor):
    def _adjust_thread_count(self):
        # if idle threads are available, don't spin new threads
        if self._idle_semaphore.acquire(timeout=0):
            return

        # When the executor gets lost, the weakref callback will wake up
        # the worker threads.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)

        num_threads = len(self._threads)
        if num_threads < self._max_workers:
            thread_name = '%s_%d' % (self._thread_name_prefix or self,
                                     num_threads)
            t = MyThread(name=thread_name, target=_worker,
                         args=(weakref.ref(self, weakref_cb),
                               self._work_queue,
                               self._initializer,
                               self._initargs))
            t.start()
            self._threads.add(t)
            _threads_queues[t] = self._work_queue


with MyThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())
```

That's a bummer, because now I have to maintain this method if there are upstream fixes/changes. I also can't count on it existing in the future, since it's a private method. 

In other words, `ThreadPoolExecutor` is not composable, and extending it to use a custom `Thread` class is neither safe nor maintainable.

Here's what I'd like to be able to do instead:

```
with ThreadPoolExecutor(max_workers=1, thread_class=MyThread) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())
```
msg403054 - (view) Author: Serhiy Storchaka (serhiy.storchaka) * (Python committer) Date: 2021-10-02 13:50
Can you apply some custom logic by specifying the initializer?
msg403142 - (view) Author: Erick (erickpeirson) * Date: 2021-10-04 14:53
> Can you apply some custom logic by specifying the initializer?

Not sure that I follow; how would I use a context manager with the initializer?

Of course this is just one example. In the `threading` docs, the second sentence in the description of the `Thread.run` method (https://docs.python.org/3/library/threading.html#threading.Thread.run) is:

> You may override this method in a subclass. 

But one cannot use a subclass of `Thread` with `ThreadPoolExecutor` without the gymnastics indicated earlier. That's the real issue here, I think.
msg403145 - (view) Author: Serhiy Storchaka (serhiy.storchaka) * (Python committer) Date: 2021-10-04 16:06
You can call __enter__ in the initializer. Seems there is no easy way to call __exit__ at shutting down the thread, perhaps something like per-thread atexit could help. As an example, you can save the context manager in thread locals and manually repeatedly submit a function that calls __exit__ and blocks the thread on some Barrier before calling ThreadPoolExecutor.shutdown(). It is complicated, so we may add some helpers to support context managers.

What are examples of your some_important_context()? Is it important to call some code before destroying the thread?

The problem with allowing the user to specify the Thread subclass is that in general using Thread is an implementation detail. ThreadPoolExecutor could be implemented using the low-level _thread module instead. Or in future it can need to create a special Thread subclass, and user-specified Thread subclass can be not compatible with it. It is safer to limit ways in which the user can affect execution. The initializer parameter was added to address cases similar to your.

Note also that ThreadPoolExecutor and ProcessPoolExecutor have almost identical interface. If we add some feature in ThreadPoolExecutor we will have a pressure to add the same feature in ProcessPoolExecutor to solve similar problems.
msg403186 - (view) Author: Erick (erickpeirson) * Date: 2021-10-04 23:38
> What are examples of your some_important_context()? Is it important to call some code before destroying the thread?

To be honest, pulled a context manager example out of thin air to illustrate the point. But sure, I want to allocate a resource before the thread runs, and release it when the work is done.

> The problem with allowing the user to specify the Thread subclass is that in general using Thread is an implementation detail. ThreadPoolExecutor could be implemented using the low-level _thread module instead. Or in future it can need to create a special Thread subclass, and user-specified Thread subclass can be not compatible with it. 

This is surprising to hear, since I imagine that there are many potential users of this library that are evolving from direct wrangling of Thread objects, where custom Thread subclasses are commonplace. This was certainly the scenario that prompted me to post.

Ultimately it's up to the maintainers what direction the library will go. Are there specific plans to adopt an alternative implementation that is orthogonal to `threading.Thread`? Or are there reasons to think that it is likely? I would submit that maintaining smooth interoperability between this library and the `threading` library would be a welcome constraint, absent specific drivers to the contrary.
msg403187 - (view) Author: Erick (erickpeirson) * Date: 2021-10-04 23:41
It also occurs to me that even if `concurrent.futures` adopted an alternative Thread class, it would still be preferable to allow for composition in the manner that was originally proposed.
msg408562 - (view) Author: Sergei Maertens (Sergei Maertens) Date: 2021-12-14 20:58
I was looking for some way to be able to add a thread finalizer, a piece of code to be called when the thread pool shuts down and all threads need cleaning up. Glad I came across this issue, since the example of using a Thread subclass with a custom run (wrapped in a context manager) would fill my needs completely.

> What are examples of your some_important_context()? Is it important to call some code before destroying the thread?

I currently have a use case with a web-framework that has persistent DB connections - i.e. they span multiple HTTP requests. This also applies in the context of running a command-line script with said framework where database connections are opened.

We are calling external APIs (IO heavy), so using a ThreadPoolExecutor makes sense. However, since those "DB connection pools" are thread locals, we need to ensure that database connections are closed again to not leak resources.

The current workaround is to submit a job/function to the pool and have it close the database connections, which adds overhead since database connections are now opened and closed within the same thread that could have been re-used.

Using a context manager, we would be able to wrap the `super().run(...)` and close the database connections when the thread exits (succesfully or because of an Exception, even). This comes very close to having an `atexit` for individual threads.

Furthermore I like the idea of being able to provide the class as a context manager kwarg, but would also not be opposed to a class property specifying the Thread class to use - both would greatly enhance composability and are a cleaner solution than adding a finalizer option (similar to the `initializer` kwarg)
msg412238 - (view) Author: Andrew Svetlov (asvetlov) * (Python committer) Date: 2022-02-01 09:10
I guess if you are asking for initialization and finalization of thread-specific data in a thread pool -- you need exactly these things (or a context manager).
A custom thread class reveals too many implementation details.
I personally prefer an explicit initializer/finalizer based approach.
History
Date User Action Args
2022-04-11 14:59:50adminsetgithub: 89502
2022-02-01 09:10:18asvetlovsetmessages: + msg412238
2021-12-14 20:58:48Sergei Maertenssetnosy: + Sergei Maertens
messages: + msg408562
2021-10-04 23:41:58erickpeirsonsetmessages: + msg403187
2021-10-04 23:38:58erickpeirsonsetmessages: + msg403186
2021-10-04 16:06:28serhiy.storchakasetmessages: + msg403145
2021-10-04 14:53:10erickpeirsonsetmessages: + msg403142
2021-10-02 13:54:08serhiy.storchakasetnosy: + bquinlan, pitrou
components: - asyncio
2021-10-02 13:50:53serhiy.storchakasetnosy: + asvetlov, serhiy.storchaka, yselivanov
messages: + msg403054

components: + asyncio
type: enhancement
2021-10-01 10:37:53erickpeirsoncreate