
Author tim.peters
Recipients kousu, rhettinger, tim.peters
Date 2020-04-03.21:19:55
Message-id <1585948796.37.0.66822482346.issue40110@roundup.psfhosted.org>
In-reply-to
Content
Whenever there's parallel processing with communication, there's always the potential for producers to pump out data faster than consumers can process it.  But builtin primitives generally don't try to address that directly.  They don't - and can't - know enough about the application's intent.

Instead, as I briefly alluded to earlier, mediation (when needed) is frequently accomplished by users by explicit use of bounded queues.  When process A produces data for process B, it sends the data over a bounded queue.  Nothing is done to slow A down, except that when a bounded queue is full, an attempt to enqueue a new piece of data blocks until B removes some old data from the queue.  That's a dead easy, painless, and foolproof way to limit A's speed to the rate at which B can consume data.  Nothing in A's logic changes - it's the communication channel that applies the brakes.
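
To make the braking effect concrete, here is a minimal sketch. It is not the attached `pipe.py`: it uses threads and `queue.Queue` purely to keep the example short (`multiprocessing.Queue(maxsize)` blocks in `put()` the same way), and the function name, the `None` sentinel, and the timings are illustrative assumptions.

```python
import queue
import threading
import time


def run_bounded_demo(n_items=5, consume_delay=0.05, maxsize=1):
    """Hypothetical demo: a fast producer throttled by a bounded queue."""
    q = queue.Queue(maxsize=maxsize)  # bounded: put() blocks while the queue is full
    timing = {}

    def producer():
        start = time.monotonic()
        for i in range(n_items):
            q.put(i)      # no throttling logic here; put() simply blocks when full
        q.put(None)       # sentinel (an assumption of this sketch): no more data
        timing["producer_elapsed"] = time.monotonic() - start

    t = threading.Thread(target=producer)
    t.start()

    received = []
    while True:
        item = q.get()
        if item is None:
            break
        time.sleep(consume_delay)  # the slow consumer sets the overall pace
        received.append(item)

    t.join()
    return received, timing["producer_elapsed"]


if __name__ == "__main__":
    items, elapsed = run_bounded_demo()
    print(items, f"producer took {elapsed:.2f}s")
```

The producer loop contains no sleeps at all, yet its elapsed time tracks the consumer's delay, because each `put()` waits for a free slot.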

I'll attach `pipe.py` to illustrate.  It constructs a 10-stage pipeline.  The first process in the chain *could* produce data at a ferocious rate - but the bounded queue connecting it to the next process slows it to the rate at which the second process runs.  All the other processes in the pipeline grab data from the preceding process via a bounded queue, work on it for a second (just a sleep(1) here), then enqueue the result for the next process in the pipeline.  The main program loops, pulling data off the final process as fast as results show up.

So, if you run it, you'll see that new data is produced once per second, then when the pipeline is full final results are delivered once per second.  When the first process is done, results continue to be pulled off one per second until the pipeline is drained.
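
For readers without the attachment, the shape of such a pipeline can be sketched roughly as follows. This is a stand-in, not `pipe.py` itself: it uses threads instead of processes and does not use `imap()`, and the names `source`, `stage`, `run_pipeline` and the `SENTINEL` marker are all assumptions made for illustration. Each stage adds 1 to the item so the path through the pipeline is visible in the results.

```python
import queue
import threading
import time

SENTINEL = None  # hypothetical end-of-stream marker


def source(out_q, n_items):
    # Could run at a ferocious rate; the bounded out_q is what slows it down.
    for i in range(n_items):
        out_q.put(i)
    out_q.put(SENTINEL)


def stage(in_q, out_q, delay):
    # Pull from the previous stage, "work" for a while, pass the result on.
    while True:
        item = in_q.get()
        if item is SENTINEL:
            out_q.put(SENTINEL)  # propagate shutdown down the chain
            return
        time.sleep(delay)
        out_q.put(item + 1)


def run_pipeline(n_stages=10, n_items=5, delay=1.0, maxsize=1):
    # One bounded queue between each adjacent pair, plus one for final results.
    queues = [queue.Queue(maxsize) for _ in range(n_stages + 1)]
    threads = [threading.Thread(target=source, args=(queues[0], n_items))]
    for i in range(n_stages):
        threads.append(threading.Thread(
            target=stage, args=(queues[i], queues[i + 1], delay)))
    for t in threads:
        t.start()

    results = []
    while True:
        item = queues[-1].get()  # pull results as fast as they show up
        if item is SENTINEL:
            break
        results.append(item)
    for t in threads:
        t.join()
    return results


if __name__ == "__main__":
    print(run_pipeline(n_stages=3, n_items=4, delay=0.1))
```

Raising `maxsize` changes nothing in the stage logic; it only lets a few items wait between stages.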

The queues here have maximum size 1, just to show that it works.  In practice, a larger bound is usually used, to allow for the fact that processes in real life often take varying amounts of time depending on the data they're currently processing.  Letting a handful of work items queue up keeps processes busy - if processing some piece of data goes especially fast, fine, there may be more in the input queue waiting to go immediately.  How many is "a handful"?  Again, that's something that can't in general be guessed - the programmer has to apply their best judgment based on their deep knowledge of intended application behavior.

Just to be cute ;-) , the code here uses imap() to start the 10 worker processes.  The result of calling imap() is never used; it's just an easy way to apply a Pool to the task.  And it absolutely relies on the fact that imap() consumes its iterable (range(NSTAGES)) as fast as it can, to get the NSTAGES=10 worker processes running.
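
The eager consumption is easy to observe directly. The sketch below is an illustration under assumptions, not part of `pipe.py`: it uses `multiprocessing.pool.ThreadPool` (a subclass of `Pool`) so that local functions can be used without pickling, and the helper name and item counts are made up. It counts how many items `imap()` has pulled from its iterable while the result iterator is never touched.

```python
import time
from multiprocessing.pool import ThreadPool


def count_pulled_before_consuming(n_items=50, timeout=5.0):
    """Hypothetical helper: how many inputs does imap() pull with no consumer?"""
    pulled = []

    def numbered():
        for i in range(n_items):
            pulled.append(i)  # record each item as the pool requests it
            yield i

    def slow_identity(x):
        time.sleep(0.01)      # workers are slower than the feeding thread
        return x

    with ThreadPool(2) as pool:
        _unused = pool.imap(slow_identity, numbered())  # results never iterated
        deadline = time.monotonic() + timeout
        while len(pulled) < n_items and time.monotonic() < deadline:
            time.sleep(0.01)  # give the pool's task-feeding thread time to run
        return len(pulled)


if __name__ == "__main__":
    print(count_pulled_before_consuming())
```

All inputs get pulled even though nobody asks for a result, which is the behavior the trick above depends on.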