classification
Title: concurrent.futures.ThreadPoolExecutor/ProcessPoolExecutor should accept an initializer argument
Type: enhancement Stage: resolved
Components: Library (Lib) Versions: Python 3.7
process
Status: closed Resolution: fixed
Dependencies: Superseder:
Assigned To: Nosy List: Claudiu.Popa, Shiming He, andreasvc, bquinlan, dan.oreilly, giampaolo.rodola, jcon, josh.r, mark.dickinson, mdengler, nchammas, pitrou, s0undt3ch, serhiy.storchaka, thehesiod, vinay.sajip
Priority: normal Keywords: patch

Created on 2014-05-03 23:38 by andreasvc, last changed 2017-11-09 14:59 by serhiy.storchaka. This issue is now closed.

Files
File name Uploaded Description Edit
pool_initializer.patch andreasvc, 2014-05-07 23:15 add initializer and initargs keywords to both ThreadPoolExecutor and ProcessPoolExecutor review
pool_initializer_tests.patch andreasvc, 2014-05-08 00:13 add tests review
pool_init.patch dan.oreilly, 2014-07-23 15:34 Update patch to apply against latest, tweak exception handling review
init_patch_updated.patch dan.oreilly, 2014-08-10 18:57 Supress tracebacks in negative tests, fix race condition and a typo in ThreadPoolExecutor review
init_patch_updated2.patch dan.oreilly, 2014-08-11 05:45 Make ProcessPool/ThreadPool behavior when the initializer fails consistent. review
Pull Requests
URL Status Linked Edit
PR 4241 merged pitrou, 2017-11-02 21:26
PR 4347 merged pitrou, 2017-11-09 12:35
Messages (30)
msg217846 - (view) Author: Andreas van Cranenburgh (andreasvc) * Date: 2014-05-03 23:38
It would be useful if concurrent.futures.ThreadPoolExecutor took an initializer argument, like multiprocessing.Pool.

This is useful for example to load a large dataset once upon initialization of each worker process, without have to pass the dataset as an argument with every job submission, which requires serialization.

