Author tim.peters
Recipients Weize, davin, pitrou, tim.peters
Date 2017-12-20.19:31:46
Message-id <1513798306.18.0.213398074469.issue32382@psf.upfronthosting.co.za>
Content
First thing:  the code uses the global name `outputer` for two different things, as the name of a module function and as the global name given to the Process object running that function.  At least on Windows under Python 3.6.4 that confusion prevents the program from running.  So rename one of them.
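Here is a minimal sketch of one plausible reason the name clash bites on Windows. It assumes (the message itself doesn't say this) that the failure comes from the spawn start method pickling the Process's target function by its global name. The stand-in `object()` and the helper `pickles_ok` are hypothetical, for illustration only; no process is actually started.

```python
import pickle

def outputer():
    """Stand-in for the module-level function the Process should run."""
    return "output"

target = outputer      # keep a reference to the function object itself
outputer = object()    # stand-in for `outputer = Process(target=outputer)`

def pickles_ok(obj):
    """Return True if obj can be pickled, False otherwise."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError):
        return False
    return True

# Functions are pickled by reference to their global name; that name now
# refers to a different object, so pickling the function is refused.
print(pickles_ok(target))
```

Renaming the process variable (say, to `outputer_proc`) leaves the global name `outputer` bound to the function, which sidesteps the problem.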

Then comes the pain ;-)  A multiprocessing queue is a rather complex object under the covers, and the docs don't really spell out all the details.  Maybe they should.

The docs do vaguely sketch that a "feeder thread" is created in each process using an mp.Queue, which feeds objects you .put() from an internal buffer into an interprocess pipe.  The internal buffer is needed in case you .put() so many objects so fast that feeding them into a pipe directly would cause the OS pipe functions to fail.

And that happens in your case:  you have 10 producers running at full speed overwhelming a single slow consumer.  _Most_ of the data enqueued by output_queue.put(i+1) is sitting in those internal buffers most of the time, and the base interprocess pipe doesn't know anything about them for the duration.

The practical consequence:  while the queue always reflects the order in which objects were .put() within a single process, there's no guarantee about ordering _across_ processes.  Objects are fed from internal buffers into the shared pipe whenever a process's feeder thread happens to wake up and sees that the pipe isn't "too full".  task_queue.task_done() only records that an object has been taken off of task_queue and _given_ to output_queue.put(i+1); most of the time, the latter just sticks i+1 into an internal buffer because output_queue's shared pipe is too full to accept another object.
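The buffering can be seen from inside a single process. This is only a sketch: the function name `buffered_put_demo` and the sizes are made up, and it assumes the total payload (about 2 MB by default) exceeds what the OS pipe can hold at once, which is typical but platform-dependent. Every .put() returns although nothing is reading yet, and the objects still come back in the order this one process put them.

```python
import multiprocessing as mp

def buffered_put_demo(n_items=200, payload_size=10_000):
    """Put many objects before reading any, then drain them in order."""
    q = mp.Queue()
    payload = b"x" * payload_size
    for i in range(n_items):
        # Returns right away: the object lands in the internal buffer and
        # the feeder thread moves it into the pipe when there is room.
        q.put((i, payload))
    # Drain everything so the feeder thread can finish its work.
    received = [q.get(timeout=30)[0] for _ in range(n_items)]
    q.close()
    q.join_thread()
    return received

if __name__ == "__main__":
    order = buffered_put_demo()
    # Within one process, the order of .put() calls is preserved.
    print(order == list(range(200)))
```

With several producer processes, each one has its own buffer and feeder thread like this, which is why nothing ties their outputs together in time.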

Given that this is how things actually work, what you can do instead is add:

    for w in workers:
        w.join()

somewhere before output_queue.put(None).  A worker process doesn't end until its feeder thread(s) complete feeding all the internal buffer objects into pipes, so .join()'ing a worker is the one "obvious" way to guarantee that all the worker's .put() actions have wholly completed.

In which case, there's no point to using a JoinableQueue at all - .task_done() no longer serves any real purpose in the code then.
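Putting the pieces together, a stripped-down sketch of the suggested shape might look like the following. It is not the original program: the task queue is left out, and `N_WORKERS`, `ITEMS_PER_WORKER`, `drain`, `result_queue` and `outputer_proc` are hypothetical names added for illustration. The consumer is started before the workers are joined, so the workers' feeder threads always have a reader to drain the pipe.

```python
import multiprocessing as mp

N_WORKERS = 4            # hypothetical sizes, for illustration only
ITEMS_PER_WORKER = 1000

def worker(worker_id, output_queue):
    # Each put() may at first only reach this process's internal buffer.
    for i in range(ITEMS_PER_WORKER):
        output_queue.put((worker_id, i))

def drain(output_queue):
    """Collect items until the None sentinel shows up."""
    items = []
    while True:
        item = output_queue.get()
        if item is None:
            return items
        items.append(item)

def outputer(output_queue, result_queue):
    # The single consumer: report how many items preceded the sentinel.
    result_queue.put(len(drain(output_queue)))

def main():
    output_queue = mp.Queue()   # a plain Queue; no task_done() needed
    result_queue = mp.Queue()
    outputer_proc = mp.Process(target=outputer,
                               args=(output_queue, result_queue))
    outputer_proc.start()
    workers = [mp.Process(target=worker, args=(n, output_queue))
               for n in range(N_WORKERS)]
    for w in workers:
        w.start()
    # A worker only exits once its feeder thread has flushed its buffer.
    for w in workers:
        w.join()
    output_queue.put(None)      # the sentinel now trails all worker data
    count = result_queue.get()
    outputer_proc.join()
    return count

if __name__ == "__main__":
    print(main() == N_WORKERS * ITEMS_PER_WORKER)
```

If the joins were moved after output_queue.put(None), the sentinel could reach the pipe ahead of data still sitting in a worker's buffer, and the count would come up short some of the time.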