
Author Weize
Recipients Weize, davin
Date 2017-12-20.09:55:32
Message-id <1513763732.34.0.213398074469.issue32382@psf.upfronthosting.co.za>
Content
I tried to implement a "producer consumer" design with the multiprocessing module in my project, but I found that multiprocessing.Queue's behavior is not what I expected: the Queue.get method seems to return the end flag at the end of my queue too early.

I am not experienced in multi-process programming, so I am not sure whether this is a bug or not. To reproduce it, I have simplified my code to the following:

```python
import time
import multiprocessing as mp

def worker(task_queue, output_queue):
    while 1:
        i = task_queue.get()
        if i is None:
            print("Process-%d done"%mp.current_process().pid)
            task_queue.task_done()
            break
        output_queue.put(i+1)
        task_queue.task_done()

def outputer(output_queue):
    c = 0  # counter for how many objects were received
    while 1:
        j = output_queue.get()
        if j is None:
            print("Process(output)-%d done"%mp.current_process().pid)
            c += 1
            print("outputer get %d objects from the output_queue"%c)
            assert output_queue.empty(), "output queue should be empty here"
            break
        time.sleep(0.0001) # do output here
        c += 1

if __name__ == "__main__":
    task_queue = mp.JoinableQueue()
    #output_queue = mp.JoinableQueue()
    output_queue = mp.Queue()

    workers = [mp.Process(target=worker, args=(task_queue, output_queue))
               for i in range(10)]
    outputer = mp.Process(target=outputer, args=(output_queue,))

    for w in workers:
        w.start()
    outputer.start()

    for i in range(10**6):
        task_queue.put(i)
    for w in workers: # put end flag to task queue
        task_queue.put(None)

    task_queue.join() # wait all tasks done
    print("all tasks done.")

    print("queue size before put end flag: %d"%output_queue.qsize())
    output_queue.put(None) # put end flag to output queue

    print("end")
```

I get the following output:

Process-20923 done
Process-20931 done
Process-20925 done
Process-20930 done
Process-20927 done
Process-20929 done
Process-20928 done
Process-20926 done
Process-20924 done
Process-20932 done
all tasks done.
queue size before put end flag: 914789
end
Process(output)-20933 done
outputer get 90383 objects from the output_queue
Process Process-11:
Traceback (most recent call last):
  File "/home/nanguage/S/miniconda3/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/home/nanguage/S/miniconda3/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "joinablequeue.py", line 27, in outputer
    assert output_queue.empty(), "output queue should be empty here"
AssertionError: output queue should be empty here


I waited for all of the workers to put their output into the output queue using task_queue.join(), and only then put the end flag into the output queue, but according to the outputer's printed information, it got the `None` end flag before other values still in the queue. It seems the queue does not return values in 'FIFO' order.
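
My current guess (an assumption based on the multiprocessing docs, not something I have verified in the source) is that `Queue.put` is asynchronous: a background feeder thread transfers objects to the underlying pipe, and `empty()`/`qsize()` are documented as unreliable. A minimal sketch of the effect:

```python
import multiprocessing as mp

if __name__ == "__main__":
    q = mp.Queue()
    q.put("x")        # put() only hands the object to a background feeder thread
    print(q.empty())  # may print True: the item need not have reached the pipe yet
    print(q.get())    # get() blocks until the item is actually readable -> "x"
```

If that is right, `task_queue.task_done()` can run before the corresponding `output_queue.put(i+1)` has actually been flushed into the pipe, so `task_queue.join()` returning would not guarantee that every result is already readable from the output queue.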