concurrent.futures has some advantages over multiprocessing such as detecting killed processes ( http://bugs.python.org/issue9205 ), so it would be good if the advantages of both can be combined.

It appears that the following fork of concurrent.futures has added these arguments: https://github.com/enthought/encore/blob/7101984bc384da8e7975876ca2809cc0103c3efc/encore/concurrent/futures/enhanced_thread_pool_executor.py
msg218019 - (view) Author: Josh Rosenberg (josh.r) * Date: 2014-05-06 21:07
Do you mean ProcessPoolExecutor? Thread based pools don't involve serialization.

It would be good for ThreadPoolExecutor to have it as well, to make it easier to switch between executors, but only ProcessPoolExecutor is suffering from serialization overhead. Threads share the same memory space after all; references to data get passed directly, though you might choose to copy.copy or copy.deepcopy a root data "template" so each thread has its own unique copy that it can mutate.
msg218020 - (view) Author: Andreas van Cranenburgh (andreasvc) * Date: 2014-05-06 21:17
Yes I did mean ProcessPoolExecutor, but indeed, it's good to have for threads as well.

I could try to make a patch if it is likely that it would be accepted.
msg218024 - (view) Author: Josh Rosenberg (josh.r) * Date: 2014-05-06 21:45
I'm not a core developer, but writing the patch is usually considered helpful. Two notes:

1. Make sure to write unit tests for any new behavior
2. I'd suggest making any such argument keyword-only; if we move closer to the Java executor model, that means having a lot of options, the majority of which would be left as the default by users. Binding the API to a particular argument order is sub-optimal (it makes it even harder to deprecate arguments for instance), so enforcing keyword only behavior ensures users can't write call lines that take dependencies on argument ordering.
msg218040 - (view) Author: Mark Dickinson (mark.dickinson) * (Python committer) Date: 2014-05-07 07:55
BTW, I think there's a design mistake in the EnhancedThreadPoolExecutor that's worth avoiding in any std. lib. implementation: the initialiser and uninitialiser for the EnhancedThreadPoolExecutor accept no arguments. In retrospect, it would have been better to have them take the thread itself as a single argument.  We often found ourselves needing this - it's not hard to work around with a threading.current_thread() call, but it's mildly annoying to have to do so.
msg218090 - (view) Author: Andreas van Cranenburgh (andreasvc) * Date: 2014-05-07 23:15
Here's a patch. I have added initializer and initargs keywords to both ThreadPoolExecutor and ProcessPoolExecutor, with the same semantics as multiprocessing.Pool.

I couldn't figure out what to do if the initializer fails with a ProcessPoolExecutor: how to properly send the traceback back? I also haven't gotten around to figure out how to write tests.
I haven't added unitializers, don't know if they would be useful.
msg218093 - (view) Author: Andreas van Cranenburgh (andreasvc) * Date: 2014-05-08 00:13
Here's a version with tests. Detecting an execption in the initializer works with ProcessPoolExecutor, but not with ThreadPoolExecutor.
msg218462 - (view) Author: Giampaolo Rodola' (giampaolo.rodola) * (Python committer) Date: 2014-05-13 14:13
Related:
https://groups.google.com/forum/#!topic/dev-python/ytbYwHXKC6o
I'm not sure how what is proposed here would be useful for ThreadPoolExecutor but it would definitely be helpful being able to set an initializer for ProcessPoolExecutor because right now it seems it's impossible to cleanly shutdown the executor, see:
http://noswap.com/blog/python-multiprocessing-keyboardinterrupt
msg218518 - (view) Author: Andreas van Cranenburgh (andreasvc) * Date: 2014-05-14 10:59
Giampaolo, this patch is for ProcessPoolExecutor as well.

About keyboard interrupts, if my tests are correct, they work
in Python 3.3+ with both multiprocessing and concurrent.futures.
(Although for the latter I have to hit ctrl-c twice).
msg223741 - (view) Author: Dan O'Reilly (dan.oreilly) * Date: 2014-07-23 15:34
It seems like everyone agrees that this functionality is useful, so I'm reviving this in hopes of getting a patch pushed through. I've updated Andreas' patch so that it applies cleanly against the latest tree, and tweaked the handling of exceptions in initializer. Now, ProcessPoolExecutor will raise a BrokenPoolException should an initializer method fail, and ThreadPoolExecutor will raise a RunTimeError stating that the pool can't be used because an initializer failed.

I was hoping to use multiprocessing.Pool's handling of initializer exceptions as a guide for the right behavior here, but it actually does terrible job: an exception raised in the initializer is completely unhandled, and results in an endless loop of new processes being started up and immediately failing. But that's a separate bug report. :)

For now there are still unit tests for testing exceptions being raised in the initializer, but they're noisy; the traceback for each initializer exception gets printed to stdout. I'm not sure if that's undesirable behavior or not.

If the new behavior looks ok, the docs will need an update to.
msg225159 - (view) Author: Dan O'Reilly (dan.oreilly) * Date: 2014-08-10 18:57
Here's an updated patch. Changes:

* Fixed what appears to have been a find/replace typo I made prior to uploading the previous patch.

* The tracebacks from the negative unit tests are now suppressed. 

* Fixed a race condition in the initializer failure handling for ThreadPoolExecutor. Futures that were submitted before the initializer actually failed will now raise a RuntimeError indicating that initializer failed.

* Suppressed an occasional queue.Full exception that would pop up while shutting down a ProcessPoolExecutor that was broken due to an initializer fail. As best as I can tell the exception is harmless, so suppressing it should be ok.

*Updated the docs.
msg225174 - (view) Author: Antoine Pitrou (pitrou) * (Python committer) Date: 2014-08-10 23:48
It seems the addition of the initargs argument doesn't tackle Mark's objection here:

