classification
Title: multiprocessing.Pool.imap() should be lazy
Type: enhancement Stage:
Components: Library (Lib) Versions: Python 3.9
process
Status: open Resolution:
Dependencies: Superseder:
Assigned To: Nosy List: kousu, rhettinger, tim.peters
Priority: normal Keywords:

Created on 2020-03-30 04:06 by kousu, last changed 2020-04-03 21:19 by tim.peters.

Files
File name Uploaded Description
multiprocessing-eager-imap.py kousu, 2020-03-30 04:06 problem demo code
pipe.py tim.peters, 2020-04-03 21:19
Messages (4)
msg365295 - Author: Nick Guenther (kousu) Date: 2020-03-30 04:06
multiprocessing.Pool.imap() is supposed to be a lazy version of map. But it's not: it submits work to its workers eagerly. As a consequence, in a pipeline, all the work from earlier steps is queued, performed, and finished first, before starting later steps.
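The eager submission is easy to observe directly. The following is a minimal sketch that assumes `multiprocessing.pool.ThreadPool` is representative. `ThreadPool` subclasses `Pool` and shares its `imap()` task-feeding machinery, and using threads avoids having to pickle the demo functions. `source` and `square` are hypothetical helpers for illustration; this is not the attached multiprocessing-eager-imap.py.

```python
import threading
import time
from multiprocessing.pool import ThreadPool

pulled = []               # every item the pool has taken from the input
lock = threading.Lock()   # the pool's feeder thread appends to `pulled`

def source(n):
    """Hypothetical input generator that records each item as it is pulled."""
    for i in range(n):
        with lock:
            pulled.append(i)
        yield i

def square(x):
    return x * x

with ThreadPool(2) as pool:
    results = pool.imap(square, source(1000))
    first = next(results)     # demand exactly one result...
    time.sleep(0.5)           # ...then give the pool's feeder thread a moment
    with lock:
        n_pulled = len(pulled)

print(first, n_pulled)
```

Only one result is requested, yet `n_pulled` typically reports all 1000 items. The pool's feeder thread drains the input regardless of how many results the caller asks for.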

If you use python3's built-in map() -- aka the old itertools.imap() -- the operations are on-demand, so it surprised me that Pool.imap() isn't. It's basically no better than using Pool.map(). Maybe it saves memory by not materializing large iterables in every worker process? But it still spends CPU time computing results that may never be needed.
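For contrast, the built-in `map()` pulls from its input only when a result is demanded. Here is a small check, where the `source` generator is a hypothetical helper:

```python
pulled = []

def source(n):
    """Hypothetical input generator that records each item as it is pulled."""
    for i in range(n):
        pulled.append(i)
        yield i

lazy = map(lambda x: x * x, source(1000))
print(pulled)         # [] : nothing has been pulled yet
first = next(lazy)
print(first, pulled)  # 0 [0] : exactly one item pulled for one result
```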

This can be partially worked around by giving each step of the pipeline its own Pool -- then, at least the earlier steps of the pipeline don't block the later steps -- but the jobs are still done eagerly instead of waiting for their results to actually be requested.
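The per-stage-Pool workaround looks roughly like the sketch below. The second pool's `imap()` consumes the first pool's result iterator. `ThreadPool` stands in for `Pool` so the sketch runs without pickling, and `double` and `increment` are hypothetical stage functions. As described above, both stages still run ahead of the consumer; the only gain is that stage 2 can start as soon as stage 1 results arrive.

```python
from multiprocessing.pool import ThreadPool

def double(x):
    return 2 * x

def increment(x):
    return x + 1

with ThreadPool(2) as stage1_pool, ThreadPool(2) as stage2_pool:
    stage1 = stage1_pool.imap(double, range(10))
    # stage2_pool's feeder thread pulls from stage1 as soon as its results arrive
    stage2 = stage2_pool.imap(increment, stage1)
    results = list(stage2)

print(results)  # [1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
```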
msg365440 - Author: Tim Peters (tim.peters) (Python committer) Date: 2020-03-31 23:58
"Lazy" has several possible aspects, of which Pool.imap() satisfies some:

- Its iterable argument is materialized one object at a time.

- It delivers results one at a time.

So, for example, if `worker` is a function that takes a single int, then

    import itertools
    import multiprocessing

    pool = multiprocessing.Pool(4)
    for i in pool.imap(worker, itertools.count(1)):
        print(i)

works fine, despite that the iterable argument, and the result sequence, are "infinite".

You seem to have something more severe in mind, more along the lines of that the iterable isn't advanced unless it absolutely _needs_ to be advanced in order to deliver a result that's being demanded.  That's how, e.g., the builtin Python 3 `map()` works.

But if the iterable isn't advanced until the main program _demands_ the next result from imap(), then the main program blocks until the machinery peels off the next object from the iterable, picks a worker to send it to, sends it, waits for the worker to deliver the result back on an internal queue, then delivers the result to the main program.  There's no parallelism then.

The way things are now, imap() consumes the iterable as quickly as possible, keeping all workers as busy as possible, regardless of how quickly (or even whether) results are demanded.  And it seems to me that's overwhelmingly what people using multiprocessing would want.  In any case, that's what they _have_, so that couldn't be changed lightly (if at all).

Perhaps it would be more profitable to think about ways to implement your pipelines using other primitives?  For example, the first thing I'd try for an N-stage pipeline is a chain of N processes (not in a Pool) connected one to the next by queues.  If for some reason I was determined not to let any process "get ahead", easy - specify a max size of 1 for the queues.  map-like facilities are inherently SIMD style, but pipelines typically have very different code in different stages.
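A chain like this can be sketched as follows. The sketch uses threads and `queue.Queue(maxsize=...)` so it is self-contained. Swapping in `multiprocessing.Process` and `multiprocessing.Queue(maxsize=...)`, whose `put()` also blocks when the queue is full, would give the process-based version described here, although the stage functions would then need to be picklable. `stage`, `run_pipeline`, and the `None` end-of-stream marker are hypothetical choices for illustration.

```python
import queue
import threading

DONE = None  # end-of-stream marker; assumes no stage ever produces None

def stage(func, inq, outq):
    """Apply func to each item from inq and forward the result to outq."""
    while True:
        item = inq.get()
        if item is DONE:
            outq.put(DONE)        # propagate shutdown downstream
            return
        outq.put(func(item))      # blocks while outq is full: backpressure

def run_pipeline(source, funcs, maxsize=1):
    """Yield each source item after passing it through every func, in order."""
    queues = [queue.Queue(maxsize=maxsize) for _ in range(len(funcs) + 1)]
    workers = [
        threading.Thread(target=stage, args=(f, queues[i], queues[i + 1]), daemon=True)
        for i, f in enumerate(funcs)
    ]

    def feed():
        for item in source:
            queues[0].put(item)   # waits until stage 0 has room
        queues[0].put(DONE)

    workers.append(threading.Thread(target=feed, daemon=True))
    for w in workers:
        w.start()
    while (item := queues[-1].get()) is not DONE:
        yield item
    for w in workers:
        w.join()

print(list(run_pipeline(range(5), [lambda x: x + 1, lambda x: x * 10])))
# [10, 20, 30, 40, 50]
```

Each stage is a single thread reading a single FIFO queue, so output order matches input order. With `maxsize=1`, no stage can get more than one item ahead of the next.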
msg365523 - Author: Nick Guenther (kousu) Date: 2020-04-01 22:34
Thank you for taking the time to consider my points! Yes, I think you understood exactly what I was getting at.

I slept on it, thought over what I'd posted the next day, and realized most of the points you raise, especially that a serialized next() would mean serialized processing. So the naive approach is out.

What motivates me is introducing backpressure into my pipelines. The example you gave has potentially unbounded memory usage; if I simply slow the consumer down with sleep(), memory usage keeps growing and the main Python process pins my CPU, even though it "isn't" doing anything:

    import itertools, multiprocessing, sys, time

    with multiprocessing.Pool(4) as pool:
        for i, v in enumerate(pool.imap(worker, itertools.count(1)), 1):
            time.sleep(7)
            print(f"At {i}->{v}, memory usage is {sys.getallocatedblocks()}")

    At 1->1, memory usage is 230617
    At 2->8, memory usage is 411053
    At 3->27, memory usage is 581439
    At 4->64, memory usage is 748584
    At 5->125, memory usage is 909694
    At 6->216, memory usage is 1074304
    At 7->343, memory usage is 1238106
    At 8->512, memory usage is 1389162
    At 9->729, memory usage is 1537830
    At 10->1000, memory usage is 1648502
    At 11->1331, memory usage is 1759864
    At 12->1728, memory usage is 1909807
    At 13->2197, memory usage is 2005700
    At 14->2744, memory usage is 2067407
    At 15->3375, memory usage is 2156479
    At 16->4096, memory usage is 2240936
    At 17->4913, memory usage is 2328123
    At 18->5832, memory usage is 2456865
    At 19->6859, memory usage is 2614602
    At 20->8000, memory usage is 2803736
    At 21->9261, memory usage is 2999129

      PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
    11314 kousu     20   0  303308  40996   6340 S  91.4  2.1   0:21.68 python3.8
    11317 kousu     20   0   54208  10264   4352 S  16.2  0.5   0:03.76 python3.8
    11315 kousu     20   0   54208  10260   4352 S  15.8  0.5   0:03.74 python3.8
    11316 kousu     20   0   54208  10260   4352 S  15.8  0.5   0:03.74 python3.8
    11318 kousu     20   0   54208  10264   4352 S  15.5  0.5   0:03.72 python3.8

It seems to me like any usage of Pool.imap() either has the same issue lurking or is run on a small finite data set where you are better off using Pool.map().

I like generators because they process each item on demand in constant time and memory, which seems like a natural fit for stream processing. Unix pipelines have backpressure built in, because write() blocks when the pipe buffer is full.
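One way to retrofit that kind of backpressure onto `Pool.imap()` is to gate the input iterable with a semaphore. The semaphore is acquired for each item handed to the pool and released for each result the consumer takes. This is a sketch of an illustrative technique, not something `multiprocessing` provides. `bounded_imap` and `max_in_flight` are hypothetical names. `ThreadPool` stands in for `Pool` so the example runs without pickling. The gating should carry over to a process pool because, in CPython's implementation, the input iterable is consumed by a feeder thread in the parent process.

```python
import threading
import time
from multiprocessing.pool import ThreadPool

def bounded_imap(pool, func, iterable, max_in_flight):
    """Like pool.imap(), but with at most max_in_flight items submitted and not yet consumed."""
    slots = threading.BoundedSemaphore(max_in_flight)

    def gated():
        for item in iterable:
            slots.acquire()   # runs in the pool's feeder thread, which blocks here
            yield item

    for result in pool.imap(func, gated()):
        slots.release()       # one result consumed: allow one more submission
        yield result

pulled = []

def source(n):
    """Hypothetical input generator that records each item as it is pulled."""
    for i in range(n):
        pulled.append(i)
        yield i

def square(x):
    return x * x

with ThreadPool(2) as pool:
    results = bounded_imap(pool, square, source(100), max_in_flight=3)
    first = next(results)
    time.sleep(0.2)
    pulled_after_first = len(pulled)  # stays small instead of racing to 100
    rest = list(results)

print(first, pulled_after_first, len(rest))
```

With `max_in_flight=3`, `pulled_after_first` should be at most 5. That count is three submitted items, a fourth admitted by the slot freed when the first result was consumed, and a fifth pulled but still waiting at `acquire()`. There are two caveats. First, the gate blocks the pool's single feeder thread, so other `imap()` or `map_async()` calls on the same pool stall too. Second, abandoning the results iterator early leaves that thread blocked, which can hang the pool's shutdown.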

I did come up with one possibility after sleeping on it again: run the final iteration in parallel, perhaps by a special plist() method that makes as many parallel next() calls as possible. There are definitely details to work out, but I plan to prototype it when I have spare time in the next couple of weeks.

You're entirely right that it's a risky change to suggest, so maybe it would be best expressed as a package if I get it working. Can I keep this issue open to see if it draws in insights from anyone else in the meantime?

Thanks again for taking a look! That's really cool of you!
msg365723 - Author: Tim Peters (tim.peters) (Python committer) Date: 2020-04-03 21:19
Whenever there's parallel processing with communication, there's always the potential for producers to pump out data faster than consumers can process them.  But builtin primitives generally don't try to address that directly.  They don't - and can't - know enough about the application's intent.

Instead, as I briefly alluded to earlier, users frequently accomplish mediation (when needed) by explicitly using bounded queues.  When process A produces data for process B, it sends the data over a bounded queue.  Nothing is done to slow A down, except that when a bounded queue is full, an attempt to enqueue a new piece of data blocks until B removes some old data from the queue.  That's a dead easy, painless, and foolproof way to limit A's speed to the rate at which B can consume data.  Nothing in A's logic changes - it's the communication channel that applies the brakes.
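The braking effect can be checked with threads and `queue.Queue`, whose `put()` blocks while the queue is full. `multiprocessing.Queue` behaves the same way across processes. In this sketch, `producer` and `consumer` are hypothetical functions. The producer could run flat out, yet it never gets more than `MAXSIZE + 1` items ahead of a deliberately slow consumer: at most `MAXSIZE` items waiting in the queue, plus one the consumer has taken but not yet counted.

```python
import queue
import threading
import time

MAXSIZE = 2
q = queue.Queue(maxsize=MAXSIZE)
lock = threading.Lock()
produced = consumed = 0
max_lead = 0   # largest observed (produced - consumed)

def producer(n):
    global produced, max_lead
    for i in range(n):
        q.put(i)                 # blocks while the queue holds MAXSIZE items
        with lock:
            produced += 1
            max_lead = max(max_lead, produced - consumed)
    q.put(None)                  # end-of-stream marker

def consumer():
    global consumed
    while q.get() is not None:
        time.sleep(0.01)         # simulate slow processing
        with lock:
            consumed += 1

threads = [threading.Thread(target=producer, args=(20,)), threading.Thread(target=consumer)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(produced, consumed, max_lead)
```

The producer's code contains no rate limiting at all; the bounded queue alone keeps `max_lead` small.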

I'll attach `pipe.py` to illustrate.  It constructs a 10-stage pipeline.  The first process in the chain *could* produce data at a ferocious rate - but the bounded queue connecting it to the next process slows it to the rate at which the second process runs.  All the other processes in the pipeline grab data from the preceding process via a bounded queue, work on it for a second (just a sleep(1) here), then enqueue the result for the next process in the pipeline.  The main program loops, pulling data off the final process as fast as results show up.

So, if you run it, you'll see that new data is produced once per second, then, once the pipeline is full, final results are delivered once per second.  When the first process is done, results continue to be pulled off one per second until the pipeline is drained.

The queues here have maximum size 1, just to show that it works.  In practice, a larger bound is usually used, to allow for the fact that processes in real life often take varying amounts of time depending on the data they're currently processing.  Letting a handful of work items queue up keeps processes busy - if processing some piece of data goes especially fast, fine, there may be more in the input queue waiting to go immediately.  How many is "a handful"?  Again, that's something that can't in general be guessed - the programmer has to apply their best judgment based on their deep knowledge of intended application behavior.

Just to be cute ;-) , the code here uses imap() to start the 10 worker processes.  The result of calling imap() is never used; it's just an easy way to apply a Pool to the task.  And it absolutely relies on the fact that imap() consumes its iterable (range(NSTAGES)) as fast as it can, to get the NSTAGES=10 worker processes running.
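The trick of using `imap()` only to launch long-running stage functions can be sketched as follows. This is not the attached pipe.py. It uses `ThreadPool` and `queue.Queue` so it runs self-contained, and `run_stage`, `feed`, `NSTAGES = 3`, and the `None` sentinel are illustrative assumptions. Each stage adds 1 to its input. The pool needs one worker per stage because each stage occupies its worker until the stream ends.

```python
import queue
import threading
from multiprocessing.pool import ThreadPool

NSTAGES = 3
# queues[i] feeds stage i; the main program reads queues[NSTAGES]
queues = [queue.Queue(maxsize=1) for _ in range(NSTAGES + 1)]

def run_stage(i):
    """Stage i: add 1 to each item and pass it on until the None sentinel arrives."""
    while (item := queues[i].get()) is not None:
        queues[i + 1].put(item + 1)
    queues[i + 1].put(None)

def feed(items):
    for x in items:
        queues[0].put(x)
    queues[0].put(None)

with ThreadPool(NSTAGES) as pool:
    # The imap() result is never used. Calling imap() is enough, because the pool
    # submits every element of range(NSTAGES) at once, starting all stages.
    pool.imap(run_stage, range(NSTAGES))
    threading.Thread(target=feed, args=(range(5),)).start()
    results = []
    while (item := queues[NSTAGES].get()) is not None:
        results.append(item)

print(results)  # [3, 4, 5, 6, 7]
```

A demand-driven imap() would submit nothing until its results were requested, so this pattern depends on the current eager behavior.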
History
Date User Action Args
2020-04-03 21:19:56 tim.peters set files: + pipe.py; messages: + msg365723
2020-04-01 22:34:52 kousu set messages: + msg365523
2020-03-31 23:58:43 tim.peters set nosy: + tim.peters; messages: + msg365440
2020-03-31 22:59:13 rhettinger set nosy: + rhettinger; type: enhancement; components: + Library (Lib); versions: + Python 3.9, - Python 2.7, Python 3.5, Python 3.6, Python 3.7, Python 3.8
2020-03-30 04:06:27 kousu create