This issue tracker has been migrated to GitHub, and is currently read-only.
For more information, see the GitHub FAQs in Python's Developer Guide.

classification
Title: python3 multiprocessing queue deadlock when using threads and processes at the same time
Type: behavior
Stage:
Components: Library (Lib)
Versions: Python 3.6

process
Status: open
Resolution:
Dependencies:
Superseder:
Assigned To:
Nosy List: beruhan, davin, dukewrz, eamanu, pitrou, tim.peters
Priority: normal
Keywords:

Created on 2018-12-29 02:53 by beruhan, last changed 2022-04-11 14:59 by admin.

Messages (8)
msg332691 - (view) Author: beruhan (beruhan) Date: 2018-12-29 02:53
I used multiple processes to handle a CPU-intensive task. One thread reads data from stdin and puts it on an input queue, one thread gets data from an output queue and writes it to stdout, and multiple processes get data from the input queue, handle it, and put the result on the output queue. But it sometimes blocks forever. I suspect I'm using the multiprocessing Queue inappropriately, but I don't know how to solve it. Can anyone help me?
My code is as follows:

    import multiprocessing
    import sys
    import threading
    import time
    from multiprocessing import Queue


    def write_to_stdout(result_queue: Queue):
        """write queue data to stdout"""
        while True:
            data = result_queue.get()
            if data is StopIteration:
                break
            sys.stdout.write(data)
            sys.stdout.flush()
    
    
    def read_from_stdin(queue):
        """read data from stdin, put it in queue for process handling"""
        try:
            for line in sys.stdin:
                queue.put(line)
        finally:
            queue.put(StopIteration)
    
    
    def process_func(input_queue, result_queue):
        """get data from input_queue,handled,put result into result_queue"""
        try:
            while True:
                data = input_queue.get()
                if data is StopIteration:
                    break
                # CPU-intensive task; use time.sleep as a stand-in
                # result = compute_something(data)
                time.sleep(0.1)
                result_queue.put(data)
        finally:
            # re-queue the sentinel so the next worker also stops
            input_queue.put(StopIteration)
    
    
    if __name__ == '__main__':
        # queue for lines read from stdin
        input_queue = Queue(1000)
    
        # queue for writing to stdout
        result_queue = Queue(1000)
    
        # thread reading data from stdin
        input_thread = threading.Thread(target=read_from_stdin, args=(input_queue,))
        input_thread.start()
    
        # thread writing results to stdout
        output_thread = threading.Thread(target=write_to_stdout, args=(result_queue,))
        output_thread.start()
    
        processes = []
        cpu_count = multiprocessing.cpu_count()
        # start multiple processes to handle the CPU-intensive task
        for i in range(cpu_count):
            proc = multiprocessing.Process(target=process_func, args=(input_queue, result_queue))
            proc.start()
            processes.append(proc)
    
        # join the input thread
        input_thread.join()

        # join all worker processes
        for proc in processes:
            proc.join()

        # ensure the output thread ends
        result_queue.put(StopIteration)

        # join the output thread
        output_thread.join()

Test environment:

    python3.6.5
    ubuntu16.04
msg332694 - (view) Author: Emmanuel Arias (eamanu) * Date: 2018-12-29 03:07
Hi

>    def write_to_stdout(result_queue: Queue):

I think you need to add a sleep here. IMO this is blocking everything.
msg332695 - (view) Author: Emmanuel Arias (eamanu) * Date: 2018-12-29 03:10
> data = result_queue.get()

And this call is blocking.
msg332706 - (view) Author: beruhan (beruhan) Date: 2018-12-29 08:07
debug message as follows:
[DEBUG/MainProcess] created semlock with handle 140059486064640
[DEBUG/MainProcess] created semlock with handle 140059486060544
[DEBUG/MainProcess] created semlock with handle 140059486056448
[DEBUG/MainProcess] Queue._after_fork()
[DEBUG/MainProcess] created semlock with handle 140059486052352
[DEBUG/MainProcess] created semlock with handle 140059486048256
[DEBUG/MainProcess] created semlock with handle 140059486044160
[DEBUG/MainProcess] Queue._after_fork()
[DEBUG/MainProcess] Queue._start_thread()
[DEBUG/MainProcess] doing self._thread.start()
[DEBUG/MainProcess] starting thread to feed data to pipe
[DEBUG/MainProcess] ... done self._thread.start()
^CTraceback (most recent call last):
  File "main_simple.py", line 76, in <module>
    proc.join()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 124, in join
    res = self._popen.wait(timeout)
  File "/usr/lib/python3.6/multiprocessing/popen_fork.py", line 50, in wait
    return self.poll(os.WNOHANG if timeout == 0.0 else 0)
  File "/usr/lib/python3.6/multiprocessing/popen_fork.py", line 28, in poll
    pid, sts = os.waitpid(self.pid, flag)
KeyboardInterrupt
^CException ignored in: <module 'threading' from '/usr/lib/python3.6/threading.py'>
Traceback (most recent call last):
  File "/usr/lib/python3.6/threading.py", line 1294, in _shutdown
    t.join()
  File "/usr/lib/python3.6/threading.py", line 1056, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] telling queue thread to quit
[INFO/MainProcess] calling join() for process Process-3
^CError in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/popen_fork.py", line 28, in poll
    pid, sts = os.waitpid(self.pid, flag)
msg332710 - (view) Author: Antoine Pitrou (pitrou) * (Python committer) Date: 2018-12-29 09:53
Your input_thread puts StopIteration on the queue once.  But there are several worker processes popping from that queue, and only one of them will see the StopIteration.  So I'm not surprised the other worker processes would be stuck waiting in their loop.
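A common remedy for that (a sketch, not from the original report) is to have the reader enqueue one sentinel per worker, dropping the re-queueing in the workers:

    # Sketch: one StopIteration per worker.  The names queue and n_workers
    # mirror the original script; n_workers would be cpu_count there.
    def read_from_stdin(queue, n_workers):
        try:
            for line in sys.stdin:
                queue.put(line)
        finally:
            # each worker consumes exactly one sentinel and exits
            for _ in range(n_workers):
                queue.put(StopIteration)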
msg332723 - (view) Author: Tim Peters (tim.peters) * (Python committer) Date: 2018-12-29 16:00
Antoine, alas, it's subtler than that.  The worker process (process_func()) puts _another_ `StopIteration` on the input queue in its `finally` clause.  So the first worker process to finish adds back a sentinel for the next worker to see, and so on.  At the end, one StopIteration is left on input_queue (added by the last worker to finish).

Everything shuts down cleanly for me on 64-bit Win10 Py 3.7.2 no matter what input I tried, so I can't reproduce.

The OP really needs to identify _where_ it's hanging, and when (after all input has been read?  somewhere "random in the middle"? ...), and if at all possible supply an input on which it does hang.

The OP should also try 3.7.2.
msg332850 - (view) Author: beruhan (beruhan) Date: 2019-01-02 01:22
I also tested on Windows 10; it worked normally. But when I run it under Ubuntu 16.04, it blocks. My Python version is 3.6.5.
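Worth noting: Windows always uses the "spawn" start method, while Python 3.6 on Linux defaults to "fork", so the two platforms do not exercise the same code path. A quick way to check whether the hang is fork-specific (a sketch; the rest of the script is unchanged):

    import multiprocessing

    if __name__ == '__main__':
        # Linux defaults to 'fork'; Windows only supports 'spawn'.
        # Forcing 'spawn' makes the two platforms directly comparable.
        multiprocessing.set_start_method('spawn')
        # ... start the threads and worker processes as in the original script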
msg379101 - (view) Author: ruozeng.w (dukewrz) Date: 2020-10-20 08:12
I ran into the same issue.
I'm using Ansible to deliver thousands of remote tasks. One TaskQueueManager starts multiple worker processes; each worker process executes a remote task and sends the task result data to the TaskQueueManager through a multiprocessing.Queue, so there is 1 consumer and thousands of producers (every producer should exit after sending its task result). In high-concurrency scenarios this MAY happen, and many worker processes never exit.

Example:
sending an Ansible ping task (which executes very fast and returns a very short result) to 20000 targets.

Environment
==============================
python 3.6.5
oel 7.4

gdb debug info of worker process
==============================
(gdb) py-list
1067            # If the lock is acquired, the C code is done, and self._stop() is
1068            # called.  That sets ._is_stopped to True, and ._tstate_lock to None.
1069            lock = self._tstate_lock
1070            if lock is None:  # already determined that the C code is done
1071                assert self._is_stopped
>1072            elif lock.acquire(block, timeout):
1073                lock.release()
1074                self._stop()
1075    
1076        @property
1077        def name(self):
(gdb) py-bt
Traceback (most recent call first):
  <built-in method acquire of _thread.lock object at remote 0x7f94b2190918>
  File "/usr/local/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
  File "/usr/local/lib/python3.6/threading.py", line 1056, in join
    self._wait_for_tstate_lock()
  File "/usr/local/lib/python3.6/multiprocessing/queues.py", line 191, in _finalize_join
    thread.join()
  File "/usr/local/lib/python3.6/multiprocessing/util.py", line 186, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.6/multiprocessing/util.py", line 262, in _run_finalizers
    finalizer()
  File "/usr/local/lib/python3.6/multiprocessing/util.py", line 322, in _exit_function
    _run_finalizers()
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 261, in _bootstrap
    util._exit_function()
  File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", line 73, in _launch
    code = process_obj._bootstrap()
  File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/usr/local/lib/python3.6/multiprocessing/context.py", line 277, in _Popen
    return Popen(process_obj)
  File "/usr/local/lib/python3.6/multiprocessing/context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 105, in start
    self._popen = self._Popen(self)
  File "/usr/local/lib/python3.6/site-packages/ansible-2.7.0.post0-py3.6.egg/ansible/plugins/strategy/__init__.py", line 328, in _queue_task
    worker_prc.start()
  File "/usr/local/lib/python3.6/site-packages/ansible-2.7.0.post0-py3.6.egg/ansible/plugins/strategy/telinear.py", line 301, in run
    self._queue_task(host, task, task_vars, play_context)
  File "/usr/local/lib/python3.6/site-packages/ansible-2.7.0.post0-py3.6.egg/ansible/executor/task_queue_manager.py", line 308, in run
    play_return = strategy.run(iterator, play_context)
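
The py-bt above shows the worker stuck in Queue._finalize_join, waiting for the queue's feeder thread to flush buffered data into the pipe. This matches the warning in the multiprocessing docs ("Joining processes that use queues"): a process that has put items on a queue will not terminate until they are flushed, which can never happen if the pipe is full and nothing is reading it. A minimal sketch of the safe ordering, with illustrative names:

    import multiprocessing

    def producer(q):
        # a large payload forces the feeder thread to block on the pipe
        # until someone reads from the other end
        q.put('x' * (1 << 20))

    if __name__ == '__main__':
        q = multiprocessing.Queue()
        p = multiprocessing.Process(target=producer, args=(q,))
        p.start()
        data = q.get()  # drain BEFORE join(), or the child may never exit
        p.join()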



Can anyone help? Thanks!
History
Date                 User        Action  Args
2022-04-11 14:59:09  admin       set     github: 79789
2020-10-20 08:13:00  dukewrz     set     nosy: + dukewrz; messages: + msg379101
2019-01-02 01:22:00  beruhan     set     messages: + msg332850
2018-12-29 16:00:39  tim.peters  set     nosy: + tim.peters; messages: + msg332723
2018-12-29 09:53:58  pitrou      set     messages: + msg332710
2018-12-29 08:07:06  beruhan     set     messages: + msg332706
2018-12-29 03:10:03  eamanu      set     messages: + msg332695
2018-12-29 03:07:46  eamanu      set     nosy: + eamanu; messages: + msg332694
2018-12-29 02:54:13  beruhan     set     type: behavior
2018-12-29 02:53:42  beruhan     create