Issue44155
This issue tracker has been migrated to GitHub,
and is currently read-only.
For more information,
see the GitHub FAQs in the Python's Developer Guide.
Created on 2021-05-17 08:50 by chenzhuowansui, last changed 2022-04-11 14:59 by admin.
Pull Requests | |||
---|---|---|---|
URL | Status | Linked | Edit |
PR 26332 | open | junnplus, 2021-05-24 14:27 |
Messages (2) | |||
---|---|---|---|
msg393796 - (view) | Author: David Chen (chenzhuowansui) | Date: 2021-05-17 08:50 | |
could someone help me out? i spent a lot of time to debug a race condition i have encountered when using BaseManager, Pool within multiprocessing library. here is the simplified code: ``` import sys, time from multiprocessing.managers import BaseManager, SyncManager, BaseProxy from multiprocessing import Process, cpu_count, Pool, Lock, get_context from multiprocessing.queues import Queue, JoinableQueue import queue class QueueManager(BaseManager): pass class Singleton: ''' Decorator class for singleton pattern. ''' def __init__(self, cls): self._cls = cls self._lock = Lock() self._instance = {} def __call__(self, *args, **kwargs): if self._cls not in self._instance: with self._lock: self._instance[self._cls] = self._cls(*args, **kwargs) return self._instance[self._cls] def getInstance(self): return self._instance[self._cls] class LoggingServer(object): def __init__(self, address, pwd): self.logServerAddr = address self.logServerPwd = pwd self.msgQueue = queue.Queue() try: QueueManager.register('getQueue', callable=lambda: self.msgQueue) self.queueManager = QueueManager(address = self.logServerAddr, authkey = self.logServerPwd) self.logServer = self.queueManager.get_server() self.logServer.serve_forever() except: raise RuntimeError("Couldn't start the logging server!") class LoggingProcess(object): def __init__(self, address, pwd): self.logServerAddr = address self.logServerPwd = pwd try: QueueManager.register('getQueue') self.queueManager = QueueManager(address = self.logServerAddr, authkey = self.logServerPwd) self.queueManager.connect() except: raise RuntimeError("Couldn't connect logging process to the logging server!") self.msgQueue = self.queueManager.getQueue() self.process = Process(target=self.loggingProcess, name = "Logging Process", args=(), daemon = True) self.process.start() def terminate(self): self.msgQueue.join() self.process.terminate() def loggingProcess(self): while True: logObj = self.msgQueue.get() print(logObj) @Singleton class Logger(object): def __init__(self, address, pwd): self.logServerAddr = address self.logServerPwd = pwd self.queueManager = None self.msgQueue = None def connectToLogServer(self): try: QueueManager.register('getQueue') self.queueManager = QueueManager(address = self.logServerAddr, authkey = self.logServerPwd) self.queueManager.connect() self.msgQueue = self.queueManager.getQueue() self.ready = True except: raise RuntimeError("Couldn't connect logger to Log Server!") def ReadyCheck(func): def makeDecorator(self, *args, **kwargs): if not self.msgQueue: self.connectToLogServer() func(self, *args, **kwargs) return makeDecorator # Overrided function to log info @ReadyCheck def info(self, info, logfile = sys.stdout): self.msgQueue.put(info) address = ('', 50000) password = b'PASSWORD' log = Logger(address, password) def callback(*args): #print("Finished!!!") pass def job(index): time.sleep(0.1) log.info(str(log.msgQueue) + ":{}".format(index)) log.info("here {}".format(index)) if __name__ == "__main__": # import multiprocessing # logger = multiprocessing.log_to_stderr() # logger.setLevel(multiprocessing.SUBDEBUG) serverProcess = Process(target = LoggingServer, name = "LoggingServerDaemon", args = ((address, password)), daemon = True) serverProcess.start() time.sleep(1) loggingProcess = LoggingProcess(address, password) log.info("Starting...") #pool = Pool(cpu_count()) pool = Pool() #Using a small number of worker(like 10), no problem, but if we increase to a bigger number, say 48 in my case, this program hangs every time... results = [pool.apply_async(job, (i,), callback = callback) for i in range(1)] pool.close() pool.join() log.info("Done") #loggingProcess.terminate() #serverProcess.terminate() ``` LoggerServer class is working as a logging Server(like a proxy), which manages a shared queue. LoggingProcess class is a log consumer class, which fetch the logs from the shared queue(managed by LoggingServer). Logger class is a producer class, which put the logs into the shared queue. As i want to share the global logger in multiple modules in order to unify the logs format/output places/...(something like the logging standard library), so the Logger class is not fully initialized and will be fully initialized later when using it(please see connectToLogServer). and i highly suspect this is root cause of program hang, but i can't go further... the hang sub-process's(ForkPoolWorker) traceback is like the following(using py-spy): ``` Process 3958088: python3 Logger.py Python v3.9.0 (/usr/bin/python3.9) Thread 3958088 (idle): "MainThread" _recv (/usr/lib/python3.9/multiprocessing/connection.py:384) _recv_bytes (/usr/lib/python3.9/multiprocessing/connection.py:419) recv_bytes (/usr/lib/python3.9/multiprocessing/connection.py:221) answer_challenge (/usr/lib/python3.9/multiprocessing/connection.py:757) Client (/usr/lib/python3.9/multiprocessing/connection.py:513) _decref (/usr/lib/python3.9/multiprocessing/managers.py:861) __call__ (/usr/lib/python3.9/multiprocessing/util.py:224) _run_finalizers (/usr/lib/python3.9/multiprocessing/util.py:300) _exit_function (/usr/lib/python3.9/multiprocessing/util.py:334) _bootstrap (/usr/lib/python3.9/multiprocessing/process.py:318) _launch (/usr/lib/python3.9/multiprocessing/popen_fork.py:71) __init__ (/usr/lib/python3.9/multiprocessing/popen_fork.py:19) _Popen (/usr/lib/python3.9/multiprocessing/context.py:277) start (/usr/lib/python3.9/multiprocessing/process.py:121) _repopulate_pool_static (/usr/lib/python3.9/multiprocessing/pool.py:326) _repopulate_pool (/usr/lib/python3.9/multiprocessing/pool.py:303) __init__ (/usr/lib/python3.9/multiprocessing/pool.py:212) Pool (/usr/lib/python3.9/multiprocessing/context.py:119) <module> (/slowfs/cn59sig01/usr/zhuoc/work/qualification-kit/reproducer/Logger.py:129) ``` it seems the refcount of the shared queue failed to be decref... I googled a lot stuffs, but none seems to be the same with this... so i bring this issue here for help. Any comments and suggestions are highly appreciated! Traceback after CTRL+C: ``` raceback (most recent call last): File "/slowfs/cn59sig01/usr/zhuoc/work/qualification-kit/reproducer/Logger.py", line 43, in __init__ self.logServer.serve_forever() File "/usr/lib/python3.9/multiprocessing/managers.py", line 183, in serve_forever sys.exit(0) SystemExit: 0 During handling of the above exception, another exception occurred: Traceback (most recent call last): Traceback (most recent call last): File "/usr/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap self.run() File "/usr/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap self.run() File "/usr/lib/python3.9/multiprocessing/process.py", line 108, in run self._target(*self._args, **self._kwargs) File "/slowfs/cn59sig01/usr/zhuoc/work/qualification-kit/reproducer/Logger.py", line 68, in loggingProcess logObj = self.msgQueue.get() File "<string>", line 2, in get File "/usr/lib/python3.9/multiprocessing/managers.py", line 809, in _callmethod kind, result = conn.recv() File "/usr/lib/python3.9/multiprocessing/connection.py", line 255, in recv buf = self._recv_bytes() File "/usr/lib/python3.9/multiprocessing/connection.py", line 419, in _recv_bytes buf = self._recv(4) File "/usr/lib/python3.9/multiprocessing/connection.py", line 384, in _recv chunk = read(handle, remaining) KeyboardInterrupt File "/usr/lib/python3.9/multiprocessing/process.py", line 108, in run self._target(*self._args, **self._kwargs) File "/slowfs/cn59sig01/usr/zhuoc/work/qualification-kit/reproducer/Logger.py", line 45, in __init__ raise RuntimeError("Couldn't start the logging server!") RuntimeError: Couldn't start the logging server! ``` |
|||
msg394240 - (view) | Author: David Chen (chenzhuowansui) | Date: 2021-05-24 08:11 | |
After some investigation, i almost finally identified the root cause - it was caused by the following line in `Server` class in `manager.py` file: ``` self.listener = Listener(address=address, backlog=16) ``` i'm not sure where the magic number `16` came from, but it turned out it was not big enough for some powerful CPU(48 cores in my case), where the socket server starts refusing to accepting new connections if there are more than `16` unaccepted connections, i think this is why the code stuck at `Client -> answer_challenge` where attempting to create new connection to the server. After i change the number to `32`, this issue is gone. IMO, a magic number here is not very suitable, maybe we could make this number configurable or use `cpu_count` to set it to a better number based on the CPU count. |
History | |||
---|---|---|---|
Date | User | Action | Args |
2022-04-11 14:59:45 | admin | set | github: 88321 |
2021-05-24 14:27:35 | junnplus | set | keywords:
+ patch nosy: + junnplus pull_requests: + pull_request24924 stage: patch review |
2021-05-24 08:11:43 | chenzhuowansui | set | messages: + msg394240 |
2021-05-17 08:50:27 | chenzhuowansui | create |