Issue 40110
Created on 2020-03-30 04:06 by kousu, last changed 2022-04-11 14:59 by admin.
Files

File name | Uploaded | Description
---|---|---
multiprocessing-eager-imap.py | kousu, 2020-03-30 04:06 | problem demo code
pipe.py | tim.peters, 2020-04-03 21:19 |
Messages (4)

msg365295 - Author: Nick Guenther (kousu) | Date: 2020-03-30 04:06

multiprocessing.Pool.imap() is supposed to be a lazy version of map(). But it isn't: it submits work to its workers eagerly. As a consequence, in a pipeline, all the work from earlier steps is queued, performed, and finished first, before any later step starts. Python 3's built-in map() -- the old itertools.imap() -- evaluates on demand, so it surprised me that Pool.imap() doesn't work the same way.

It's basically no better than using Pool.map(). Maybe it saves memory by not materializing large iterables in every worker process? But it still spends the CPU time to process the entire iterable, even when the results are never requested.

This can be partially worked around by giving each step of the pipeline its own Pool -- then at least the earlier steps of the pipeline don't block the later steps -- but the jobs are still done eagerly instead of waiting for their results to actually be requested.
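The attached multiprocessing-eager-imap.py is not reproduced here; the following is a minimal sketch in the same spirit (the traced_input and work names are mine, not from the attachment), showing imap() draining its input iterable before any result has been requested:

    import multiprocessing
    import time

    def traced_input():
        # Log every pull from the input iterable, so we can watch
        # imap() drain it before any result has been requested.
        for i in range(20):
            print("input: produced", i)
            yield i

    def work(x):
        return x * x

    if __name__ == "__main__":
        with multiprocessing.Pool(4) as pool:
            results = pool.imap(work, traced_input())
            # We request nothing for a second, yet the
            # "input: produced ..." lines appear anyway.
            time.sleep(1)
            print("first result:", next(results))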
msg365440 - Author: Tim Peters (tim.peters) | Date: 2020-03-31 23:58

"Lazy" has several possible aspects, of which Pool.imap() satisfies some: - Its iterable argument is materialized one object at a time. - It delivers results one at a time. So, for example, if `worker` is a function that takes a single int, then pool = multiprocessing.Pool(4) for i in pool.imap(worker, itertools.count(1)): print(i) works fine, despite that the iterable argument, and the result sequence, are "infinite". You seem to have something more severe in mind, more along the lines of that the iterable isn't advanced unless it absolutely _needs_ to be advanced in order to deliver a result that's being demanded. That's how, e.g., the builtin Python 3 `map()` works. But if the iterable isn't advanced until the main program _demands_ the next result from imap(), then the main program blocks until the machinery peels off the next object from the iterable, picks a worker to send it to, sends it, waits for the worker to deliver the result back on an internal queue, then delivers the result to the main program. There's no parallelism then. The way things are now, imap() consumes the iterable as quickly as possible, keeping all workers as busy as possible, regardless of how quickly (or even whether) results are demanded. And seems to me that's overwhelmingly what people using multiprocessing would want. In any case, that's what they _have_, so that couldn't be changed lightly (if it all). Perhaps it would be more profitable to think about ways to implement your pipelines using other primitives? For example, the first thing I'd try for an N-stage pipeline is a chain of N processes (not in a Pool) connected one to the next by queues. If for some reason I was determined not to let any process "get ahead", easy - specify a max size of 1 for the queues. map-like facilities are inherently SIMD style, but pipelines typically have very different code in different stages. |
msg365523 - Author: Nick Guenther (kousu) | Date: 2020-04-01 22:34

Thank you for taking the time to consider my points! Yes, I think you understood exactly what I was getting at. I slept on it, thought over what I'd posted the day after, and realized most of the points you raise, especially that a serialized next() would mean serialized processing. So the naive approach is out.

I am motivated by trying to introduce backpressure to my pipelines. The example you gave has potentially unbounded memory usage; if I simply slow it down with sleep(), I get a memory leak, and the main python proc pins a CPU even though it "isn't" doing anything:

    import itertools, multiprocessing, sys, time

    def worker(n):
        return n ** 3  # cubes, matching the output below

    with multiprocessing.Pool(4) as pool:
        for i, v in enumerate(pool.imap(worker, itertools.count(1)), 1):
            time.sleep(7)
            print(f"At {i}->{v}, memory usage is {sys.getallocatedblocks()}")

    At 1->1, memory usage is 230617
    At 2->8, memory usage is 411053
    At 3->27, memory usage is 581439
    At 4->64, memory usage is 748584
    At 5->125, memory usage is 909694
    At 6->216, memory usage is 1074304
    At 7->343, memory usage is 1238106
    At 8->512, memory usage is 1389162
    At 9->729, memory usage is 1537830
    At 10->1000, memory usage is 1648502
    At 11->1331, memory usage is 1759864
    At 12->1728, memory usage is 1909807
    At 13->2197, memory usage is 2005700
    At 14->2744, memory usage is 2067407
    At 15->3375, memory usage is 2156479
    At 16->4096, memory usage is 2240936
    At 17->4913, memory usage is 2328123
    At 18->5832, memory usage is 2456865
    At 19->6859, memory usage is 2614602
    At 20->8000, memory usage is 2803736
    At 21->9261, memory usage is 2999129

    PID   USER  PR NI VIRT   RES   SHR  S %CPU %MEM TIME+   COMMAND
    11314 kousu 20 0  303308 40996 6340 S 91.4 2.1  0:21.68 python3.8
    11317 kousu 20 0  54208  10264 4352 S 16.2 0.5  0:03.76 python3.8
    11315 kousu 20 0  54208  10260 4352 S 15.8 0.5  0:03.74 python3.8
    11316 kousu 20 0  54208  10260 4352 S 15.8 0.5  0:03.74 python3.8
    11318 kousu 20 0  54208  10264 4352 S 15.5 0.5  0:03.72 python3.8

It seems to me that any use of Pool.imap() either has this same issue lurking, or is run on a small finite data set where you are better off using Pool.map(). I like generators because they keep work constant-time and constant-memory, which seems like a natural fit for stream-processing situations. Unix pipelines have backpressure built in, because write() blocks when the pipe buffer is full.

I did come up with one possibility after sleeping on it again: run the final iteration in parallel, perhaps via a special plist() method that makes as many parallel next() calls as possible. There are definitely details to work out, but I plan to prototype when I have spare time in the next couple of weeks. You're entirely right that it's a risky change to suggest, so maybe it would be best expressed as a package if I get it working. Can I keep this issue open to see if it draws insights from anyone else in the meantime?

Thanks again for taking a look! That's really cool of you!
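The plist() prototype mentioned above never appears in the thread. As one hedged sketch of the backpressure idea, a hypothetical bounded_imap() helper (my name; not part of multiprocessing) can cap the number of in-flight tasks, so a slow consumer automatically throttles submission:

    import itertools
    import multiprocessing
    from collections import deque

    def cube(n):
        return n ** 3

    def bounded_imap(pool, func, iterable, window=8):
        # Keep at most `window` tasks in flight and yield results in order.
        # A new task is submitted only after the consumer takes a result,
        # so consumption rate bounds both memory use and wasted CPU.
        it = iter(iterable)
        pending = deque(pool.apply_async(func, (x,))
                        for x in itertools.islice(it, window))
        while pending:
            yield pending.popleft().get()        # block on the oldest task
            for x in itertools.islice(it, 1):    # refill the window by one
                pending.append(pool.apply_async(func, (x,)))

    if __name__ == "__main__":
        with multiprocessing.Pool(4) as pool:
            for i, v in enumerate(bounded_imap(pool, cube, itertools.count(1)), 1):
                print(i, v)
                if i >= 20:
                    break

Unlike Pool.imap(), this keeps memory bounded by `window` even when the consumer sleeps between results, at the cost of idling workers once the window is full.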
msg365723 - Author: Tim Peters (tim.peters) | Date: 2020-04-03 21:19

Whenever there's parallel processing with communication, there's always the potential for producers to pump out data faster than consumers can process it. But builtin primitives generally don't try to address that directly. They don't - and can't - know enough about the application's intent.

Instead, as I briefly alluded to earlier, mediation (when needed) is frequently accomplished by explicit use of bounded queues. When process A produces data for process B, it sends the data over a bounded queue. Nothing is done to slow A down, except that when the bounded queue is full, an attempt to enqueue a new piece of data blocks until B removes some old data from the queue. That's a dead easy, painless, and foolproof way to limit A's speed to the rate at which B can consume data. Nothing in A's logic changes - it's the communication channel that applies the brakes.

I'll attach `pipe.py` to illustrate. It constructs a 10-stage pipeline. The first process in the chain *could* produce data at a ferocious rate - but the bounded queue connecting it to the next process slows it to the rate at which the second process runs. All the other processes in the pipeline grab data from the preceding process via a bounded queue, work on it for a second (just a sleep(1) here), then enqueue the result for the next process in the pipeline. The main program loops, pulling data off the final process as fast as results show up. So, if you run it, you'll see that new data is produced once per second, then when the pipeline is full, final results are delivered once per second. When the first process is done, results continue to be pulled off one per second until the pipeline is drained.

The queues here have maximum size 1, just to show that it works. In practice, a larger bound is usually used, to allow for the fact that real-life processes often take varying amounts of time depending on the data they're currently processing. Letting a handful of work items queue up keeps processes busy - if processing some piece of data goes especially fast, fine, there may be more in the input queue waiting to go immediately. How many is "a handful"? Again, that's something that can't in general be guessed - the programmer has to apply their best judgment based on their knowledge of intended application behavior.

Just to be cute ;-) , the code here uses imap() to start the 10 worker processes. The result of calling imap() is never used; it's just an easy way to apply a Pool to the task. And it absolutely relies on imap() consuming its iterable (range(NSTAGES)) as fast as it can, to get the NSTAGES=10 worker processes running.
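pipe.py itself is only described above, not shown. The following is a sketch of the same construction under stated assumptions: per the message, the real file maps over range(NSTAGES) with inherited queues, while this version passes Manager queue proxies through imap's iterable (unlike plain multiprocessing.Queue, proxies can be pickled into Pool tasks, so this also runs under the spawn start method). The stage signature, NITEMS, and the None sentinel protocol are mine:

    import multiprocessing as mp
    import time

    NSTAGES = 10
    NITEMS = 5

    def stage(queues):
        inq, outq = queues
        while True:
            x = inq.get()
            if x is None:      # sentinel: forward the shutdown signal and exit
                outq.put(None)
                return
            time.sleep(1)      # stand-in for one second of real per-stage work
            outq.put(x)

    if __name__ == "__main__":
        mgr = mp.Manager()
        # NSTAGES + 1 bounded queues; queues[i] feeds stage i. maxsize=1 means
        # put() blocks until the downstream stage has taken the previous item,
        # so no stage can run ahead of its consumer.
        queues = [mgr.Queue(maxsize=1) for _ in range(NSTAGES + 1)]
        with mp.Pool(NSTAGES) as pool:
            # imap() is used only to launch the stages: it eagerly consumes its
            # iterable, parking one long-running stage in each of the NSTAGES
            # workers. Its result iterator is deliberately never consumed.
            pool.imap(stage, [(queues[i], queues[i + 1]) for i in range(NSTAGES)])
            for i in range(NITEMS):
                queues[0].put(i)     # blocks whenever the first stage is busy
            queues[0].put(None)      # start draining the pipeline
            while (r := queues[-1].get()) is not None:
                print("result:", r)

With maxsize=1, every put() waits for the downstream stage to catch up, which is exactly the backpressure behavior discussed in the earlier messages; raising maxsize trades memory for tolerance of uneven per-item processing times.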
History

Date | User | Action | Args
---|---|---|---
2022-04-11 14:59:28 | admin | set | github: 84291
2020-04-03 21:19:56 | tim.peters | set | files: + pipe.py; messages: + msg365723
2020-04-01 22:34:52 | kousu | set | messages: + msg365523
2020-03-31 23:58:43 | tim.peters | set | nosy: + tim.peters; messages: + msg365440
2020-03-31 22:59:13 | rhettinger | set | nosy: + rhettinger; type: enhancement; components: + Library (Lib); versions: + Python 3.9, - Python 2.7, Python 3.5, Python 3.6, Python 3.7, Python 3.8
2020-03-30 04:06:27 | kousu | create |