classification
Title: Make Executor.map work with infinite/large inputs correctly
Type: Stage:
Components: Library (Lib) Versions: Python 3.7
process
Status: open Resolution:
Dependencies: Superseder:
Assigned To: Nosy List: Klamann, bquinlan, ezio.melotti, josh.r, max
Priority: normal Keywords:

Created on 2017-03-18 01:32 by josh.r, last changed 2017-05-16 09:34 by Klamann.

Pull Requests
URL Status Linked Edit
PR 707 open python-dev, 2017-03-18 02:13
Messages (4)
msg289789 - (view) Author: Josh Rosenberg (josh.r) * Date: 2017-03-18 01:32
As currently implemented, Executor.map is not particularly lazy. Specifically, if given huge argument iterables, it will not begin yielding results until all tasks have been submitted; if given an infinite input iterable, it will run out of memory before yielding a single result.

This makes it unusable as a drop in replacement for plain map, which, being lazy, handles infinite iterables just fine, and produces results promptly.

Proposed change makes Executor.map begin yielding results for large iterables without submitting every task up front. As a reasonable default, I have it submit a number of tasks equal to twice the number of workers, submitting a new task immediately after getting results for the next future in line, before yielding the result (to ensure the number of outstanding futures stays constant). A new keyword-only argument, prefetch, is provided to explicitly specify how many tasks should be queued above and beyond the number of workers.

Working on submitting pull request now.
msg289790 - (view) Author: Josh Rosenberg (josh.r) * Date: 2017-03-18 02:18
Nosying folks suggested by GitHub, hope that's the right etiquette.

For the record, filled out contributor agreement ages ago, but hadn't linked (or even created) GitHub account until after I got the warning. I've linked this account to my GitHub username now, hope that's sufficient.
msg293677 - (view) Author: Max (max) * Date: 2017-05-15 05:24
I'm also concerned about this (undocumented) inconsistency between map and Executor.map.

I think you would want to make your PR limited to `ThreadPoolExecutor`. The `ProcessPoolExecutor` already does everything you want with its `chunksize` paramater, and adding `prefetch` to it will jeopardize the optimization for which `chunksize` is intended.

Actually, I was even thinking whether it might be worth merging `chunksize` and `prefetch` arguments. The semantics of the two arguments is similar but not identical. Specifically, for `ProcessPoolExecutor`, there is pretty clear pressure to increase the value of `chunksize` to reduce amortized IPC costs; there is no IPC with threads, so the pressure to increase `prefetch` is much more situational (e.g., in the busy pool example I give below).

For `ThreadPoolExecutor`, I prefer your implementation over the current one, but I want to point out that it is not strictly better, in the sense that *with default arguments*, there are situations where the current implementation behaves better.

In many cases your implementation behaves much better. If the input is too large, it prevents out of memory condition. In addition, if the pool is not busy when `map` is called, your implementation will also be faster, since it will submit the first input for processing earlier.

But consider the case where input is produced slower than it can be processed (`iterables` may fetch data from a database, but the callable `fn` may be a fast in-memory transformation). Now suppose the `Executor.map` is called when the pool is busy, so there'll be a delay before processing begins. In this case, the most efficient approach is to get as much input as possible while the pool is busy, since eventually (when the pool is freed up) it will become the bottleneck. This is exactly what the current implementation does.

The implementation you propose will (by default) only prefetch a small number of input items. Then when the pool becomes available, it will quickly run out of prefetched input, and so it will be less efficient than the current implementation. This is especially unfortunate since the entire time the pool was busy, `Executor.map` is just blocking the main thread so it's literally doing nothing useful.

Of course, the client can tweak `prefetch` argument to achieve better performance. Still, I wanted to make sure this issue is considered before the new implementation is adopted.

From the performance perspective, an even more efficient implementation would be one that uses three background threads:

- one to prefetch items from the input
- one to sends items to the workers for processing
- one to yield results as they become available

It has a disadvantage of being slightly more complex, so I don't know if it really belongs in the standard library.

Its advantage is that it will waste less time: it fetches inputs without pause, it submits them for processing without pause, and it makes results available to the client as soon as they are processed. (I have implemented and tried this approach, but not in productioon.)

But even this implementation requires tuning. In the case with the busy pool that I described above, one would want to prefetch as much input as possible, but that may cause too much memory consumption and also possibly waste computation resources (if the most of input produced proves to be unneeded in the end).
msg293708 - (view) Author: Max (max) * Date: 2017-05-15 12:05
Correction: this PR is useful for `ProcessPoolExecutor` as well. I thought `chunksize` parameter handles infinite generators already, but I was wrong. And, as long as the number of items prefetched is a multiple of `chunksize`, there are no issues with the chunksize optimization either.

And a minor correction: when listing the advantages of this PR, I should have said: "In addition, if the pool is not busy when `map` is called, your implementation will also be more responsive, since it will yield the first result earlier."
History
Date User Action Args
2017-05-16 09:38:40berker.peksaglinkissue30323 superseder
2017-05-16 09:34:03Klamannsetnosy: + Klamann
2017-05-15 12:05:27maxsetmessages: + msg293708
2017-05-15 05:24:58maxsetnosy: + max
messages: + msg293677
2017-03-19 13:19:29josh.rsettitle: Executor.map should not submit all futures prior to yielding any results -> Make Executor.map work with infinite/large inputs correctly
2017-03-18 02:18:42josh.rsetnosy: + bquinlan, ezio.melotti
messages: + msg289790
2017-03-18 02:13:29python-devsetpull_requests: + pull_request627
2017-03-18 01:32:56josh.rcreate