Issue 35608
This issue tracker has been migrated to GitHub,
and is currently read-only.
For more information,
see the GitHub FAQs in Python's Developer Guide.
Created on 2018-12-29 02:53 by beruhan, last changed 2022-04-11 14:59 by admin.
Messages (8) | |||
---|---|---|---|
msg332691 - (view) | Author: beruhan (beruhan) | Date: 2018-12-29 02:53 | |
I used multiple processes to handle a CPU-intensive task. One thread reads data from stdin and puts it on input_queue; another thread gets data from output_queue and writes it to stdout; multiple processes get data from input_queue, handle it, and put the result on output_queue. But it sometimes blocks forever. I suspect I am using the multiprocessing Queue inappropriately, but I don't know how to solve it. Can anyone help me? My code is as follows:

```python
import multiprocessing
import sys
import threading
import time
from multiprocessing import Queue


def write_to_stdout(result_queue: Queue):
    """write queue data to stdout"""
    while True:
        data = result_queue.get()
        if data is StopIteration:
            break
        sys.stdout.write(data)
        sys.stdout.flush()


def read_from_stdin(queue):
    """read data from stdin, put it in queue for process handling"""
    try:
        for line in sys.stdin:
            queue.put(line)
    finally:
        queue.put(StopIteration)


def process_func(input_queue, result_queue):
    """get data from input_queue, handle it, put result into result_queue"""
    try:
        while True:
            data = input_queue.get()
            if data is StopIteration:
                break
            # cpu intensive task, use time.sleep instead
            # result = compute_something(data)
            time.sleep(0.1)
            result_queue.put(data)
    finally:
        # ensure every process ends
        input_queue.put(StopIteration)


if __name__ == '__main__':
    # queue for data read from stdin
    input_queue = Queue(1000)
    # queue for data to write to stdout
    result_queue = Queue(1000)

    # thread reading data from stdin
    input_thread = threading.Thread(target=read_from_stdin, args=(input_queue,))
    input_thread.start()

    # thread writing data to stdout
    output_thread = threading.Thread(target=write_to_stdout, args=(result_queue,))
    output_thread.start()

    processes = []
    cpu_count = multiprocessing.cpu_count()
    # start multiple processes to handle the cpu intensive task
    for i in range(cpu_count):
        proc = multiprocessing.Process(target=process_func, args=(input_queue, result_queue))
        proc.start()
        processes.append(proc)

    # join input thread
    input_thread.join()
    # join all task processes
    for proc in processes:
        proc.join()
    # ensure output thread ends
    result_queue.put(StopIteration)
    # join output thread
    output_thread.join()
```

test environment: python3.6.5 ubuntu16.04 |
|||
msg332694 - (view) | Author: Emmanuel Arias (eamanu) * | Date: 2018-12-29 03:07 | |
Hi,

> def write_to_stdout(result_queue: Queue):

I think that you have to add a sleep here. IMO this is blocking everything. |
|||
msg332695 - (view) | Author: Emmanuel Arias (eamanu) * | Date: 2018-12-29 03:10 | |
> data = result_queue.get()

And this is blocked. |
|||
msg332706 - (view) | Author: beruhan (beruhan) | Date: 2018-12-29 08:07 | |
debug message as follows:

```
[DEBUG/MainProcess] created semlock with handle 140059486064640
[DEBUG/MainProcess] created semlock with handle 140059486060544
[DEBUG/MainProcess] created semlock with handle 140059486056448
[DEBUG/MainProcess] Queue._after_fork()
[DEBUG/MainProcess] created semlock with handle 140059486052352
[DEBUG/MainProcess] created semlock with handle 140059486048256
[DEBUG/MainProcess] created semlock with handle 140059486044160
[DEBUG/MainProcess] Queue._after_fork()
[DEBUG/MainProcess] Queue._start_thread()
[DEBUG/MainProcess] doing self._thread.start()
[DEBUG/MainProcess] starting thread to feed data to pipe
[DEBUG/MainProcess] ... done self._thread.start()
^CTraceback (most recent call last):
  File "main_simple.py", line 76, in <module>
    proc.join()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 124, in join
    res = self._popen.wait(timeout)
  File "/usr/lib/python3.6/multiprocessing/popen_fork.py", line 50, in wait
    return self.poll(os.WNOHANG if timeout == 0.0 else 0)
  File "/usr/lib/python3.6/multiprocessing/popen_fork.py", line 28, in poll
    pid, sts = os.waitpid(self.pid, flag)
KeyboardInterrupt
^CException ignored in: <module 'threading' from '/usr/lib/python3.6/threading.py'>
Traceback (most recent call last):
  File "/usr/lib/python3.6/threading.py", line 1294, in _shutdown
    t.join()
  File "/usr/lib/python3.6/threading.py", line 1056, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] telling queue thread to quit
[INFO/MainProcess] calling join() for process Process-3
^CError in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/popen_fork.py", line 28, in poll
    pid, sts = os.waitpid(self.pid, flag)
```
|
|||
msg332710 - (view) | Author: Antoine Pitrou (pitrou) * | Date: 2018-12-29 09:53 | |
Your input_thread puts StopIteration once into the queue. But there are several worker processes popping from that queue, and only one of them will see the StopIteration. So I'm not surprised the other worker processes would be stuck waiting in their loop. |
|||
msg332723 - (view) | Author: Tim Peters (tim.peters) * | Date: 2018-12-29 16:00 | |
Antoine, alas, it's subtler than that. The worker process (process_func()) puts _another_ `StopIteration` on the input queue in its `finally` clause. So the first worker process to finish adds back a sentinel for the next worker to see, and so on. At the end, one StopIteration is left on input_queue (added by the last worker to finish). Everything shuts down cleanly for me on 64-bit Win10 Py 3.7.2 no matter what input I tried, so I can't reproduce. The OP really needs to identify _where_ it's hanging, and when (after all input has been read? somewhere "random in the middle"? ...), and if at all possible supply an input on which it does hang. The OP should also try 3.7.2. |
|||
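As Tim describes above, the script's shutdown works by each finishing worker re-putting `StopIteration` for the next one, leaving one sentinel on the queue at the end. A more conventional pattern (not proposed in this thread; names here are illustrative) is to enqueue one sentinel per worker up front, so no worker ever has to re-put anything. A minimal self-contained sketch:

```python
import multiprocessing as mp

SENTINEL = None  # any unique marker works; the original script uses StopIteration


def worker(inq, outq):
    # Consume until this worker's own sentinel arrives; no re-put needed.
    while True:
        item = inq.get()
        if item is SENTINEL:
            break
        outq.put(item * 2)  # stand-in for the real CPU-intensive work


def run(items, nworkers=4):
    inq, outq = mp.Queue(), mp.Queue()
    procs = [mp.Process(target=worker, args=(inq, outq)) for _ in range(nworkers)]
    for p in procs:
        p.start()
    for item in items:
        inq.put(item)
    # One sentinel per worker, enqueued after all real items,
    # so every worker sees exactly one and exits.
    for _ in range(nworkers):
        inq.put(SENTINEL)
    # Drain the output queue BEFORE joining the workers, so their
    # queue feeder threads can always flush.
    results = [outq.get() for _ in items]
    for p in procs:
        p.join()
    return results


if __name__ == '__main__':
    print(sorted(run([1, 2, 3, 4, 5])))
```

With this scheme the output thread of the original script can still be stopped with a single sentinel after all workers have been joined, exactly as the original `__main__` block already does.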
msg332850 - (view) | Author: beruhan (beruhan) | Date: 2019-01-02 01:22 | |
I also tested on Windows 10; it worked normally. But when I run it under Ubuntu 16.04, it blocks. My Python version is 3.6.5. |
|||
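A works-on-Windows-10, hangs-on-Ubuntu difference like the one reported above often points at the process start method: Windows always uses 'spawn', while Linux on 3.6 defaults to 'fork'. As a diagnostic step (an assumption on my part, not a fix proposed in this thread), the same code can be run under 'spawn' on Linux via a context:

```python
import multiprocessing as mp


def square(q, x):
    # target must be defined at module level so 'spawn' can import it
    q.put(x * x)


if __name__ == '__main__':
    # A context pins the start method without mutating global state;
    # 'spawn' is Windows' only option and opt-in on Linux.
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=square, args=(q, 3))
    p.start()
    print(q.get())
    p.join()
```

If the hang disappears under 'spawn', that implicates state inherited through fork (e.g. locks held at fork time) rather than the queue logic itself.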
msg379101 - (view) | Author: ruozeng.w (dukewrz) | Date: 2020-10-20 08:12 | |
I ran into the same issue. I'm using Ansible to deliver thousands of remote tasks. One TaskQueueManager starts multiple worker processes; each worker process executes a remote task and sends its task result data to the TaskQueueManager through a multiprocessing.Queue, so there is 1 consumer and thousands of producers (every producer should exit after sending its task result). In high-concurrency scenarios this MAY happen, and many worker processes will never exit. Example: sending an ansible ping task (which executes very fast and returns a very short result) to 20000 targets.

Environment
==============================
python 3.6.5
oel 7.4

gdb debug info of worker process
==============================
```
(gdb) py-list
1067        # If the lock is acquired, the C code is done, and self._stop() is
1068        # called.  That sets ._is_stopped to True, and ._tstate_lock to None.
1069        lock = self._tstate_lock
1070        if lock is None:  # already determined that the C code is done
1071            assert self._is_stopped
>1072       elif lock.acquire(block, timeout):
1073            lock.release()
1074            self._stop()
1075
1076    @property
1077    def name(self):
(gdb) py-bt
Traceback (most recent call first):
  <built-in method acquire of _thread.lock object at remote 0x7f94b2190918>
  File "/usr/local/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
  File "/usr/local/lib/python3.6/threading.py", line 1056, in join
    self._wait_for_tstate_lock()
  File "/usr/local/lib/python3.6/multiprocessing/queues.py", line 191, in _finalize_join
    thread.join()
  File "/usr/local/lib/python3.6/multiprocessing/util.py", line 186, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.6/multiprocessing/util.py", line 262, in _run_finalizers
    finalizer()
  File "/usr/local/lib/python3.6/multiprocessing/util.py", line 322, in _exit_function
    _run_finalizers()
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 261, in _bootstrap
    util._exit_function()
  File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", line 73, in _launch
    code = process_obj._bootstrap()
  File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/usr/local/lib/python3.6/multiprocessing/context.py", line 277, in _Popen
    return Popen(process_obj)
  File "/usr/local/lib/python3.6/multiprocessing/context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 105, in start
    self._popen = self._Popen(self)
  File "/usr/local/lib/python3.6/site-packages/ansible-2.7.0.post0-py3.6.egg/ansible/plugins/strategy/__init__.py", line 328, in _queue_task
    worker_prc.start()
  File "/usr/local/lib/python3.6/site-packages/ansible-2.7.0.post0-py3.6.egg/ansible/plugins/strategy/telinear.py", line 301, in run
    self._queue_task(host, task, task_vars, play_context)
  File "/usr/local/lib/python3.6/site-packages/ansible-2.7.0.post0-py3.6.egg/ansible/executor/task_queue_manager.py", line 308, in run
    play_return = strategy.run(iterator, play_context)
```

Anyone can help? Thx! |
History | |||
---|---|---|---|
Date | User | Action | Args |
2022-04-11 14:59:09 | admin | set | github: 79789 |
2020-10-20 08:13:00 | dukewrz | set | nosy: + dukewrz messages: + msg379101 |
2019-01-02 01:22:00 | beruhan | set | messages: + msg332850 |
2018-12-29 16:00:39 | tim.peters | set | nosy: + tim.peters messages: + msg332723 |
2018-12-29 09:53:58 | pitrou | set | messages: + msg332710 |
2018-12-29 08:07:06 | beruhan | set | messages: + msg332706 |
2018-12-29 03:10:03 | eamanu | set | messages: + msg332695 |
2018-12-29 03:07:46 | eamanu | set | nosy: + eamanu messages: + msg332694 |
2018-12-29 02:54:13 | beruhan | set | type: behavior |
2018-12-29 02:53:42 | beruhan | create |