
Author dukewrz
Recipients beruhan, davin, dukewrz, eamanu, pitrou, tim.peters
Date 2020-10-20.08:12:58
SpamBayes Score -1.0
Marked as misclassified Yes
Message-id <1603181580.45.0.00927708090047.issue35608@roundup.psfhosted.org>
In-reply-to
Content
I ran into the same issue.
I'm using Ansible to dispatch thousands of remote tasks. One TaskQueueManager starts multiple worker processes; each worker process executes a remote task and sends the task result back to the TaskQueueManager through a multiprocessing.Queue, so there is one consumer and thousands of producers (each producer should exit after sending its task result). Under high concurrency this hang can occur, and many worker processes never exit.

Example:
sending an Ansible ping task (which executes very quickly and returns a very short result) to 20000 targets.
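
Below is a minimal sketch that approximates this pattern (the names, the FORKS/NUM_TASKS values and the worker body are illustrative, not Ansible's actual code): one consumer in the parent process and many short-lived producer processes that each put one small result and exit.

import multiprocessing

FORKS = 200        # workers running at once, roughly Ansible's forks setting
NUM_TASKS = 20000  # total "hosts" to ping

def worker(result_q, host):
    # Fast task: put one short result on the shared queue, then return.
    # At process exit, multiprocessing's finalizer joins the queue's
    # feeder thread; that join is where the traceback below is stuck.
    result_q.put({'host': host, 'result': 'pong'})

if __name__ == '__main__':
    result_q = multiprocessing.Queue()
    for batch_start in range(0, NUM_TASKS, FORKS):
        batch = range(batch_start, min(batch_start + FORKS, NUM_TASKS))
        procs = [multiprocessing.Process(target=worker, args=(result_q, 'host%d' % n))
                 for n in batch]
        for p in procs:
            p.start()
        for _ in procs:
            result_q.get()   # single consumer drains one result per worker
        for p in procs:
            p.join()         # occasionally a worker never finishes exiting here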

Environment
==============================
Python 3.6.5
OEL 7.4

gdb debug info of worker process
==============================
(gdb) py-list
1067            # If the lock is acquired, the C code is done, and self._stop() is
1068            # called.  That sets ._is_stopped to True, and ._tstate_lock to None.
1069            lock = self._tstate_lock
1070            if lock is None:  # already determined that the C code is done
1071                assert self._is_stopped
>1072            elif lock.acquire(block, timeout):
1073                lock.release()
1074                self._stop()
1075    
1076        @property
1077        def name(self):
(gdb) py-bt
Traceback (most recent call first):
  <built-in method acquire of _thread.lock object at remote 0x7f94b2190918>
  File "/usr/local/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
  File "/usr/local/lib/python3.6/threading.py", line 1056, in join
    self._wait_for_tstate_lock()
  File "/usr/local/lib/python3.6/multiprocessing/queues.py", line 191, in _finalize_join
    thread.join()
  File "/usr/local/lib/python3.6/multiprocessing/util.py", line 186, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.6/multiprocessing/util.py", line 262, in _run_finalizers
    finalizer()
  File "/usr/local/lib/python3.6/multiprocessing/util.py", line 322, in _exit_function
    _run_finalizers()
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 261, in _bootstrap
    util._exit_function()
  File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", line 73, in _launch
    code = process_obj._bootstrap()
  File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/usr/local/lib/python3.6/multiprocessing/context.py", line 277, in _Popen
    return Popen(process_obj)
  File "/usr/local/lib/python3.6/multiprocessing/context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 105, in start
    self._popen = self._Popen(self)
  File "/usr/local/lib/python3.6/site-packages/ansible-2.7.0.post0-py3.6.egg/ansible/plugins/strategy/__init__.py", line 328, in _queue_task
    worker_prc.start()
  File "/usr/local/lib/python3.6/site-packages/ansible-2.7.0.post0-py3.6.egg/ansible/plugins/strategy/telinear.py", line 301, in run
    self._queue_task(host, task, task_vars, play_context)
  File "/usr/local/lib/python3.6/site-packages/ansible-2.7.0.post0-py3.6.egg/ansible/executor/task_queue_manager.py", line 308, in run
    play_return = strategy.run(iterator, play_context)
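
From the traceback, the worker is blocked in Thread.join() on the result queue's feeder thread, called from Queue._finalize_join by the multiprocessing exit finalizers. One mitigation I'm considering is to call cancel_join_thread() on the queue in the worker after its last put(). This is only a sketch, and the documented trade-off is that results still buffered in the feeder thread may be lost:

def worker(result_q, host):
    result_q.put({'host': host, 'result': 'pong'})
    # cancel_join_thread() tells this process's exit finalizer not to
    # join the queue's feeder thread, so the worker can exit even if
    # that join would otherwise block.  Buffered data may be dropped.
    result_q.cancel_join_thread()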



Can anyone help? Thanks!