classification
Title: Potential memory leak with asyncio and run_in_executor
Components: asyncio
Versions: Python 3.10, Python 3.8
process
Status: open
Priority: normal
Nosy List: aeros, asvetlov, dralley, sophia2, yselivanov

Created on 2020-09-03 03:24 by sophia2, last changed 2020-10-30 03:11 by aeros.

Messages (7)
msg376275 - Author: Sophia Wisdom (sophia2) Date: 2020-09-03 03:24
The below example leaks ~20 megabytes of memory. The amount leaked is related to both the number of items in the list and the number of times `run_in_executor` is called.

```
import asyncio

def leaker():
    x = list(range(int(1000)))
    1/0

async def function():
    loop = asyncio.get_running_loop()
    for i in range(10000):
        loop.run_in_executor(None, leaker)
```
At this point, `ps -o rss -p {pid}` reports about 10MB.

After invoking this:
```
asyncio.run(function())
```
Memory jumps to about 94MB, and doesn't return.

The lists don't show up in `gc.get_objects()`, but the amount of memory leaked does increase (though not proportionately) when the lists increase in size.

Note - this doesn't happen if `run_in_executor` is `await`ed immediately, but it does still occur if we e.g. put the future in a dictionary and then `await` the results later.
The leak still occurs on my machine if the `1/0` is omitted, but not on a colleague's.

We're pretty confused as to why this happens, and would appreciate any help.
msg379877 - Author: Daniel Alley (dralley) Date: 2020-10-29 16:00
This seems likely to be a duplicate of https://bugs.python.org/issue41588, as run_in_executor(None, ...) submits tasks to a ThreadPoolExecutor under the hood.
msg379884 - Author: Sophia Wisdom (sophia2) Date: 2020-10-29 20:53
It looks like it's not specific to the ThreadPoolExecutor.

```
import asyncio
import concurrent.futures  # a bare `import concurrent` does not guarantee the submodule is loaded

def leaker_func():
    list(range(int(1000)))
    # removed 1/0 because this causes issues with the ProcessPoolExecutor

async def function():
    loop = asyncio.get_running_loop()
    for i in range(10000):
        loop.run_in_executor(concurrent.futures.ProcessPoolExecutor(), leaker_func)
```
10MB at this point

then after executing this:
```
asyncio.run(function())
```
40MB. (~same as ThreadPoolExecutor in python3.10)
msg379903 - Author: Kyle Stanley (aeros) * (Python committer) Date: 2020-10-30 01:36
In the snippet provided, at least part of the resources are not finalized because executor.shutdown() was not called in the program (which should be done when creating a local instance of the executors, either explicitly or using the context manager). For the event loop's default threadpool (used w/ loop.run_in_executor(None, ...)), I added a coroutine function loop.shutdown_default_executor() in 3.9+ that handles this (called in asyncio.run()).

Without ever calling executor.shutdown(), the worker threads/processes and their associated resources are not finalized until interpreter shutdown. There's also some additional finalization that occurs in `_python_exit()` for both TPE and PPE (see https://github.com/python/cpython/blob/3317466061509c83dce257caab3661d52571cab1/Lib/concurrent/futures/thread.py#L23 or https://github.com/python/cpython/blob/3317466061509c83dce257caab3661d52571cab1/Lib/concurrent/futures/process.py#L87), which is called just before all non-daemon threads are joined at interpreter shutdown.

However, even considering the above, there still seems to be a significant additional difference in RSS between using ThreadPoolExecutor directly and using loop.run_in_executor() that I can't seem to account for (before and after asyncio.run()):
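
As a minimal sketch of the point about shutdown(), the snippet below counts an executor's worker threads before and after an explicit shutdown. The `work` function, the `demo` thread name prefix, and the `worker_threads` helper are illustrative names, not taken from the report:

```python
import concurrent.futures as cf
import threading

def work(n):
    return sum(range(n))

def worker_threads(prefix):
    # Live threads whose name starts with the executor's thread_name_prefix.
    return [t for t in threading.enumerate() if t.name.startswith(prefix)]

executor = cf.ThreadPoolExecutor(max_workers=4, thread_name_prefix="demo")
futures = [executor.submit(work, 1_000) for _ in range(100)]
results = [f.result() for f in futures]

# All jobs are done, but the idle workers stay alive until shutdown.
alive_before_shutdown = len(worker_threads("demo"))

# shutdown(wait=True) signals the workers to exit and joins them.
executor.shutdown(wait=True)
alive_after_shutdown = len(worker_threads("demo"))

print(alive_before_shutdown, alive_after_shutdown)
```

Using `with cf.ThreadPoolExecutor() as executor:` performs the same shutdown on exit from the block.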

```
import asyncio
import concurrent.futures as cf
import os
import gc
import argparse

from concurrent.futures.thread import _python_exit

def leaker(n):
    list(range(n))

def func_TPE(n):
    with cf.ThreadPoolExecutor() as executor:
        for i in range(10_000):
            executor.submit(leaker, n)

async def func_run_in_executor(n):
    loop = asyncio.get_running_loop()
    for i in range(10_000):
        loop.run_in_executor(None, leaker, n)

def display_rss():
    os.system(f"grep ^VmRSS /proc/{os.getpid()}/status")

def main(n=100, asyncio_=False):
    try:
        if asyncio_:
            asyncio.run(func_run_in_executor(n))
        else:
            func_TPE(n)
    finally:
        _python_exit()
        gc.collect()
        print(f"after 10_000 iterations of {n} element lists:")
        display_rss()

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("-n", type=int, default=100)
    parser.add_argument("--asyncio", action=argparse.BooleanOptionalAction)

    print("start RSS memory:")
    display_rss()

    args = parser.parse_args()
    main(args.n, args.asyncio)
```
Results (on latest commit to master, 3.10):
asyncio -
```
[aeros:~/repos/cpython]$ ./python ~/programming/python/asyncio_run_in_exec_leak.py -n=10000 --asyncio
start RSS memory:
VmRSS:     16948 kB
after 10_000 iterations of 10000 element lists:
VmRSS:     27080 kB
```
concurrent.futures -
```
[aeros:~/repos/cpython]$ ./python ~/programming/python/asyncio_run_in_exec_leak.py -n=10000 --no-asyncio
start RSS memory:
VmRSS:     17024 kB
after 10_000 iterations of 10000 element lists:
VmRSS:     19572 kB
```
When using await before loop.run_in_executor(), the results are more similar to using ThreadPoolExecutor directly:
```
[aeros:~/repos/cpython]$ ./python ~/programming/python/asyncio_run_in_exec_leak.py -n=10000 --asyncio  
start RSS memory:
VmRSS:     16940 kB
after 10_000 iterations of 10000 element lists:
VmRSS:     17756 kB
```
However, as mentioned by the OP, if stored in a container and awaited later (such as w/ asyncio.gather()), a substantial memory difference is present (increases with list size):
```
[aeros:~/repos/cpython]$ ./python ~/programming/python/asyncio_run_in_exec_leak.py -n=10000 --asyncio
start RSS memory:
VmRSS:     16980 kB
after 10_000 iterations of 10000 element lists:
VmRSS:     29744 kB
```
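
The two variants compared above are not shown in the script. A rough sketch of what they could look like follows, assuming the same `leaker` helper; the function names and the `iterations` parameter are hypothetical and may differ from what was actually run:

```python
import asyncio

def leaker(n):
    list(range(n))

async def await_immediately(n, iterations=10_000):
    # Each future is awaited before the next job is submitted.
    loop = asyncio.get_running_loop()
    for _ in range(iterations):
        await loop.run_in_executor(None, leaker, n)
    return iterations

async def await_later(n, iterations=10_000):
    # All futures are stored first, then awaited together.
    loop = asyncio.get_running_loop()
    futures = [loop.run_in_executor(None, leaker, n) for _ in range(iterations)]
    await asyncio.gather(*futures)
    return len(futures)

if __name__ == "__main__":
    asyncio.run(await_immediately(10_000))
    asyncio.run(await_later(10_000))
```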

Based on the above results, I think there may be a smaller leak occurring in concurrent.futures (potentially related to the linked bpo-41588) and a bit of a larger leak occurring in loop.run_in_executor(). So they can remain as separate issues IMO.

At the moment, my best guess is that there's some memory leak that occurs from the future not being fully cleaned up, but I'm not certain about that. This will likely require some further investigation.

Input from Yury and/or Andrew would definitely be appreciated. Is there something that I'm potentially missing here?
msg379905 - Author: Kyle Stanley (aeros) * (Python committer) Date: 2020-10-30 01:41
Also note that the difference in memory is much higher when an exception occurs (presumably because the exception is stored on future._exception and not cleaned up?):
```
[aeros:~/repos/cpython]$ ./python ~/programming/python/asyncio_run_in_exec_leak.py -n=10000 --asyncio
start RSS memory:
VmRSS:     16976 kB
after 10_000 iterations of 10000 element lists:
VmRSS:     64132 kB
```
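
To illustrate the mechanism guessed at above (not a confirmed diagnosis of this issue), the sketch below shows how an exception stored on a future keeps the locals of the failing function alive through its traceback. `Payload` is a hypothetical stand-in for the large list, since plain lists cannot be weakly referenced:

```python
import gc
import weakref
from concurrent.futures import ThreadPoolExecutor

class Payload:
    """Stand-in for the large list allocated by the failing function."""

payload_refs = []

def failing():
    payload = Payload()
    payload_refs.append(weakref.ref(payload))
    1 / 0  # the traceback references this frame, and therefore `payload`

def demo():
    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(failing)
        exc = future.exception()  # waits, then returns the stored exception
    # exception -> traceback -> frame of failing() -> local `payload`
    alive_while_held = payload_refs[-1]() is not None
    del exc, future
    gc.collect()
    alive_after_release = payload_refs[-1]() is not None
    return alive_while_held, alive_after_release

held, released = demo()
print(held, released)
```

If the futures themselves are released, this retention should end with them, so it would explain higher peak usage more readily than memory that never comes back.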
msg379907 - Author: Sophia Wisdom (sophia2) Date: 2020-10-30 02:22
While not calling executor.shutdown() may leave some resources still used, it should be small and fixed. Regularly calling executor.shutdown() and then instantiating a new ThreadPoolExecutor in order to run an asyncio program does not seem like a good API to me.

You mention there appear to be both an event loop and a futures leak -- I think I have a good test case for the futures, without using threads at all. This seems to be leaking `future._result`s somehow even though their __del__ is called.

```
import asyncio
from concurrent.futures import Executor, Future
import gc

result_gcs = 0
suture_gcs = 0

class ResultHolder:
    def __init__(self, mem_size):
        self.mem = list(range(mem_size)) # so we can see the leak

    def __del__(self):
        global result_gcs
        result_gcs += 1  # was `result_gc`, which raised NameError inside __del__

class Suture(Future):
    def __del__(self):
        global suture_gcs
        suture_gcs += 1

class SimpleExecutor(Executor):
    def submit(self, fn):
        future = Suture()
        future.set_result(ResultHolder(1000))
        return future

async def function():
    loop = asyncio.get_running_loop()
    for i in range(10000):
        loop.run_in_executor(SimpleExecutor(), lambda x:x)

def run():
    asyncio.run(function())
    print(suture_gcs, result_gcs)
```
10MB

```
> run()
10000 10000
```
100MB

Both result_gcs and suture_gcs are 10000 every time. My best guess for why this would happen (for me it doesn't seem to happen without loop.run_in_executor) is the conversion from a concurrent.futures.Future to an asyncio.Future, which involves callbacks to check on the result. But that doesn't fully make sense: the result itself has __del__ called on it, yet the memory is somehow not freed!
msg379909 - Author: Kyle Stanley (aeros) * (Python committer) Date: 2020-10-30 03:11
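
One possible way to separate "objects are still allocated at the Python level" from "RSS stays high after the objects are freed" is the standard-library tracemalloc module, which only tracks allocations made through Python's allocators. The following is a sketch under that assumption; `measure` and `churn` are illustrative names, and this is a diagnostic aid rather than an explanation of the behavior reported here:

```python
import gc
import tracemalloc

def measure(fn):
    """Return (bytes still allocated after fn, peak bytes during fn)."""
    gc.collect()
    tracemalloc.start()
    before, _ = tracemalloc.get_traced_memory()
    fn()
    gc.collect()
    after, peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    return after - before, peak - before

def churn():
    # Allocate and immediately drop many lists, similar in spirit to the reproducer.
    for _ in range(1_000):
        list(range(1_000))

retained, peak = measure(churn)
print(f"retained={retained} bytes, peak={peak} bytes")
```

If `retained` stays small while RSS remains high, the memory is probably being held by the allocator or C runtime rather than by live Python objects.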
> Regularly calling executor.shutdown() and then instantiating a new ThreadPoolExecutor in order to run an asyncio program does not seem like a good API to me.

Clarification: you're typically only supposed to instantiate a single ThreadPoolExecutor or ProcessPoolExecutor per program (sometimes one of each depending on use case), and continuously submit jobs to it rather than creating multiple executor instances. Otherwise, it's generally unneeded overhead.
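
A minimal sketch of the single-executor pattern described above, assuming a thread pool is appropriate for the workload; `work`, `main`, `run`, and the `max_workers` value are illustrative:

```python
import asyncio
import concurrent.futures as cf

def work(n):
    return len(list(range(n)))

async def main(executor, jobs=1_000):
    loop = asyncio.get_running_loop()
    # Every job is submitted to the same executor instance.
    futures = [loop.run_in_executor(executor, work, 100) for _ in range(jobs)]
    return await asyncio.gather(*futures)

def run():
    # One executor for the whole program, shut down when the block exits.
    with cf.ThreadPoolExecutor(max_workers=8) as executor:
        return asyncio.run(main(executor))

if __name__ == "__main__":
    print(len(run()))
```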

(I'll take a look at the other parts once I find the time, just wanted to briefly mention the above in the meantime.)
History
Date                 User     Action  Args
2020-10-30 03:11:23  aeros    set     messages: + msg379909
2020-10-30 02:22:37  sophia2  set     messages: + msg379907
2020-10-30 01:41:37  aeros    set     messages: + msg379905
2020-10-30 01:36:50  aeros    set     messages: + msg379903
2020-10-29 20:53:21  sophia2  set     messages: + msg379884
2020-10-29 16:00:15  dralley  set     nosy: + dralley; messages: + msg379877
2020-09-03 04:10:12  xtreak   set     nosy: + aeros
2020-09-03 03:24:15  sophia2  create