from time import sleep NSTAGES = 10 def init(*args): global qs, iolock qs, iolock = args def worker(i): inq, outq = qs[i : i+2] if i == 0: assert inq is None # Note that there's no sleep in this branch. # Because outq has maxsize=1, the .put() blocks # until worker 1 has cleared the queue. for j in range(100): outq.put(j) with iolock: print("produced", j) else: while True: j = inq.get() if j is None: break sleep(1.0) outq.put(j + i * 10 ** (i + 3)) outq.put(None) if __name__ == "__main__": import multiprocessing as mp qs = [None] + [mp.Queue(maxsize=1) for i in range(NSTAGES)] iolock = mp.Lock() pool = mp.Pool(NSTAGES, initializer=init, initargs=(qs, iolock)) pool.imap(worker, range(NSTAGES)) while True: i = qs[-1].get() if i is None: break with iolock: print("got", i) pool.close() pool.join()