
Classification
Title: python3 multiprocessing queue deadlock when using threads and processes at the same time
Type: behavior Stage: resolved
Components: Library (Lib) Versions: Python 3.6

Process
Status: closed Resolution:
Dependencies: Superseder:
Assigned To: Nosy List: beruhan
Priority: normal Keywords:

Created on 2018-12-29 02:50 by beruhan, last changed 2022-04-11 14:59 by admin. This issue is now closed.

Messages (1)
msg332690 - (view) Author: beruhan (beruhan) Date: 2018-12-29 02:50
I used multiple processes to handle a CPU-intensive task. One thread reads data from stdin and puts it on input_queue, worker processes get data from input_queue, process it, and put the result on result_queue, and another thread gets data from result_queue and writes it to stdout. But it sometimes blocks forever. I suspect I am using the multiprocessing Queue inappropriately, but I don't know how to solve it. Can anyone help me?
My code is as follows:

    import multiprocessing
    import sys
    import threading
    import time
    from multiprocessing import Queue


    def write_to_stdout(result_queue: Queue):
        """write queue data to stdout"""
        while True:
            data = result_queue.get()
            if data is StopIteration:
                break
            sys.stdout.write(data)
            sys.stdout.flush()
    
    
    def read_from_stdin(queue):
        """read data from stdin, put it in queue for process handling"""
        try:
            for line in sys.stdin:
                queue.put(line)
        finally:
            queue.put(StopIteration)
    
    
    def process_func(input_queue, result_queue):
        """get data from input_queue,handled,put result into result_queue"""
        try:
            while True:
                data = input_queue.get()
                if data is StopIteration:
                    break
                # CPU-intensive work; time.sleep stands in for it here
                # result = compute_something(data)
                time.sleep(0.1)
                result_queue.put(data)
        finally:
            # re-put the sentinel so every other worker also sees it and exits
            input_queue.put(StopIteration)
    
    
    if __name__ == '__main__':
        # queue for data read from stdin
        input_queue = Queue(1000)
    
        # queue for writing to stdout
        result_queue = Queue(1000)
    
        # thread reading data from stdin
        input_thread = threading.Thread(target=read_from_stdin, args=(input_queue,))
        input_thread.start()
    
        # thread writing results to stdout
        output_thread = threading.Thread(target=write_to_stdout, args=(result_queue,))
        output_thread.start()
    
        processes = []
        cpu_count = multiprocessing.cpu_count()
        # start worker processes to handle the CPU-intensive task
        for i in range(cpu_count):
            proc = multiprocessing.Process(target=process_func, args=(input_queue, result_queue))
            proc.start()
            processes.append(proc)
    
        # join the input thread
        input_thread.join()
    
        # join all worker processes
        for proc in processes:
            proc.join()
    
        # signal the output thread to end
        result_queue.put(StopIteration)
    
        # join the output thread
        output_thread.join()

Test environment:

    Python 3.6
    Ubuntu 16.04 LTS
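
For what it's worth, the title's "thread and process at the same time" points at a documented hazard: with multiprocessing's default "fork" start method on POSIX, only the forking thread is copied into the child, so any lock another thread happens to hold at fork time (including a Queue's internal lock) stays locked forever in the child, and the child's next get() or put() then blocks. In the script above, input_thread may be in the middle of a put() on input_queue at the moment each worker process is forked. Below is a minimal sketch of one workaround, assuming that diagnosis holds: selecting the "spawn" start method via multiprocessing.set_start_method (a real API since Python 3.4; worker and feeder are illustrative names) so children start from a fresh interpreter rather than a fork of a multithreaded parent.

    import multiprocessing
    import threading
    from multiprocessing import Queue


    def worker(q):
        """under "fork", this get() can hang if the parent forked while
        another thread held the queue's internal lock"""
        print(q.get())


    if __name__ == '__main__':
        # start children from a fresh interpreter instead of fork()ing
        # the already-multithreaded parent
        multiprocessing.set_start_method('spawn')

        q = Queue()
        feeder = threading.Thread(target=q.put, args=('hello',))
        feeder.start()

        proc = multiprocessing.Process(target=worker, args=(q,))
        proc.start()
        proc.join()
        feeder.join()

Starting every worker process before starting any threads would sidestep the same hazard without changing the start method.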
History
Date                 User     Action
2022-04-11 14:59:09  admin    set github: 79788
2018-12-29 02:52:08  beruhan  set status: open -> closed; stage: resolved
2018-12-29 02:50:42  beruhan  set messages: + msg332690
2018-12-29 02:50:31  beruhan  set files: - 新建文本文档.txt
2018-12-29 02:50:03  beruhan  create