Message308710
I am trying to implement a producer-consumer design with the multiprocessing module in my project, but multiprocessing.Queue does not behave as I expected. It seems that Queue.get returns the end flag, which I put at the end of my queue, too early.
I am not experienced at multi-process programming, so I am not sure whether this is a bug or not. To reproduce it, I have simplified my code as follows:
``` Python
import time
import multiprocessing as mp


def worker(task_queue, output_queue):
    while 1:
        i = task_queue.get()
        if i is None:
            print("Process-%d done" % mp.current_process().pid)
            task_queue.task_done()
            break
        output_queue.put(i + 1)
        task_queue.task_done()


def outputer(output_queue):
    c = 0  # number of objects received so far
    while 1:
        j = output_queue.get()
        if j is None:
            print("Process(output)-%d done" % mp.current_process().pid)
            c += 1
            print("outputer get %d objects from the output_queue" % c)
            assert output_queue.empty(), "output queue should be empty here"
            break
        time.sleep(0.0001)  # do output here
        c += 1


if __name__ == "__main__":
    task_queue = mp.JoinableQueue()
    #output_queue = mp.JoinableQueue()
    output_queue = mp.Queue()

    workers = [mp.Process(target=worker, args=(task_queue, output_queue))
               for i in range(10)]
    # Use a separate name so the outputer function is not shadowed.
    outputer_process = mp.Process(target=outputer, args=(output_queue,))

    for w in workers:
        w.start()
    outputer_process.start()

    for i in range(10**6):
        task_queue.put(i)

    for w in workers:  # put one end flag per worker on the task queue
        task_queue.put(None)

    task_queue.join()  # wait until all tasks are done
    print("all tasks done.")
    print("queue size before put end flag: %d" % output_queue.qsize())
    output_queue.put(None)  # put end flag on the output queue
    print("end")
```
It produces the following output:
Process-20923 done
Process-20931 done
Process-20925 done
Process-20930 done
Process-20927 done
Process-20929 done
Process-20928 done
Process-20926 done
Process-20924 done
Process-20932 done
all tasks done.
queue size before put end flag: 914789
end
Process(output)-20933 done
outputer get 90383 objects from the output_queue
Process Process-11:
Traceback (most recent call last):
  File "/home/nanguage/S/miniconda3/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/home/nanguage/S/miniconda3/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "joinablequeue.py", line 27, in outputer
    assert output_queue.empty(), "output queue should be empty here"
AssertionError: output queue should be empty here
I wait for all workers to put their output into the output queue by calling task_queue.join(), and only then put the end flag into the output queue. However, according to the information printed by outputer, it gets the `None` end flag before the other values in the queue. It seems the queue does not return values according to the FIFO rule.
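
The behaviour described above is consistent with a note in the multiprocessing documentation: objects put on a queue are flushed to the underlying pipe by a background thread, and objects enqueued by different processes may be received out of order, while objects enqueued by the same process keep their relative order. Assuming that is what happens here, a minimal sketch of one possible workaround is for each worker to forward its own end flag to the output queue, and for the consumer to stop only after seeing one flag per worker. The names `collector`, `count_queue`, and `run_pipeline` are hypothetical and not part of the original script.

```python
import multiprocessing as mp


def worker(task_queue, output_queue):
    """Process tasks until the end flag arrives, then pass an end flag on."""
    while True:
        i = task_queue.get()
        if i is None:
            # Objects put by one process keep their relative order, so this
            # flag is received after all results produced by this worker.
            output_queue.put(None)
            break
        output_queue.put(i + 1)


def collector(output_queue, n_workers, count_queue):
    """Count results until one end flag per worker has been received."""
    count = 0
    finished = 0
    while finished < n_workers:
        j = output_queue.get()
        if j is None:
            finished += 1
        else:
            count += 1  # real output handling would go here
    count_queue.put(count)


def run_pipeline(n_tasks, n_workers):
    """Hypothetical driver: returns how many results the collector received."""
    task_queue = mp.Queue()
    output_queue = mp.Queue()
    count_queue = mp.Queue()

    workers = [mp.Process(target=worker, args=(task_queue, output_queue))
               for _ in range(n_workers)]
    collector_process = mp.Process(
        target=collector, args=(output_queue, n_workers, count_queue))

    for w in workers:
        w.start()
    collector_process.start()

    for i in range(n_tasks):
        task_queue.put(i)
    for _ in workers:  # one end flag per worker
        task_queue.put(None)

    # Read the result before joining, so no process blocks on unflushed data.
    count = count_queue.get()
    for w in workers:
        w.join()
    collector_process.join()
    return count


if __name__ == "__main__":
    print("collector received %d results" % run_pipeline(1000, 4))
```

Because each worker's end flag travels through the same queue as its own results, this sketch does not rely on `task_queue.join()` for ordering, so plain `mp.Queue` objects are enough.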
| Date | User | Action | Args |
|---|---|---|---|
| 2017-12-20 09:55:32 | Weize | set | recipients: + Weize, davin |
| 2017-12-20 09:55:32 | Weize | set | messageid: <1513763732.34.0.213398074469.issue32382@psf.upfronthosting.co.za> |
| 2017-12-20 09:55:32 | Weize | link | issue32382 messages |
| 2017-12-20 09:55:32 | Weize | create | |