New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
concurrent.futures.ProcessPoolExecutor.map() doesn't batch function arguments by chunks #55480
Comments
I tested the new concurrent.futures.ProcessPoolExecutor.map() in 3.2 with the is_prime() function from the documentation example. This was significantly slower than using multiprocessing.Pool.map(). Quick look at the source showed that multiprocessing sends the iterable in chunks to the worker process while futures sends always only one entry of the iterable to the worker. Functions like is_prime() which finish relatively fast make the communication overhead (at least I guess that is the culprit) very big in comparison. Attached is a file which demonstrates the problem and a quick workaround. The workaround uses the chunk idea from multiprocessing. The problem is that it requires the iterables passed to map() to have a length and be indexable with a slice. I believe this limitation could be worked around. |
Playing around a bit I wrote the attached implementation which works with all iterables. |
On my crappy computer, ProcessPoolExecutor.map adds <3ms of added execution time per call using your test framework. What is your use case where that is too much? That being said, I don't have any objections to making improvements. If you want to pursue this, could you attach a working map_comprison.py? |
I can confirm an overhead of 2 ms to 3 ms using a relatively recent Intel Core i5 CPU. I (personally) believe these 3 ms to be a pretty big overhead on modern computers and I also believe that it would be relatively simple to reduce it. I don't have much time at the moment but I will try to produce a complete proof of concept patch for the futures module in the next weeks. I'd be happy to get some comments when it is finished. |
Using your test script fixed (on Python 3.3), I get the following numbers: Starting multiproc...done in 2.1014609336853027 s. So there's a 0.2ms overhead per remote function call here (20/(100100000-100000000)). Can't your chunks() function use itertools.islice()? Also, the chunksize can't be anything else than 1 by default, since your approach is increasing latency of returning results. |
I'm closing this since tbrink didn't respond to pitrou's comments. |
I'm seeing an even larger difference between multiprocessing.Pool and ProcessPoolExecutor on my machine, with Python 3.4: Starting multiproc...done in 2.160644769668579 s. I've updated the initial patch to address the comments Antoine made; the chunksize now defaults to 1, and itertools is used to chunk the input iterables, rather than building a list. Attached is an updated benchmark script: Starting multiproc...done in 2.2295100688934326 s. The new implementation of map has essentially identical performance to the original with chunksize=1, but it performs much better with a larger chunksize provided. |
Here's a patch that adds the new map implementation from the benchmark script to concurrent.futures.process. |
I've added new versions of the patch/demonstration that ensures we actually submit all the futures before trying to yield from the returned iterator. Previously we were just returning a generator object when map was called, without actually submitting any futures. |
Thank you for posting this, I'm reopening the issue. |
Thank you for the patch. I've posted some review comments. Two further remarks:
|
re: Diverging ThreadPoolExecutor and ProcessPoolExecutor APIs. I thought about this as well. It would of course be possible to make ThreadPoolExecutor's API match, but it would serve no useful purpose that I can think of. I guess we could make the ThreadPoolExecutor API accept the chunksize argument just so the APIs match, but then not actually use it (and document that it's not used, of course). That would make it easier to switch between the two APIs, at least. On the other hand, the two APIs are slightly different already: ThreadPoolExecutor requires max_workers be supplied, while ProcessPoolExecutor will default to multiprocessing.cpu_count(). So this wouldn't completely be without precedent. Anyway, I will review your comments on the patch, add unit tests, and re-submit. Thanks! |
I've attached an updated patch based on your review comments; there's now a unit test with a non-default chunksize, the chunking algorithm is more readable, and the same code path is used no matter what chunksize is provided. I've also updated the test script to match the implementation changes. |
Here's an updated patch that adds documentation and Antoine's requested code changes. |
Thanks for the update! This looks basically good, I'll wait a bit to see if other people have comments, otherwise I'll commit. |
A couple of small updates based on comments from Charles-François Natali:
|
New changeset f87c2c4f03da by Antoine Pitrou in branch 'default': |
Sorry for the delay. I simplified the patch a tiny bit ("yield from" wasn't needed, it was sufficient to result the itertools.chain.from_iterable() result), and committed it. Thank you! |
Hey, my first committed patch :) Thanks for helping to push this through, Antoine! |
Note: these values reflect the state of the issue at the time it was migrated and might not reflect the current state.
Show more details
GitHub fields:
bugs.python.org fields:
The text was updated successfully, but these errors were encountered: