
classification
Title: Python multiprocessing.Queue fails to get items in the correct sequence
Type: behavior Stage: resolved
Components: Library (Lib) Versions: Python 3.6
process
Status: closed Resolution: not a bug
Dependencies: Superseder:
Assigned To: Nosy List: Weize, davin, pitrou, tim.peters
Priority: normal Keywords:

Created on 2017-12-20 09:55 by Weize, last changed 2022-04-11 14:58 by admin. This issue is now closed.

Messages (4)
msg308710 - Author: Weize (Weize) Date: 2017-12-20 09:55
I tried to implement a "producer consumer" style design with the multiprocessing module in my project, but I found that multiprocessing.Queue's behavior is not what I expected. It seems the Queue.get method returns the end flag at the end of my queue too early.

I am not experienced at multi-process programming, so I am not sure whether it's a bug or not. To reproduce this, I have simplified my code as follows:

``` Python
import time
import multiprocessing as mp

def worker(task_queue, output_queue):
    while 1:
        i = task_queue.get()
        if i is None:
            print("Process-%d done"%mp.current_process().pid)
            task_queue.task_done()
            break
        output_queue.put(i+1)
        task_queue.task_done()

def outputer(output_queue):
    c = 0 # count how many objects have been received
    while 1:
        j = output_queue.get()
        if j is None:
            print("Process(output)-%d done"%mp.current_process().pid)
            c += 1
            print("outputer get %d objects from the output_queue"%c)
            assert output_queue.empty(), "output queue should be empty here"
            break
        time.sleep(0.0001) # do output here
        c += 1

if __name__ == "__main__":
    task_queue = mp.JoinableQueue()
    #output_queue = mp.JoinableQueue()
    output_queue = mp.Queue()

    workers = [mp.Process(target=worker, args=(task_queue, output_queue))
               for i in range(10)]
    outputer = mp.Process(target=outputer, args=(output_queue,))

    for w in workers:
        w.start()
    outputer.start()

    for i in range(10**6):
        task_queue.put(i)
    for w in workers: # put end flag to task queue
        task_queue.put(None)

    task_queue.join() # wait all tasks done
    print("all tasks done.")

    print("queue size before put end flag: %d"%output_queue.qsize())
    output_queue.put(None) # put end flag to output queue

    print("end")
```

I get the following output:

Process-20923 done
Process-20931 done
Process-20925 done
Process-20930 done
Process-20927 done
Process-20929 done
Process-20928 done
Process-20926 done
Process-20924 done
Process-20932 done
all tasks done.
queue size before put end flag: 914789
end
Process(output)-20933 done
outputer get 90383 objects from the output_queue
Process Process-11:
Traceback (most recent call last):
  File "/home/nanguage/S/miniconda3/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/home/nanguage/S/miniconda3/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "joinablequeue.py", line 27, in outputer
    assert output_queue.empty(), "output queue should be empty here"
AssertionError: output queue should be empty here


I have waited for all the workers to put their output into the output queue by using task_queue.join(), and only then put the end flag into the output queue. But according to outputer's printed information, it got the `None` end flag before other values in the queue. It seems the queue does not return values according to the 'FIFO' rule.
msg308782 - Author: Antoine Pitrou (pitrou) * (Python committer) Date: 2017-12-20 19:14
Hi Weize,

Since this seems to be a support question, I suggest you ask it either on https://stackoverflow.com/ or on the Python users' mailing list https://mail.python.org/mailman/listinfo/python-list
msg308783 - Author: Antoine Pitrou (pitrou) * (Python committer) Date: 2017-12-20 19:15
Closing this issue in the meantime.
msg308788 - Author: Tim Peters (tim.peters) * (Python committer) Date: 2017-12-20 19:31
First thing:  the code uses the global name `outputer` for two different things, as the name of a module function and as the global name given to the Process object running that function.  At least on Windows under Python 3.6.4 that confusion prevents the program from running.  So rename one of them.
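
For example (the replacement name below is arbitrary; only avoiding the collision matters), keep the function `outputer` as it is and give the Process object a distinct name:

    # rename only the Process object; everything else stays the same
    outputer_proc = mp.Process(target=outputer, args=(output_queue,))
    outputer_proc.start()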

Then comes the pain ;-)  A multiprocessing queue is a rather complex object under the covers, and the docs don't really spell out all the details.  Maybe they should.

The docs do vaguely sketch that a "feeder thread" is created in each process using an mp.Queue, which feeds objects you .put() from an internal buffer into an interprocess pipe.  The internal buffer is needed in case you .put() so many objects so fast that feeding them into a pipe directly would cause the OS pipe functions to fail.

And that happens in your case:  you have 10 producers running at full speed overwhelming a single slow consumer.  _Most_ of the data enqueued by output_queue.put(i+1) is sitting in those internal buffers most of the time, and the base interprocess pipe doesn't know anything about them for the duration.

The practical consequence:  while the queue always reflects the order in which objects were .put() within a single process, there's no guarantee about ordering _across_ processes.  Objects are fed from internal buffers into the shared pipe whenever a process's feeder thread happens to wake up and sees that the pipe isn't "too full".  task_queue.task_done() only records that an object has been taken off of task_queue and _given_ to output_queue.put(i+1); most of the time, the latter just sticks i+1 into an internal buffer because output_queue's shared pipe is too full to accept another object.
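
In terms of the worker loop in your code, the sequence inside each worker is roughly this (the comments are the point, not new code):

    i = task_queue.get()        # object removed from task_queue
    output_queue.put(i + 1)     # usually lands only in this process's internal
                                # buffer; the feeder thread sends it to the pipe later
    task_queue.task_done()      # so task_queue.join() can be satisfied well before
                                # i + 1 has actually reached output_queue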

Given that this is how things actually work, what you can do instead is add:

    for w in workers:
        w.join()

somewhere before output_queue.put(None).  A worker process doesn't end until its feeder thread(s) complete feeding all the internal buffer objects into pipes, so .join()'ing a worker is the one "obvious" way to guarantee that all the worker's .put() actions have wholly completed.

In which case, there's no point to using a JoinableQueue at all - .task_done() no longer serves any real purpose in the code then.
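
For illustration, a sketch of the tail of the `__main__` block with the workers joined before the end flag is put (everything else unchanged from your code):

    for i in range(10**6):
        task_queue.put(i)
    for w in workers:       # put end flag to task queue
        task_queue.put(None)

    task_queue.join()       # now optional: the worker joins below already
    for w in workers:       # imply all tasks are done; joining each worker
        w.join()            # waits for its feeder thread to flush its buffer
    print("all tasks done.")

    print("queue size before put end flag: %d" % output_queue.qsize())
    output_queue.put(None)  # the end flag now follows all worker results

    print("end")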
History
Date User Action Args
2022-04-11 14:58:55  admin  set  github: 76563
2017-12-20 19:31:46  tim.peters  set  nosy: + tim.peters; messages: + msg308788
2017-12-20 19:15:14  pitrou  set  status: open -> closed; resolution: not a bug; messages: + msg308783; stage: resolved
2017-12-20 19:14:11  pitrou  set  nosy: + pitrou; messages: + msg308782
2017-12-20 09:55:32  Weize  create