#!/usr/bin/env python3
"""
multiprocessing.Pool.imap() is supposed to be a lazy version of map.

But it's not. It submits work to its worker pools eagerly. As a consequence,
in a data pipeline, all the work from the first step is queued, performed,
and finished first, before starting later steps.

If you use python3's built-in map() -- aka the old itertools.imap() -- the
operations are on-demand, so it surprised me that Pool.imap() doesn't. It's
basically no better than using Pool.map(). Maybe it saves memory by not
materializing large iterables?

This can be worked around by giving each step of the pipeline its own Pool --
then, at least the earlier steps of the pipeline don't block the later steps
-- but the jobs are still done eagerly instead of waiting for their results
to actually be requested.
"""
import multiprocessing
import time


def task1(e):
    """First pipeline stage: announce the call, then return e squared."""
    print("task1(%d)" % (e,))
    time.sleep(.1)  # simulate real work so the scheduling order is observable
    return e**2


def task2(e):
    """Second pipeline stage: announce the call, then return e + 9."""
    print("task2(%d)" % (e,))
    time.sleep(.1)  # simulate real work so the scheduling order is observable
    return e + 9


def pipeline():
    """
    All the pipelines here are (in pseudo-shell notation):

        range(12) | task1 | task2 | list

    With built-in map(), the operations are interleaved, because they happen
    on-demand:

    >>> pipeline()
    Constructing [input]
    Constructing [task1]
    Constructing [task2]
    Asking for [result]
    task1(0)
    task2(0)
    task1(1)
    task2(1)
    task1(2)
    task2(4)
    task1(3)
    task2(9)
    task1(4)
    task2(16)
    task1(5)
    task2(25)
    task1(6)
    task2(36)
    task1(7)
    task2(49)
    task1(8)
    task2(64)
    task1(9)
    task2(81)
    task1(10)
    task2(100)
    task1(11)
    task2(121)
    [9, 10, 13, 18, 25, 34, 45, 58, 73, 90, 109, 130]
    """
    print("Constructing [input]")
    l = range(12)
    print("Constructing [task1]")
    l = map(task1, l)
    print("Constructing [task2]")
    l = map(task2, l)
    print("Asking for [result]")
    l = list(l)
    print(l)


def pipeline_multiprocessing():
    """
    But introduce Pool.imap() and now the operations are suddenly serialized?
    (the total runtime is faster, but the order of it is unexpected)

    >>> pipeline_multiprocessing()
    Constructing [input]
    Constructing [task1]
    Constructing [task2]
    Asking for [result]
    task1(0)
    task1(1)
    task1(2)
    task1(3)
    task1(4)
    task1(5)
    task1(6)
    task1(7)
    task1(8)
    task1(9)
    task1(10)
    task1(11)
    task2(0)
    task2(1)
    task2(4)
    task2(9)
    task2(16)
    task2(25)
    task2(36)
    task2(49)
    task2(64)
    task2(81)
    task2(100)
    task2(121)
    [9, 10, 13, 18, 25, 34, 45, 58, 73, 90, 109, 130]
    """
    pool = multiprocessing.Pool()
    try:
        # Deliberately shadow the builtin map() so the pipeline body below
        # is textually identical to pipeline() -- only the map changes.
        map = pool.imap
        print("Constructing [input]")
        l = range(12)
        print("Constructing [task1]")
        l = map(task1, l)
        print("Constructing [task2]")
        l = map(task2, l)
        print("Asking for [result]")
        l = list(l)
        print(l)
    finally:
        # close/join even if consuming the iterator raises.
        pool.close()
        pool.join()


def pipeline_multiprocessing_eager_sleep():
    """
    In fact, adding a sleep() *before* the final list() which should kick-off
    processing (and in the map() example, does), underscores that it is Pool
    choosing to evaluate the tasks, not me:

    >>> pipeline_multiprocessing_eager_sleep()
    Constructing [input]
    Constructing [task1]
    Constructing [task2]
    task1(0)
    task1(1)
    task1(2)
    task1(3)
    task1(4)
    task1(5)
    task1(6)
    task1(7)
    task1(8)
    task1(9)
    task1(10)
    task1(11)
    task2(0)
    task2(1)
    task2(4)
    task2(9)
    task2(16)
    task2(25)
    task2(36)
    task2(49)
    Asking for [result]
    task2(64)
    task2(81)
    task2(100)
    task2(121)
    [9, 10, 13, 18, 25, 34, 45, 58, 73, 90, 109, 130]
    """
    pool = multiprocessing.Pool()
    try:
        # Deliberately shadow the builtin map() -- see pipeline_multiprocessing.
        map = pool.imap
        print("Constructing [input]")
        l = range(12)
        print("Constructing [task1]")
        l = map(task1, l)
        print("Constructing [task2]")
        l = map(task2, l)
        # The sleep happens BEFORE any result is requested; the task1/task2
        # output printed during it proves the Pool evaluates eagerly.
        time.sleep(1)
        print("Asking for [result]")
        l = list(l)
        print(l)
    finally:
        pool.close()
        pool.join()


def pipeline_separate_pools():
    """
    With separate Pools for each step, at least the steps don't block each
    other (except when they're starved), but they still evaluate eagerly:

    >>> pipeline_separate_pools()
    Constructing [input]
    Constructing [task1]
    Constructing [task2]
    task1(0)
    task1(1)
    task1(2)
    task1(3)
    task2(0)
    task2(1)
    task1(4)
    task1(5)
    task2(4)
    task2(9)
    task1(6)
    task1(7)
    task2(16)
    task2(25)
    task1(8)
    task1(9)
    task2(36)
    task2(49)
    task1(10)
    task1(11)
    task2(64)
    task2(81)
    task2(100)
    task2(121)
    Asking for [result]
    [9, 10, 13, 18, 25, 34, 45, 58, 73, 90, 109, 130]
    """
    pool = multiprocessing.Pool()
    try:
        pool2 = multiprocessing.Pool()
        try:
            print("Constructing [input]")
            l = range(12)
            print("Constructing [task1]")
            l = pool.imap(task1, l)
            print("Constructing [task2]")
            l = pool2.imap(task2, l)
            time.sleep(1)
            print("Asking for [result]")
            l = list(l)
            print(l)
        finally:
            pool2.close()
            pool2.join()
    finally:
        pool.close()
        pool.join()


if __name__ == '__main__':
    #pipeline()
    #pipeline_multiprocessing()
    pipeline_multiprocessing_eager_sleep()
    #pipeline_separate_pools()