> the initialiser and uninitialiser for the EnhancedThreadPoolExecutor
> accept no arguments. In retrospect, it would have been better to have 
> them take the thread itself as a single argument.

Regardless, I'm going to review the patch soon.
msg225176 - (view) Author: Dan O'Reilly (dan.oreilly) * Date: 2014-08-11 02:51
> It seems the addition of the initargs argument doesn't tackle Mark's objection here:

> > the initialiser and uninitialiser for the EnhancedThreadPoolExecutor
> > accept no arguments. In retrospect, it would have been better to have 
> > them take the thread itself as a single argument.

This would be inconsistent with multiprocessing.Pool's initializer/initargs behavior. I'm not sure if consistency is important there or not, but its worth pointing out.

I'm also not sure how useful it would be for ProcessPoolExecutor to receive an instance of itself. With multiprocessing.Pool,  initializer is a commonly used to pass objects that can only be inherited to workers (Queues, Locks, etc.). The same pattern would be useful for ProcessPoolExecutor, which means initializer needs to be able to take an arbitrary number of arguments, rather than just an instance to the Process object itself.
msg225177 - (view) Author: Dan O'Reilly (dan.oreilly) * Date: 2014-08-11 05:45
Another updated patch. This one changes ProcessPoolExecutor behavior so that RuntimeErrors are raised in any active Futures, and on subsequent calls to submit after the initializer fails. This makes its behavior consistent with ThreadPoolExecutor.
msg239166 - (view) Author: Mark Dickinson (mark.dickinson) * (Python committer) Date: 2015-03-24 18:33
> I'm also not sure how useful it would be for ProcessPoolExecutor to receive an instance of itself.

Agreed that this doesn't make much sense; I hadn't really thought about the ProcessPoolExecutor case.  I withdraw my comments!
msg261928 - (view) Author: Alexander Mohr (thehesiod) * Date: 2016-03-17 18:43
any chance if this getting into 3.5.2? I have some gross code to get around it (setting global properties)
msg279755 - (view) Author: Pedro Algarvio (s0undt3ch) Date: 2016-10-31 00:59
Would also love to see this in the stdlib soon. My use case is logging setup(forward logs to the main process).
msg299285 - (view) Author: Antony Lee (Antony.Lee) * Date: 2017-07-27 01:41
For cross-referencing purposes: I have proposed in http://bugs.python.org/issue25293 to allow passing a Thread/Process subclass as argument instead of an initializer function, which would both handle Mark Dickinson's comment (http://bugs.python.org/issue21423#msg218040) about passing the thread object as argument, and also allow for finalization.
msg305451 - (view) Author: Antoine Pitrou (pitrou) * (Python committer) Date: 2017-11-02 21:36
I've opened a PR for this at https://github.com/python/cpython/pull/4241, loosely based on Dan's last patch.
msg305463 - (view) Author: Antoine Pitrou (pitrou) * (Python committer) Date: 2017-11-03 09:02
By the way, the submitted PR follows Dan's argument about the initializer's argument: the actual call is `initializer(*initargs)`. If someone wants to know about the current thread or process, it's trivial to call `thread.current_thread()` or `multiprocessing.current_process()` (and I don't it's a bad idiom in itself :-)).

