Message392053
Raymond, thanks for your suggestions.
My deployed applications don't hold 20 million items at a time; that was just a way to demonstrate the leak.
I was able to resolve the threading- and queue-based leaks on my instances by modifying the Queue, Event and Condition classes to use an external doubly linked list library, as follows (Python 3.7). It's not the cleanest rewrite, but it should convey the idea:
import queue
import threading

# Assumption: dllist comes from the third-party "llist" package.
from llist import dllist


class DllistCondition(threading.Condition):
    def __init__(self, lock=None):
        if lock is None:
            lock = threading.RLock()
        self._lock = lock
        # Export the lock's acquire() and release() methods
        self.acquire = lock.acquire
        self.release = lock.release
        # If the lock defines these helpers, use them instead of the defaults
        try:
            self._release_save = lock._release_save
        except AttributeError:
            pass
        try:
            self._acquire_restore = lock._acquire_restore
        except AttributeError:
            pass
        try:
            self._is_owned = lock._is_owned
        except AttributeError:
            pass
        # Doubly linked list instead of the stock deque()
        self._waiters = dllist()

    def notify(self, n=1):
        if not self._is_owned():
            raise RuntimeError("cannot notify on un-acquired lock")
        all_waiters = self._waiters
        waiters_to_notify = all_waiters
        # _islice would be empty only if there are no waiters available, for any n
        if len(waiters_to_notify) < 1:
            return
        node = waiters_to_notify.first
        i = 1
        while True:
            # simulate _islice: stop after n waiters
            if i > n:
                return
            waiter = node.value
            # get the next node before releasing and removing this one
            node_next = node.next
            waiter.release()
            try:
                all_waiters.remove(node)
            except ValueError:
                pass
            i += 1
            node = node_next
            # if that was the last node, return
            if node is None:
                return

    def wait(self, timeout=None):
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        waiter = threading._allocate_lock()
        waiter.acquire()
        # keep the node so it can be removed directly on timeout
        node = self._waiters.append(waiter)
        saved_state = self._release_save()
        gotit = False
        try:  # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
                gotit = True
            else:
                if timeout > 0:
                    gotit = waiter.acquire(True, timeout)
                else:
                    gotit = waiter.acquire(False)
            return gotit
        finally:
            self._acquire_restore(saved_state)
            if not gotit:
                try:
                    self._waiters.remove(node)
                except ValueError:
                    pass


class DllistEvent(threading.Event):
    def __init__(self):
        self._cond = DllistCondition(threading.Lock())
        self._flag = False


class DllistQueue(queue.Queue):
    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._init(maxsize)
        # One mutex shared by all three conditions, as in the stock Queue
        self.mutex = threading.Lock()
        self.not_empty = DllistCondition(self.mutex)
        self.not_full = DllistCondition(self.mutex)
        self.all_tasks_done = DllistCondition(self.mutex)
        self.unfinished_tasks = 0

    def _init(self, maxsize):
        self.queue = dllist()

Now, I'm not exactly sure that the Queue itself required replacing `self.queue = deque()`, but I'm sure that the conditions required getting rid of the stock `self._waiters = deque()` and the subsequent uses of it.
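For readers without the external library, the operations the rewritten Condition relies on (`append` returning a node, `remove(node)`, `first`, `node.next`, `len()`) can be sketched in pure Python. `MiniDllist` and `Node` below are hypothetical stand-ins written for illustration only; they are not the library's implementation and only cover the calls used above.

```python
class Node:
    """One list cell; `owner` records which list it currently belongs to."""
    __slots__ = ("value", "prev", "next", "owner")

    def __init__(self, value, owner):
        self.value = value
        self.prev = None
        self.next = None
        self.owner = owner


class MiniDllist:
    """Hypothetical minimal doubly linked list (illustration only)."""

    def __init__(self):
        self.first = None
        self.last = None
        self._size = 0

    def __len__(self):
        return self._size

    def append(self, value):
        """Add value at the end and return its node."""
        node = Node(value, self)
        if self.last is None:
            self.first = self.last = node
        else:
            node.prev = self.last
            self.last.next = node
            self.last = node
        self._size += 1
        return node

    def remove(self, node):
        """Unlink node without scanning; ValueError if it is not in this list."""
        if node.owner is not self:
            raise ValueError("node does not belong to this list")
        if node.prev is None:
            self.first = node.next
        else:
            node.prev.next = node.next
        if node.next is None:
            self.last = node.prev
        else:
            node.next.prev = node.prev
        node.prev = node.next = node.owner = None
        self._size -= 1
        return node.value


waiters = MiniDllist()
a = waiters.append("a")
b = waiters.append("b")
c = waiters.append("c")
waiters.remove(b)  # unlinks the middle node directly
```

Because `remove` clears the node's links, the next node has to be read before removal, which is what the `node_next = node.next` line in the rewritten `notify` does.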
Memory profiling consistently showed a leak at `waiters_to_notify = _deque(_islice(all_waiters, n))` in the threading.Condition class, which is used by both the queue.Queue and threading.Event classes.
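The message does not say which profiler was used. As one possible way to take a similar measurement, the sketch below uses the standard `tracemalloc` module to compare allocations attributed to `threading.py` before and after pushing items through a stock `queue.Queue`. The function name `threading_allocation_diff` and its parameters are made up for this example, and the result may well be empty on an interpreter that does not show the problem.

```python
import queue
import threading
import tracemalloc


def threading_allocation_diff(n_items=10_000, limit=5):
    """Return the largest allocation differences located in threading.py."""
    only_threading = [tracemalloc.Filter(True, threading.__file__)]
    tracemalloc.start()
    before = tracemalloc.take_snapshot().filter_traces(only_threading)

    q = queue.Queue()
    for i in range(n_items):
        q.put(i)      # each put() calls not_empty.notify()
    for _ in range(n_items):
        q.get()       # each get() calls not_full.notify()

    after = tracemalloc.take_snapshot().filter_traces(only_threading)
    tracemalloc.stop()
    # Sorted by size of the difference, grouped by file and line number
    return after.compare_to(before, "lineno")[:limit]


for stat in threading_allocation_diff():
    print(stat)
```

Running this in a loop and watching whether the reported sizes keep growing gives a rough indication of whether memory is retained between iterations.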
After the modification this leak is gone, and I suspect it has something to do with re-creating the deque at `waiters_to_notify = _deque(_islice(all_waiters, n))`.
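For comparison, the stock pattern referred to here can be sketched outside of `threading` as follows. This is a simplified imitation of what the quoted line does, not the actual standard-library method: `notify_stock_style` is a hypothetical helper that takes the waiters deque as an argument and skips the lock-ownership check.

```python
import threading
from collections import deque
from itertools import islice


def notify_stock_style(all_waiters, n=1):
    """Release up to n waiter locks, copying them into a temporary deque first."""
    # A fresh deque is built on every call, as in the quoted line
    waiters_to_notify = deque(islice(all_waiters, n))
    for waiter in waiters_to_notify:
        waiter.release()
        try:
            # deque.remove() searches the deque for the waiter
            all_waiters.remove(waiter)
        except ValueError:
            pass
    return len(waiters_to_notify)


# Three threads "waiting": each waiter is a lock that is already held
waiters = deque()
for _ in range(3):
    lock = threading.Lock()
    lock.acquire()
    waiters.append(lock)

released = notify_stock_style(waiters, n=2)
```

The linked-list version above avoids both the temporary copy and the search, since it walks the nodes in place and removes each one by reference.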
In any case, I'm leaving this here in case anyone else runs into the same problem and finds it helpful.
Thanks

Date | User | Action | Args
2021-04-27 10:21:06 | multiks2200 | set | recipients: + multiks2200, rhettinger
2021-04-27 10:21:06 | multiks2200 | set | messageid: <1619518866.66.0.229370048721.issue43911@roundup.psfhosted.org>
2021-04-27 10:21:06 | multiks2200 | link | issue43911 messages
2021-04-27 10:21:06 | multiks2200 | create |