This issue tracker has been migrated to GitHub, and is currently read-only.
For more information, see the GitHub FAQs in the Python's Developer Guide.

classification
Title: Queue.get() memory leak
Type:
Stage: resolved
Components:
Versions: Python 3.9

process
Status: closed
Resolution: not a bug
Dependencies:
Superseder:
Assigned To: rhettinger
Nosy List: multiks2200, rhettinger
Priority: normal
Keywords: patch

Created on 2021-04-22 09:13 by multiks2200, last changed 2022-04-11 14:59 by admin. This issue is now closed.

Files
File name: instrument_deque.diff
Uploaded by: rhettinger, 2021-04-23 16:15
Description: Instrument count of mallocs and frees
Messages (17)
msg391586 - (view) Author: Jens (multiks2200) Date: 2021-04-22 09:13
I use the following code to produce what looks like a memory leak after emptying a queue with the get() method.


    import queue
    import os
    import psutil

    def run(del_after_puts, del_after_gets, n_puts, process):

        mem = queue.Queue()

        for msg in range(n_puts):
            msg_put = f'{msg}_0000000000000000000000000000000000000000000000000000000000000333333333333331111111111'
            if msg % 1000000 == 0:
                print(f'puting  {msg} qsize {len(mem.queue)}')
            mem.put(msg_put)

        print(f'------ put done  ----- qsize {len(mem.queue)}')
        print(f'mem_pct {round(process.memory_percent(), 2)}% ')

        if del_after_puts:
            print(f'deleting queue after puts {mem}')
            del mem
            print(f'mem_pct {round(process.memory_percent(), 2)}% ')
            return

        for _ in range(n_puts):
            msg_get = mem.get()
            msg = int(msg_get.split('_')[0])
            if msg % 1000000 == 0:
                print(f'getting_q {msg} qsize {len(mem.queue)} ')
                mem.task_done()

        print(f'------ gets done  ----- qsize {len(mem.queue)}')
        print(f'mem_pct {round(process.memory_percent(), 2)}% ')

        if del_after_gets:
            print(f'deleting queue after gets {mem}')
            del mem
            print(f'mem_pct {round(process.memory_percent(), 2)}% ')
            return

    if __name__ == '__main__':
        del_after_puts = False
        del_after_gets = False
        n_puts = 20_000_000
        print()
        print('#########')
        print(f'del_after_puts {del_after_puts} del_after_gets {del_after_gets} n_puts {n_puts}')

        process = psutil.Process(os.getpid())
        print('before run')
        print(f'mem_pct {round(process.memory_percent(), 2)}% ')

        run(del_after_puts, del_after_gets, n_puts, process)

        print(f'after run return')
        print(f'mem_pct {round(process.memory_percent(), 2)}% ')

This script can be run in 3 ways:
1. Add n_puts elements into the queue and then empty it.
2. Add n_puts elements into the queue and then delete the queue object
3. Add n_puts elements into the queue and then empty it and then delete the queue object.

For the 1st and 3rd cases, the script seems to produce a memory leak, as shown below.

1st case: before putting elements into the queue, memory used is 0.15%; after emptying it, 2.22%:

> #########
> 
> del_after_puts False del_after_gets False n_puts 20000000
> 
> before run
> 
> mem_pct 0.15% 
> 
> ------ put done  ----- qsize 20000000
> 
> mem_pct 37.61% 
> 
> ------ gets done  ----- qsize 0
> 
> mem_pct 2.22%

3rd case: before putting elements into the queue, memory used is 0.15%; after emptying it, 2.22%; after deleting the object, still 2.22%:

> #########
> 
> del_after_puts False del_after_gets True n_puts 20000000
> 
> before run
> 
> mem_pct 0.15% 
> 
> ------ put done  ----- qsize 20000000
> 
> mem_pct 37.61% 
> 
> ------ gets done  ----- qsize 0
> 
> mem_pct 2.22%
> 
> deleting queue after gets <queue.Queue object at 0x7fbd87295a10>
> 
> mem_pct 2.22%

For the 2nd case, mem_pct at the start is 0.15%; after putting all the elements into the queue and just deleting it, 0.16%, which is almost the same.

> #########
> 
> del_after_puts True del_after_gets False n_puts 20000000
> 
> before run
> 
> mem_pct 0.15% 
> 
> ------ put done  ----- qsize 20000000
> 
> mem_pct 37.61% 
> 
> deleting queue after puts <queue.Queue object at 0x7f29084eca10>
> 
> mem_pct 0.16%

As can be seen, memory returns to the starting level only in the case where just `queue.put()` is invoked (the 2nd case), hence it seems that `queue.get()` produces a memory leak.

This behaviour is consistent across Python 3.7, 3.8 and 3.9.
msg391587 - (view) Author: Jens (multiks2200) Date: 2021-04-22 09:14
I've tried to profile the memory with tracemalloc as well as pympler, but they don't show any leaks at the Python level, so I suspect that it might be a C-level leak.
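
(For reference, a minimal sketch of the kind of tracemalloc check described above; the call to run() and its arguments are assumed to be the ones from the script in the previous message.)

    import tracemalloc

    tracemalloc.start()
    before = tracemalloc.take_snapshot()

    run(del_after_puts, del_after_gets, n_puts, process)  # run() from the script above (assumed)

    after = tracemalloc.take_snapshot()
    for stat in after.compare_to(before, 'lineno')[:10]:
        print(stat)  # per the report above, no significant Python-level growth shows up here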
msg391614 - (view) Author: Raymond Hettinger (rhettinger) * (Python committer) Date: 2021-04-22 16:53
This may be a memory fragmentation problem and likely doesn't have anything to do with Queue instances.

As an experiment, try using queue.PriorityQueue and queue.LifoQueue to see if the issue persists.
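
(A minimal sketch of that experiment against the script from the first message; only the constructor line inside run() changes, the put()/get() calls stay the same.)

    import queue

    # Inside run(), replace the constructor and leave the rest of the test untouched:
    mem = queue.PriorityQueue()   # or: mem = queue.LifoQueue()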
msg391655 - (view) Author: Jens (multiks2200) Date: 2021-04-23 06:03
Hi,

Thanks for your reply. I've run the same script with queue.PriorityQueue, queue.LifoQueue, queue.SimpleQueue and asyncio.Queue, as well as collections.deque.

1. PriorityQueue
>#########
>del_after_puts False del_after_gets True n_puts 20000000
>before run
>mem_pct 0.15% 
>------ put done  ----- qsize 20000000
>mem_pct 37.49% 
>------ gets done  ----- qsize 0
>mem_pct 0.16% 
>deleting queue after gets <queue.PriorityQueue object at 0x7f097cfeb290>
>mem_pct 0.16% 

2. LifoQueue
>#########
>del_after_puts False del_after_gets True n_puts 20000000
>before run
>mem_pct 0.15% 
>------ put done  ----- qsize 20000000
>mem_pct 37.49% 
>------ gets done  ----- qsize 0
>mem_pct 0.16% 
>deleting queue after gets <queue.LifoQueue object at 0x7fd1150c1250>
>mem_pct 0.16% 

3. SimpleQueue
>#########
>del_after_puts False del_after_gets True n_puts 20000000
>before run
>mem_pct 0.15% 
>------ put done  ----- qsize 20000000
>mem_pct 37.49% 
>------ gets done  ----- qsize 0
>mem_pct 0.28% 
>deleting queue after gets <_queue.SimpleQueue object at 0x7f1da93d03b0>
>mem_pct 0.28% 

4. asyncio.Queue
>#########
>del_after_puts False del_after_gets True n_puts 20000000
>before run
>mem_pct 0.23% 
>------ put done  ----- qsize 20000000
>mem_pct 37.69% 
>------ gets done  ----- qsize 0
>mem_pct 2.3% 
>deleting queue after gets <Queue maxsize=0>
>mem_pct 0.25% 

5. collections.deque
>#########
>del_after_puts False del_after_gets True n_puts 20000000
>before run
>mem_pct 0.15% 
>------ put done  ----- qsize 20000000
>mem_pct 37.61% 
>------ gets done  ----- qsize 0
>mem_pct 2.22% 
>deleting queue after gets deque([])
>mem_pct 0.16% 

So from the results it can be seen that PriorityQueue and LifoQueue don't leak at all and return to the original memory level after the queues are emptied, so deleting the object reference has no further effect.

SimpleQueue seems to leak in the same way as Queue, but to a much lesser extent, and deleting the object does not return the memory.

asyncio.Queue and collections.deque seem to leak to the same extent as Queue, but the memory can be returned by deleting the object.

As a result, this got me to try using Queue._put() and Queue._get() directly, to populate the deque inside the Queue object and bypass all the condition/lock logic, which produces the following result:
>#########
>del_after_puts False del_after_gets True n_puts 20000000
>before run
>mem_pct 0.15% 
>------ put done  ----- qsize 20000000
>mem_pct 37.61% 
>------ gets done  ----- qsize 0
>mem_pct 2.22% 
>deleting queue after gets <queue.Queue object at 0x7f6a08e152d0>
>mem_pct 0.16%

It can be seen that for a Queue where only the _put() and _get() methods are used, the memory still leaks, but it can be released by deleting the object reference.
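
(A hypothetical reconstruction of that experiment; _put() and _get() are the internal hooks of queue.Queue, so this bypasses the not_empty/not_full Conditions and the mutex entirely.)

    import queue

    n_puts = 20_000_000
    mem = queue.Queue()
    for i in range(n_puts):
        mem._put(f'{i}_payload')   # internal method: appends to the underlying deque, no locking
    for _ in range(n_puts):
        mem._get()                 # internal method: popleft() from the deque, no Conditions notified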

Perhaps that indicates a possible leak in the Condition class? I run a Queue with threads in my production applications that run for weeks straight, and the Queue leaks; the memory profiler always shows a leak at `waiters_to_notify = _deque(_islice(all_waiters, n))` in Condition.notify().
I don't want to make it more confusing by bringing that up here, since it might not be the same kind of leak (my application doesn't simply put 20 million objects into the Queue and then pop them out), but from the memory-profiling results I see here it seems to be related.
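
(For context, the line being referenced lives in Condition.notify(); the following is paraphrased from the 3.7/3.8-era CPython Lib/threading.py and is quoted only to show where the _deque(_islice(...)) allocation happens.)

    # Paraphrased from CPython Lib/threading.py (3.7/3.8-era)
    def notify(self, n=1):
        if not self._is_owned():
            raise RuntimeError("cannot notify on un-acquired lock")
        all_waiters = self._waiters
        waiters_to_notify = _deque(_islice(all_waiters, n))  # the allocation the profiler flags
        if not waiters_to_notify:
            return
        for waiter in waiters_to_notify:
            waiter.release()
            try:
                all_waiters.remove(waiter)
            except ValueError:
                pass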

Thanks
msg391656 - (view) Author: Jens (multiks2200) Date: 2021-04-23 06:16
Just inspected the PriorityQueue and LifoQueue classes: they don't employ a deque at all, but simply a list, while all the other Queues tested do (except the native SimpleQueue). Since they don't leak, the leak itself seems to be coming from deque, and the fact that it fails to get released after deletion only in the single case where Conditions are involved makes me think the combination of the two produces this unreleasable leak.
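
(Paraphrased from CPython's Lib/queue.py, to show the container each class picks in _init().)

    # Paraphrased from CPython Lib/queue.py
    class Queue:
        def _init(self, maxsize):
            self.queue = deque()   # linked blocks of slots

    class PriorityQueue(Queue):
        def _init(self, maxsize):
            self.queue = []        # heap kept in a plain list

    class LifoQueue(Queue):
        def _init(self, maxsize):
            self.queue = []        # plain list used as a stack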
msg391659 - (view) Author: Jens (multiks2200) Date: 2021-04-23 06:29
Results for queue._PySimpleQueue:

>#########
>del_after_puts False del_after_gets True n_puts 20000000
>before run
>mem_pct 0.15% 
>------ put done  ----- qsize 20000000
>mem_pct 37.61% 
>------ gets done  ----- qsize 0
>mem_pct 2.22% 
>deleting queue after gets <queue._PySimpleQueue object at 0x7fab760642d0>
>mem_pct 2.22%

The leak occurs and is not released after deletion.
msg391663 - (view) Author: Raymond Hettinger (rhettinger) * (Python committer) Date: 2021-04-23 07:36
For a large amount of data, a list uses a single large contiguous block of memory while a deque uses many small discontiguous blocks.  In your demo, I suspect that some of the memory pages for the deque's blocks are also being used for other small bits of data.  If any of those small bits survive (either in active use or held for future use by the small memory allocator), then the page cannot be reclaimed.  When memory fragments like this, it manifests as an increasing amount of process memory.

Also, the interaction between the C library allocation functions and the O/S isn't under our control.  Even when our code correctly calls PyMem_Free(), it isn't assured that total process memory goes back down.

As an experiment, try to recreate the effect by building a list of lists:

    class Queue(list):
        def put(self, x):
            if not self or len(self[-1]) >= 66:
                self.append([])
            self[-1].append(x)
        def get(self):
            if not self:
                raise IndexError
            block = self[0]
            x = block.pop(0)
            if not block:
                self.pop(0)
            return x
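
(A hypothetical way to exercise that class with the same put-then-drain pattern as the original script; names and counts are illustrative.)

    q = Queue()                    # the list-of-lists class above
    for i in range(1_000_000):
        q.put(i)
    while q:                       # drain; get() raises IndexError once empty
        q.get()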
msg391665 - (view) Author: Raymond Hettinger (rhettinger) * (Python committer) Date: 2021-04-23 07:57
Also, when running deque measurements, run the following before getting the starting process memory:

   deque(range(2000)).clear()   # fill the deque freelist

That will give a cleaner before/after comparison.
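
(In other words, a small sketch of the suggested measurement order; `process` is the psutil.Process from the original script.)

    from collections import deque

    deque(range(2000)).clear()                                 # warm the deque block freelist first
    print(f'mem_pct {round(process.memory_percent(), 2)}% ')   # ...then take the "before" reading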
msg391691 - (view) Author: Jens (multiks2200) Date: 2021-04-23 12:24
Thanks for your input.

So I've run the tests with the list-of-lists Queue class; there seems to be a resulting difference depending on which qsize() method I define (it is called by my script).

For an instance where qsize() just returns None:

    class QueueLists(list):

        def put(self, x):
            if not self or len(self[-1]) >= 66:
                self.append([])
            self[-1].append(x)

        def get(self):
            if not self:
                raise IndexError
            block = self[0]
            x = block.pop(0)
            if not block:
                self.pop(0)
            return x

        # def qsize(self):
        #     tot = 0
        #     for elem in self:
        #         tot += len(elem)
        #     return tot

        def qsize(self):
            return None

The results are:
>#########
>del_after_puts False del_after_gets True n_puts 20000000
>before run
>mem_pct 0.15%
>------ put done  ----- qsize None
>mem_pct 38.06% 
>------ gets done  ----- qsize None
>mem_pct 2.35% 
>deleting queue after gets []
>mem_pct 2.35% 
>time elapsed 0:01:04.703969

For a Queue instance where qsize() returns the actual size,

    class QueueLists(list):

        def put(self, x):
            if not self or len(self[-1]) >= 66:
                self.append([])
            self[-1].append(x)

        def get(self):
            if not self:
                raise IndexError
            block = self[0]
            x = block.pop(0)
            if not block:
                self.pop(0)
            return x

        def qsize(self):
            tot = 0
            for elem in self:
                tot += len(elem)
            return tot
the results are:

>#########
>del_after_puts False del_after_gets True n_puts 20000000
>before run
>mem_pct 0.15% 
>------ put done  ----- qsize 20000000
>mem_pct 38.05% 
>------ gets done  ----- qsize 0
>mem_pct 2.35% 
>deleting queue after gets []
>mem_pct 0.18% 
>time elapsed 0:00:53.347746

So both instances leak as you've indicated, but the one that returns None as the queue size does not get its leak released after the instance is deleted, which is a weird difference.
msg391692 - (view) Author: Jens (multiks2200) Date: 2021-04-23 12:27
Regarding deque, the leak indeed does not seem to be releasable even after the freelist is warmed by initialising a deque up to the number of elements that are going to be put into the queue, as in:


    qmem = collections.deque(range(n_puts))
    qmem.clear()

The results are:

>#########
>del_after_puts False del_after_gets True n_puts 20000000
>before run
>mem_pct 0.14% 
>------ put done  ----- qsize 20000000
>mem_pct 37.61% 
>------ gets done  ----- qsize 0
>mem_pct 2.22% 
>deleting queue after gets deque([])
>mem_pct 2.22% 
>time elapsed 0:00:16.800156
msg391693 - (view) Author: Jens (multiks2200) Date: 2021-04-23 12:36
So this got me thinking of trying to use some other linked list implementations. 

I've used a llist library - https://github.com/ajakubek/python-llist

Using their doubly linked list implementation:

    from llist import dllist

    class DllistQueue(queue.Queue):
        def _init(self, maxsize):
            self.queue = dllist()

Results are:
>del_after_puts False del_after_gets True n_puts 20000000
>before run
>mem_pct 0.15% 
>------ put done  ----- qsize 20000000
>mem_pct 55.34% 
>------ gets done  ----- qsize 0
>mem_pct 0.15% 
>deleting queue after gets <__main__.DllistQueue object at 0x7f494ba91450>
>mem_pct 0.15% 
>time elapsed 0:02:17.642700

Using their singly linked list implementation:

    from llist import sllist

    class SllistQueue(queue.Queue):
        def _init(self, maxsize):
            self.queue = sllist()

results are:
>del_after_puts False del_after_gets True n_puts 20000000
>before run
>mem_pct 0.15% 
>puting  0 qsize 0
>------ put done  ----- qsize 20000000
>mem_pct 55.34% 
>------ gets done  ----- qsize 0
>mem_pct 0.15% 
>deleting queue after gets <__main__.SllistQueue object at 0x7ff07bf484d0>
>mem_pct 0.15% 
>time elapsed 0:02:03.495047

I have not dug into how their C implementation differs from deque's, but it seems to use more memory and to be slower; however, it does not seem to leak at all.

Thanks
msg391696 - (view) Author: Jens (multiks2200) Date: 2021-04-23 13:26
I also compared this library to deque, and built Queues based on it:

https://github.com/kata198/python-cllist

It seems to be as fast as deque and uses a bit more memory at peak, but it does not leak at all.
msg391704 - (view) Author: Raymond Hettinger (rhettinger) * (Python committer) Date: 2021-04-23 16:15
FWIW, when running your code on my Intel Mac with 16 GB of RAM, the "after return" result is 0.1%.  So getting up to 2.2% seems to be specific to your build.

Also, I just ran your code with a deque instrumented to count the number of mallocs and frees.  At the end of the run, there were 312506 blocks allocated and 312490 freed.  The difference of 16 is expected.  That is the number of blocks kept in the freelist to reduce allocator calls for common use cases.

So we're not seeing a "leak" in the traditional sense of more calls to malloc() than to free().  Instead, there is just an indirect measurement of total process memory, which is something we can't directly control.

That you observed a similar result with a list of lists corroborates that there isn't an issue specific to Queue or deques.  Instead, it is just an artifact of this particular pattern of exercising the memory allocators.

I conclude that what you're seeing isn't a bug.  The effect is due to a mix of:
1) the interaction between the C library and the O/S memory management routines
2) normally occurring fragmentation
3) freelists which are intentionally reserved.

Thank you for the report, but we've spent enough time on it.  Trying to account for total process memory is like chasing rainbows: mostly elusive, and there is no pot of gold at the end :-)
msg391710 - (view) Author: Jens (multiks2200) Date: 2021-04-23 17:07
Ok, I see, thanks Raymond. 

Queue-based logging leaks seem to hang my deployed applications at the moment, so this seemed like a possible reason for it. Locally I use an 8 GB Ubuntu machine, and it gets to 2.2% after return with 20 million puts; on a remote 1 GB Ubuntu instance, it gets to 3.79% after return, up from 1.24%, with 3 million puts. So I don't think this is necessarily specific to my local build; it seems like a genuine issue.

I understand where you are coming from in stating that it's not a bug, but the alternative deque implementations don't seem to suffer from the same issue, at least not to such a big extent, as I have shown.

I will continue and try to adapt alternate deque implementations and see whether that stops the Queues from leaking and hanging my instances.

Thanks for your time and involvement in this.
msg391724 - (view) Author: Raymond Hettinger (rhettinger) * (Python committer) Date: 2021-04-23 18:30
> I don't think this is necessarily specific to my local build

Replace "local" with "specific to a particular combination of C compiler and operating system".  On my Mac, the effect mostly doesn't occur as all, 0.05% before the run and 0.10% after the run.   This shows that Python source isn't at fault.

Also, since the same effect arises with a list of lists, we know that this isn't deque specific.

It isn't even Python specific.  For your OS and C compiler, it would happen to any C program that made the same pattern of calls to malloc() and free().  Memory allocators vary greatly in quality and in their response to particular load patterns.  Fragmentation is a perpetual nuisance.


>  the alternative deque implementations don't seem 
> to suffer from the same issue

This is unsurprising.  Different patterns of memory access produce different random fragmentation artifacts.


> I will continue and try to adapt alternate deque 
> implementations and see if it solves Queues leaking 

For your own project, consider:

1) Use a PriorityQueue, adding elements with a counter or timestamp:
      
      mem.put((time(), msg))
      ...
      _, msg = mem.get()

2) Build against a different memory allocator such as dlmalloc.

3) Subclass Queue with some alternate structure that doesn't tickle the fragmentation issues on your O/S.  Here's one to start with:

    class MyQueue(Queue):
        def _init(self, maxsize=0):
            assert not maxsize
            self.start = self.end = 0
            self.q = {}
        def _qsize(self):
            return self.end - self.start
        def _put(self, x):
            self.q[self.end] = x
            self.end += 1
        def _get(self):
            x = self.q.pop(self.start)
            self.start += 1
            return x


> and hanging my instances.

I suspect that hanging is completely unrelated.  The fragments we've studied don't hang.  They just have the undesirable property that the process holds more memory blocks than expected.  Those blocks are still available to Python for reuse.  They just aren't available to other processes.

Side note:  While Python supports large queues, for most applications if the queue depth gets to 20 million, it is indicative of some other design flaw.


> Thanks for your time and involvement in this.

You're welcome.  I wish the best for you.
msg391731 - (view) Author: Raymond Hettinger (rhettinger) * (Python committer) Date: 2021-04-23 19:57
Here's a more compact version of the variant with an underlying dict:

    class MyQueue(Queue):
        def _init(self, maxsize):
            self.end = 0
            self.q = {}
        def _qsize(self):
            return len(self.q)
        def _put(self, x):
            self.q[self.end] = x
            self.end += 1
        def _get(self):
            i = self.end - len(self.q)
            return self.q.pop(i)
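
(A hypothetical drop-in check, assuming `from queue import Queue`; the public put()/get() calls from the original script are unchanged.)

    from queue import Queue

    mem = MyQueue()                   # dict-backed variant from above
    mem.put('0_payload')
    mem.put('1_payload')
    assert mem.get() == '0_payload'   # FIFO order preserved
    assert mem.qsize() == 1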
msg392053 - (view) Author: Jens (multiks2200) Date: 2021-04-27 10:21
Raymond, thanks for your suggestions.

My deployed applications don't hold up to 20 million items at a time; that was just a way to show the leak.

I was able to resolve the threading- and queue-based leaks on my instances by modifying the Queue, Event and Condition classes to use an external doubly linked list library, in the following manner (Python 3.7). It's not the cleanest rewrite, but it gets the idea across:

    # Requires: import queue, threading; from llist import dllist  (third-party "llist" package)
    class DllistCondition(threading.Condition):
        def __init__(self, lock=None):
            if lock is None:
                lock = threading.RLock()
            self._lock = lock
            self.acquire = lock.acquire
            self.release = lock.release
            try:
                self._release_save = lock._release_save
            except AttributeError:
                pass
            try:
                self._acquire_restore = lock._acquire_restore
            except AttributeError:
                pass
            try:
                self._is_owned = lock._is_owned
            except AttributeError:
                pass
            self._waiters = dllist()

        def notify(self, n=1):
            if not self._is_owned():
                raise RuntimeError("cannot notify on un-acquired lock")
            all_waiters = self._waiters
            waiters_to_notify = all_waiters
            #_islice would be empty only if there are no waiters available, for any n
            if len(waiters_to_notify) < 1:
                return

            node = waiters_to_notify.first
            i = 1
            while True:
                #simulate _islice
                if i > n:
                    return

                waiter = node.value
                #get next node before release
                node_next = node.next
                waiter.release()
                try:
                    all_waiters.remove(node)
                except ValueError:
                    pass

                i += 1
                node = node_next
                #if it's the last node, return
                if node is None:
                    return

        def wait(self, timeout=None):
            if not self._is_owned():
                raise RuntimeError("cannot wait on un-acquired lock")
            waiter = threading._allocate_lock()
            waiter.acquire()
            node = self._waiters.append(waiter)
            saved_state = self._release_save()
            gotit = False
            try:    # restore state no matter what (e.g., KeyboardInterrupt)
                if timeout is None:
                    waiter.acquire()
                    gotit = True
                else:
                    if timeout > 0:
                        gotit = waiter.acquire(True, timeout)
                    else:
                        gotit = waiter.acquire(False)
                return gotit
            finally:
                self._acquire_restore(saved_state)
                if not gotit:
                    try:
                        self._waiters.remove(node)
                    except ValueError:
                        pass

    class DllistEvent(threading.Event):
        def __init__(self):
            self._cond = DllistCondition(threading.Lock())
            self._flag = False

    class DllistQueue(queue.Queue):

        def __init__(self, maxsize=0):
            self.maxsize = maxsize
            self._init(maxsize)

            self.mutex = threading.Lock()

            self.not_empty = DllistCondition(self.mutex)

            self.not_full = DllistCondition(self.mutex)

            self.all_tasks_done = DllistCondition(self.mutex)
            self.unfinished_tasks = 0

        def _init(self, maxsize):
            self.queue = dllist()

Now, I'm not exactly sure that the Queue itself required the `self.queue = deque()` modification, but I'm sure that the Conditions required getting rid of the stock `self._waiters = deque()` and its subsequent use.

Memory profiling constantly showed a leak at `waiters_to_notify = _deque(_islice(all_waiters, n))` in the threading.Condition class, which is employed by both the queue.Queue and threading.Event classes.

After the modification this leak is gone, and I suspect it has something to do with re-initialising the deque at `waiters_to_notify = _deque(_islice(all_waiters, n))`.

In any case, I'm leaving this here in case anyone else has to deal with it as well; it might be helpful.

Thanks
History
Date User Action Args
2022-04-11 14:59:44  admin        set     github: 88077
2021-04-27 10:21:06  multiks2200  set     messages: + msg392053
2021-04-23 19:57:26  rhettinger   set     messages: + msg391731
2021-04-23 18:30:22  rhettinger   set     messages: + msg391724
2021-04-23 17:07:15  multiks2200  set     messages: + msg391710
2021-04-23 16:15:22  rhettinger   set     assignee: rhettinger
2021-04-23 16:15:13  rhettinger   set     status: open -> closed
                                          files: + instrument_deque.diff
                                          messages: + msg391704
                                          keywords: + patch
                                          resolution: not a bug
                                          stage: resolved
2021-04-23 13:26:27  multiks2200  set     messages: + msg391696
2021-04-23 12:36:19  multiks2200  set     messages: + msg391693
2021-04-23 12:27:39  multiks2200  set     messages: + msg391692
2021-04-23 12:24:00  multiks2200  set     messages: + msg391691
2021-04-23 07:57:15  rhettinger   set     messages: + msg391665
2021-04-23 07:36:40  rhettinger   set     messages: + msg391663
2021-04-23 06:29:12  multiks2200  set     messages: + msg391659
2021-04-23 06:16:25  multiks2200  set     messages: + msg391656
2021-04-23 06:03:48  multiks2200  set     messages: + msg391655
2021-04-22 16:53:15  rhettinger   set     nosy: + rhettinger
                                          messages: + msg391614
2021-04-22 09:14:47  multiks2200  set     messages: + msg391587
2021-04-22 09:13:22  multiks2200  create