
Author beruhan
Recipients beruhan
Date 2018-12-29.02:50:42
Message-id <1546051842.35.0.599440408676.issue35607@roundup.psfhosted.org>
In-reply-to
Content
I use multiple processes to handle a CPU-intensive task. One thread reads data from stdin and puts it into an input queue, another thread gets data from an output queue (result_queue in the code) and writes it to stdout, and several worker processes get data from the input queue, process it, and put the results into the output queue. Sometimes the program blocks forever. I suspect that I am using the multiprocessing Queue inappropriately, but I don't know how to fix it. Can anyone help me?
My code is as follows:

    import multiprocessing
    import sys
    import threading
    import time
    from multiprocessing import Queue


    def write_to_stdout(result_queue: Queue):
        """write queue data to stdout"""
        while True:
            data = result_queue.get()
            if data is StopIteration:
                break
            sys.stdout.write(data)
            sys.stdout.flush()
    
    
    def read_from_stdin(queue):
        """read data from stdin, put it in queue for process handling"""
        try:
            for line in sys.stdin:
                queue.put(line)
        finally:
            queue.put(StopIteration)
    
    
    def process_func(input_queue, result_queue):
        """get data from input_queue, process it, put the result into result_queue"""
        try:
            while True:
                data = input_queue.get()
                if data is StopIteration:
                    break
                # CPU-intensive task; time.sleep stands in for it here
                # result = compute_something(data)
                time.sleep(0.1)
                result_queue.put(data)
        finally:
            # pass the sentinel on so that every other process also ends
            input_queue.put(StopIteration)
    
    
    if __name__ == '__main__':
        # queue for data read from stdin
        input_queue = Queue(1000)
    
        # queue for writing to stdout
        result_queue = Queue(1000)
    
        # thread reading data from stdin
        input_thread = threading.Thread(target=read_from_stdin, args=(input_queue,))
        input_thread.start()
    
        # thread writing results to stdout
        output_thread = threading.Thread(target=write_to_stdout, args=(result_queue,))
        output_thread.start()
    
        processes = []
        cpu_count = multiprocessing.cpu_count()
        # start one worker process per CPU to handle the CPU-intensive task
        for i in range(cpu_count):
            proc = multiprocessing.Process(target=process_func, args=(input_queue, result_queue))
            proc.start()
            processes.append(proc)
    
        # join the input thread
        input_thread.join()
    
        # join all worker processes
        for proc in processes:
            proc.join()
    
        # tell the output thread to stop
        result_queue.put(StopIteration)
    
        # join the output thread
        output_thread.join()

Test environment:

    Python 3.6
    Ubuntu 16.04 LTS
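
A possible rearrangement of the script above is sketched here. It is only a sketch and has not been confirmed to remove the hang described in this report. It rests on two assumptions: that forking worker processes while the reader and writer threads are already running can leave a child with a copy of a lock that is never released, and that one sentinel per worker is easier to reason about than a single sentinel that each worker puts back. The names `SENTINEL`, `worker`, `reader`, `writer` and `run_pipeline` are made up for illustration, and the `fork` start method is requested explicitly because it is the default on Linux under Python 3.6.

```python
import multiprocessing
import threading

# Hypothetical end-of-stream marker; real input lines are strings, never None.
SENTINEL = None


def worker(input_queue, result_queue):
    """Consume items until a sentinel arrives, then exit."""
    while True:
        data = input_queue.get()
        if data is SENTINEL:
            break
        # Placeholder for the CPU-intensive step from the report.
        result_queue.put(data)


def reader(source, input_queue, worker_count):
    """Feed every line to the workers, then send one sentinel per worker."""
    try:
        for line in source:
            input_queue.put(line)
    finally:
        for _ in range(worker_count):
            input_queue.put(SENTINEL)


def writer(result_queue, sink):
    """Write results to the sink until the sentinel arrives."""
    while True:
        data = result_queue.get()
        if data is SENTINEL:
            break
        sink.write(data)
        sink.flush()


def run_pipeline(source, sink, worker_count=2):
    # Assumption: Unix only, matching the Ubuntu environment in the report.
    ctx = multiprocessing.get_context("fork")
    input_queue = ctx.Queue(1000)
    result_queue = ctx.Queue(1000)

    # Start the worker processes first, before this function starts any thread.
    workers = [
        ctx.Process(target=worker, args=(input_queue, result_queue))
        for _ in range(worker_count)
    ]
    for proc in workers:
        proc.start()

    in_thread = threading.Thread(
        target=reader, args=(source, input_queue, worker_count)
    )
    out_thread = threading.Thread(target=writer, args=(result_queue, sink))
    in_thread.start()
    out_thread.start()

    # The writer thread keeps draining result_queue while the workers finish.
    in_thread.join()
    for proc in workers:
        proc.join()

    # Every worker has exited, so this sentinel is the last item in the queue.
    result_queue.put(SENTINEL)
    out_thread.join()
```

To mirror the original script, `run_pipeline(sys.stdin, sys.stdout, multiprocessing.cpu_count())` could be called under an `if __name__ == '__main__':` guard. Output order is not preserved across workers, as in the original.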
History
Date User Action Args
2018-12-29 02:50:44 beruhan set recipients: + beruhan
2018-12-29 02:50:42 beruhan set messageid: <1546051842.35.0.599440408676.issue35607@roundup.psfhosted.org>
2018-12-29 02:50:42 beruhan link issue35607 messages
2018-12-29 02:50:42 beruhan create