This issue tracker has been migrated to GitHub, and is currently read-only.
For more information, see the GitHub FAQs in the Python's Developer Guide.

Author multiks2200
Recipients multiks2200, rhettinger
Date 2021-04-27.10:21:06
Message-id <1619518866.66.0.229370048721.issue43911@roundup.psfhosted.org>
Content
Raymond, thanks for your suggestions.

My deployed applications don't hold 20M items at a time; that was just a way to demonstrate the leak.

I was able to resolve the threading/queue-based leaks on my instances by modifying the Queue, Event and Condition classes to use an external doubly linked list library (Python 3.7). It's not the cleanest rewrite, but it gets the idea across:

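    import queue
    import threading

    from llist import dllist  # assumed: dllist from the third-party "llist" package

    class DllistCondition(threading.Condition):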
        def __init__(self, lock=None):
            if lock is None:
                lock = threading.RLock()
            self._lock = lock
            self.acquire = lock.acquire
            self.release = lock.release
            try:
                self._release_save = lock._release_save
            except AttributeError:
                pass
            try:
                self._acquire_restore = lock._acquire_restore
            except AttributeError:
                pass
            try:
                self._is_owned = lock._is_owned
            except AttributeError:
                pass
            self._waiters = dllist()

        def notify(self, n=1):
            if not self._is_owned():
                raise RuntimeError("cannot notify on un-acquired lock")
            all_waiters = self._waiters
            # No waiters means nothing to notify, for any n (the stock
            # _islice(all_waiters, n) would be empty in that case)
            if len(all_waiters) < 1:
                return

            node = all_waiters.first
            i = 1
            while True:
                # stop after n waiters, as _islice(all_waiters, n) would
                if i > n:
                    return

                waiter = node.value
                # grab the next node before this one is released and unlinked
                node_next = node.next
                waiter.release()
                try:
                    all_waiters.remove(node)
                except ValueError:
                    pass

                i += 1
                node = node_next
                # stop once we have walked past the last node
                if node is None:
                    return

        def wait(self, timeout=None):
            if not self._is_owned():
                raise RuntimeError("cannot wait on un-acquired lock")
            waiter = threading._allocate_lock()
            waiter.acquire()
            # dllist.append() returns the new node, so it can be removed directly later
            node = self._waiters.append(waiter)
            saved_state = self._release_save()
            gotit = False
            try:    # restore state no matter what (e.g., KeyboardInterrupt)
                if timeout is None:
                    waiter.acquire()
                    gotit = True
                else:
                    if timeout > 0:
                        gotit = waiter.acquire(True, timeout)
                    else:
                        gotit = waiter.acquire(False)
                return gotit
            finally:
                self._acquire_restore(saved_state)
                if not gotit:
                    try:
                        self._waiters.remove(node)
                    except ValueError:
                        pass

    class DllistEvent(threading.Event):
        def __init__(self):
            self._cond = DllistCondition(threading.Lock())
            self._flag = False

    class DllistQueue(queue.Queue):

        def __init__(self, maxsize=0):
            self.maxsize = maxsize
            self._init(maxsize)

            self.mutex = threading.Lock()

            self.not_empty = DllistCondition(self.mutex)

            self.not_full = DllistCondition(self.mutex)

            self.all_tasks_done = DllistCondition(self.mutex)
            self.unfinished_tasks = 0

        def _init(self, maxsize):
            self.queue = dllist()
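
The property the rewrite appears to rely on can be sketched with the standard library alone: `append()` hands back a node, removing that node later unlinks it in O(1) instead of scanning, and notify can walk the first n nodes without building a temporary copy. This is a minimal sketch, not the `llist` API; `_Node`, `WaiterList` and `notify_first_n` are hypothetical names, and like the stock `_waiters` the list is assumed to be touched only while the condition's lock is held (it does no locking of its own).

```python
import threading


class _Node:
    """Hypothetical helper: one waiter's slot in a doubly linked list."""
    __slots__ = ("value", "prev", "next", "owner")

    def __init__(self, value, owner):
        self.value = value
        self.prev = None
        self.next = None
        self.owner = owner


class WaiterList:
    """Hypothetical minimal doubly linked list: append() returns a node
    handle, and remove(node) unlinks that node without scanning."""

    def __init__(self):
        self.first = None
        self.last = None
        self._size = 0

    def __len__(self):
        return self._size

    def append(self, value):
        node = _Node(value, self)
        if self.last is None:
            self.first = self.last = node
        else:
            node.prev = self.last
            self.last.next = node
            self.last = node
        self._size += 1
        return node

    def remove(self, node):
        # Raise ValueError for foreign/already-removed nodes, which is
        # the exception the Condition code above already catches.
        if node.owner is not self:
            raise ValueError("node is not in this list")
        if node.prev is None:
            self.first = node.next
        else:
            node.prev.next = node.next
        if node.next is None:
            self.last = node.prev
        else:
            node.next.prev = node.prev
        node.prev = node.next = node.owner = None
        self._size -= 1
        return node.value


def notify_first_n(waiters, n=1):
    """Release and unlink up to n waiters, oldest first, with no temporary copy."""
    node = waiters.first
    notified = 0
    while node is not None and notified < n:
        next_node = node.next  # save it before remove() clears the links
        node.value.release()
        waiters.remove(node)
        node = next_node
        notified += 1
    return notified


if __name__ == "__main__":
    waiters = WaiterList()
    for _ in range(3):
        lock = threading.Lock()
        lock.acquire()  # each waiter starts out blocked, as in wait()
        waiters.append(lock)
    print(notify_first_n(waiters, 2), len(waiters))  # prints: 2 1
```

Whether the real `dllist` behaves identically on edge cases (for example, which exception it raises for a node that was already removed) is not verified here.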

I'm not sure the Queue itself needed the `self.queue = deque()` replacement, but I am sure the conditions needed the stock `self._waiters = deque()`, and everything that uses it, replaced.

Memory profiling consistently showed a leak at `waiters_to_notify = _deque(_islice(all_waiters, n))` in the `threading.Condition` class, which is used by both `queue.Queue` and `threading.Event`.
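
The message doesn't say which profiler was used. One way to take this kind of measurement with the standard library is to diff two `tracemalloc` snapshots taken around a burst of `put()`/`get()` traffic and look for lines in `threading.py` that keep growing. This is a sketch under that assumption; `churn` and `top_growth` are hypothetical helpers and the round counts are arbitrary.

```python
import queue
import threading
import tracemalloc


def churn(q, rounds):
    """Push items through q to a consumer thread so the queue's Condition
    objects go through many wait()/notify() cycles."""
    def consumer():
        for _ in range(rounds):
            q.get()
            q.task_done()

    t = threading.Thread(target=consumer)
    t.start()
    for i in range(rounds):
        q.put(i)
    q.join()  # wait until every item has been marked done
    t.join()


def top_growth(q, rounds=20_000, limit=5):
    """Return the `limit` source lines whose allocations changed the most
    across one churn() run, as tracemalloc StatisticDiff objects."""
    tracemalloc.start()
    churn(q, 1000)  # warm-up so one-time allocations don't dominate
    before = tracemalloc.take_snapshot()
    churn(q, rounds)
    after = tracemalloc.take_snapshot()
    tracemalloc.stop()
    # compare_to() sorts by the absolute size difference, largest first
    return after.compare_to(before, "lineno")[:limit]


if __name__ == "__main__":
    for stat in top_growth(queue.Queue()):
        print(stat)
```

A single positive diff doesn't prove a leak by itself; rerunning with larger `rounds` and checking whether the growth scales with it is more convincing. Passing a `DllistQueue()` instead of `queue.Queue()` gives the comparison described here.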

After the modification the leak is gone, and I suspect it has something to do with the new deque that gets created on every call at `waiters_to_notify = _deque(_islice(all_waiters, n))`.
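
For comparison, the stock code path in question looks roughly like this: a simplified, standalone version of the `notify()` loop in CPython 3.7's `threading.Condition`, with the `_is_owned()` check dropped and a return value added (hypothetically) so it can be exercised on its own. Each call copies up to n waiters into a fresh deque, and `deque.remove()` is a linear search, whereas the dllist version removes a node it already holds. The sketch only shows the shape of the code; it doesn't by itself establish that the temporary deque is what leaks.

```python
import threading
from collections import deque
from itertools import islice


def stock_style_notify(all_waiters, n=1):
    """Simplified copy of the stock notify() loop; returns how many
    waiters were released (the stock method returns None)."""
    # A new deque holding the first n waiters is built on every call;
    # this is the line the profiler pointed at.
    waiters_to_notify = deque(islice(all_waiters, n))
    if not waiters_to_notify:
        return 0
    for waiter in waiters_to_notify:
        waiter.release()
        try:
            all_waiters.remove(waiter)  # scans from the left until found
        except ValueError:
            pass
    return len(waiters_to_notify)


if __name__ == "__main__":
    waiters = deque()
    for _ in range(3):
        lock = threading.Lock()
        lock.acquire()  # each waiter starts out blocked, as in wait()
        waiters.append(lock)
    print(stock_style_notify(waiters, 2), len(waiters))  # prints: 2 1
```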

In any case, I'm leaving this here in case anyone else runs into it and finds it helpful.

Thanks