If you want to comment on the PR, I would recommend doing it quick, as I plan to merge in a day or two :-)
msg305550 - (view) Author: Antoine Pitrou (pitrou) * (Python committer) Date: 2017-11-04 10:05
New changeset 63ff4131af86e8a48cbedb9fbba95bd65ca90061 by Antoine Pitrou in branch 'master':
bpo-21423: Add an initializer argument to {Process,Thread}PoolExecutor (#4241)
https://github.com/python/cpython/commit/63ff4131af86e8a48cbedb9fbba95bd65ca90061
msg305601 - (view) Author: Serhiy Storchaka (serhiy.storchaka) * (Python committer) Date: 2017-11-05 17:13
test_concurrent_futures now produces too much output on stderr.

$ ./python -m test test_concurrent_futures >/dev/null
Exception in initializer:
Traceback (most recent call last):
  File "/home/serhiy/py/cpython/Lib/concurrent/futures/process.py", line 170, in _process_worker
    initializer(*initargs)
  File "/home/serhiy/py/cpython/Lib/test/test_concurrent_futures.py", line 66, in init_fail
    raise ValueError('error in initializer')
ValueError: error in initializer
Exception in initializer:
Traceback (most recent call last):
  File "/home/serhiy/py/cpython/Lib/concurrent/futures/process.py", line 170, in _process_worker
    initializer(*initargs)
  File "/home/serhiy/py/cpython/Lib/test/test_concurrent_futures.py", line 66, in init_fail
    raise ValueError('error in initializer')
ValueError: error in initializer
...

What is worse, this output looks as errors report.
msg305602 - (view) Author: Serhiy Storchaka (serhiy.storchaka) * (Python committer) Date: 2017-11-05 17:14
And please don't forget to edit a commit message when merge a PR.
msg305605 - (view) Author: Antoine Pitrou (pitrou) * (Python committer) Date: 2017-11-05 17:45
Unfortunately, there is no obvious way to capture the output of the child process here, short of running the entire test under a subprocess.
msg305873 - (view) Author: Serhiy Storchaka (serhiy.storchaka) * (Python committer) Date: 2017-11-08 16:03
What is the best way to silence logging in subprocesses?
msg305954 - (view) Author: Vinay Sajip (vinay.sajip) * (Python committer) Date: 2017-11-09 10:25
> What is the best way to silence logging in subprocesses?

Are you referring to the output shown in msg305601? If it's caused by logging statements, the best way would be either to pipe stderr to /dev/null or to change the logging to use sys.stdout (sys.stderr is just the default) and then pipe stdout to /dev/null.

I hope I haven't misunderstood your question, but I fear I may have.
msg305956 - (view) Author: Serhiy Storchaka (serhiy.storchaka) * (Python committer) Date: 2017-11-09 10:56
I'm not well experienced with logging, but if we can change the logging in subprocesses, I think we could change it to not output messages at all. It would be better to save logging messages in subprocesses and check that expected logging messages are emitted in the main process. There is assertLogs(), but it works only with logging in the same process.
msg305959 - (view) Author: Antoine Pitrou (pitrou) * (Python committer) Date: 2017-11-09 12:35
Serhiy, I think I have found a way to deal with the logging output:
https://github.com/python/cpython/pull/4347
msg305962 - (view) Author: Antoine Pitrou (pitrou) * (Python committer) Date: 2017-11-09 14:33
New changeset 0a2ff23fe6efb1653d655ac19d0a4e1629fd8d95 by Antoine Pitrou in branch 'master':
Silence error output in test_concurrent_futures (bpo-21423) (#4347)
https://github.com/python/cpython/commit/0a2ff23fe6efb1653d655ac19d0a4e1629fd8d95
msg305963 - (view) Author: Serhiy Storchaka (serhiy.storchaka) * (Python committer) Date: 2017-11-09 14:59
Thank you Antoine!
History
Date User Action Args
2017-11-09 14:59:25serhiy.storchakasetmessages: + msg305963
2017-11-09 14:51:39pitrousetstatus: open -> closed
resolution: fixed
stage: resolved
2017-11-09 14:33:45pitrousetmessages: + msg305962
2017-11-09 12:35:46pitrousetmessages: + msg305959
stage: patch review -> (no value)
2017-11-09 12:35:20pitrousetstage: patch review
pull_requests: + pull_request4304
2017-11-09 10:56:36serhiy.storchakasetmessages: + msg305956
2017-11-09 10:25:13vinay.sajipsetmessages: + msg305954
2017-11-08 16:03:33serhiy.storchakasetnosy: + vinay.sajip
messages: + msg305873
2017-11-05 17:45:57pitrousetmessages: + msg305605
2017-11-05 17:14:45serhiy.storchakasetmessages: + msg305602
2017-11-05 17:13:23serhiy.storchakasetstatus: closed -> open

nosy: + serhiy.storchaka
messages: + msg305601

resolution: fixed -> (no value)
stage: resolved -> (no value)
2017-11-04 10:06:08pitrousetstatus: open -> closed
resolution: fixed
stage: patch review -> resolved
2017-11-04 10:05:54pitrousetmessages: + msg305550
2017-11-03 09:02:51pitrousetmessages: + msg305463
2017-11-02 22:55:23pitroulinkissue25293 superseder
2017-11-02 21:36:02pitrousetmessages: + msg305451
2017-11-02 21:28:23Antony.Leesetnosy: - Antony.Lee
2017-11-02 21:26:43pitrousetpull_requests: + pull_request4206
2017-11-02 13:09:07pitrousetversions: + Python 3.7, - Python 3.5
2017-09-02 18:09:21pitroulinkissue31144 superseder
2017-07-27 01:41:32Antony.Leesetnosy: + Antony.Lee
messages: + msg299285
2016-10-31 00:59:26s0undt3chsetmessages: + msg279755
2016-10-31 00:52:12s0undt3chsetnosy: + s0undt3ch
2016-03-22 21:59:00John O'Connorsetnosy: + jcon
2016-03-17 18:43:05thehesiodsetnosy: + thehesiod
messages: + msg261928
2015-08-21 02:07:49Shiming Hesetnosy: + Shiming He
2015-04-30 10:19:15Claudiu.Popasetstage: patch review
2015-04-30 10:18:56Claudiu.Popasetnosy: + Claudiu.Popa
2015-03-24 18:33:57mark.dickinsonsetmessages: + msg239166
2015-03-24 15:53:54dan.oreillysettype: enhancement
2015-03-20 23:01:32nchammassetnosy: + nchammas
2014-08-11 05:45:35dan.oreillysetfiles: + init_patch_updated2.patch

messages: + msg225177
2014-08-11 02:51:43dan.oreillysetmessages: + msg225176
2014-08-10 23:48:45pitrousetmessages: + msg225174
2014-08-10 18:57:29dan.oreillysetfiles: + init_patch_updated.patch
nosy: + pitrou
messages: + msg225159

2014-08-10 17:29:22dan.oreillysettitle: concurrent.futures.ThreadPoolExecutor should accept an initializer argument -> concurrent.futures.ThreadPoolExecutor/ProcessPoolExecutor should accept an initializer argument
2014-07-23 15:35:01dan.oreillysetfiles: + pool_init.patch
nosy: + dan.oreilly
messages: + msg223741

2014-05-14 10:59:14andreasvcsetmessages: + msg218518
2014-05-13 14:13:12giampaolo.rodolasetnosy: + giampaolo.rodola
messages: + msg218462
2014-05-12 09:32:53mdenglersetnosy: + mdengler
2014-05-08 00:13:40andreasvcsetfiles: + pool_initializer_tests.patch

messages: + msg218093
2014-05-07 23:15:38andreasvcsetfiles: + pool_initializer.patch
keywords: + patch
messages: + msg218090
2014-05-07 07:55:59mark.dickinsonsetnosy: + mark.dickinson
messages: + msg218040
2014-05-06 21:45:03josh.rsetmessages: + msg218024
2014-05-06 21:17:01andreasvcsetmessages: + msg218020
2014-05-06 21:07:20josh.rsetnosy: + josh.r
messages: + msg218019
2014-05-06 18:19:18ned.deilysetnosy: + bquinlan
2014-05-03 23:38:08andreasvccreate