classification
Title: Thread pool return ref hold memory
Type: resource usage Stage: resolved
Components: asyncio Versions: Python 3.7, Python 3.6, Python 3.5
process
Status: closed Resolution: not a bug
Dependencies: Superseder:
Assigned To: Nosy List: Tianshu Gao, asvetlov, johndoee, pablogsal, yselivanov
Priority: normal Keywords:

Created on 2019-08-21 20:55 by Tianshu Gao, last changed 2020-01-21 19:46 by asvetlov. This issue is now closed.

Messages (4)
msg350108 - Author: Tianshu Gao (Tianshu Gao) Date: 2019-08-21 20:55
This is very similar to issue35715, but it happens for threads.

After the function running in the thread has finished, the memory is still held and accumulates.

import asyncio
import time
import concurrent
import threading

loop = asyncio.get_event_loop()

def prepare_a_giant_list():
    m = []
    for i in range(1000*1000):
        m.append("There's a fat fox jump over a sheep" + str(i))

    th_num = threading.active_count()
    print("Thread number is {}".format(th_num))
    return m

@asyncio.coroutine
def main():
    global loop
    async_executor = concurrent.futures.ThreadPoolExecutor(max_workers=20)
    loop.run_in_executor(async_executor, prepare_a_giant_list)
    time.sleep(15)
    loop.run_in_executor(async_executor, prepare_a_giant_list)
    time.sleep(15)
    loop.run_in_executor(async_executor, prepare_a_giant_list)
    time.sleep(15)
    loop.run_in_executor(async_executor, prepare_a_giant_list)
    time.sleep(15)

if __name__ == "__main__":
    loop.run_until_complete(main())
    loop.close()
msg350523 - Author: Andrew Svetlov (asvetlov) * (Python committer) Date: 2019-08-26 10:51
In asyncio code please use non-blocking calls (asyncio.sleep instead of time.sleep) and await the future returned by run_in_executor.

The following code doesn't leak:

import asyncio
import concurrent
import threading


def prepare_a_giant_list():
    m = []
    for i in range(1000*1000):
        m.append("There's a fat fox jump over a sheep" + str(i))

    th_num = threading.active_count()
    print("Thread number is {}".format(th_num))
    return m


async def main():
    loop = asyncio.get_running_loop()
    async_executor = concurrent.futures.ThreadPoolExecutor(max_workers=20)
    await loop.run_in_executor(async_executor, prepare_a_giant_list)
    await asyncio.sleep(15)
    await loop.run_in_executor(async_executor, prepare_a_giant_list)
    await asyncio.sleep(15)
    await loop.run_in_executor(async_executor, prepare_a_giant_list)
    await asyncio.sleep(15)
    await loop.run_in_executor(async_executor, prepare_a_giant_list)
    await asyncio.sleep(15)


if __name__ == "__main__":
    asyncio.run(main())
msg360406 - Author: Anders (johndoee) Date: 2020-01-21 16:08
Note: due to a change in Python 3.8, the effect in this example is much less noticeable if tested there. The problem remains the same, though.

If you run this snippet with Python 3.7, which predates thread reuse in ThreadPoolExecutor, each thread will keep around 600 MB of memory in use.

This can be solved by shutting down the ThreadPoolExecutor, which this example does.

Now, the big problem is that asyncio uses a long-running ThreadPoolExecutor by default for run_in_executor. Those threads stay around forever and hold on to memory until the application shuts down.

If you have a job that consumes a lot of memory for a short period of time and you use any long-running ThreadPoolExecutor, memory will just keep growing as the job lands on various threads that are never cleaned up.

----------
import asyncio
import concurrent
import threading


def prepare_a_giant_list():
    d = {}
    for i in range(1000):
        d[i] = {}
        for j in range(1000):
            d[i][j] = {}
            for k in range(30):
                d[i][j][k] = 'a' * 1000
    del d

    th_num = threading.active_count()
    print("Thread number is {}".format(th_num))


async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as async_executor:
        await loop.run_in_executor(async_executor, prepare_a_giant_list)
        await asyncio.sleep(5)
        await loop.run_in_executor(async_executor, prepare_a_giant_list)
        await asyncio.sleep(5)
        await loop.run_in_executor(async_executor, prepare_a_giant_list)
        await asyncio.sleep(5)
        await loop.run_in_executor(async_executor, prepare_a_giant_list)
        await asyncio.sleep(5)
    print('Done!')
    await asyncio.sleep(15)


if __name__ == "__main__":
    asyncio.run(main())
----------
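The default-executor concern raised in the message above can also be addressed by installing an executor the application controls via loop.set_default_executor, so its worker threads can be shut down deliberately. A minimal sketch, not from the issue; the names blocking_job and main are illustrative:

```python
import asyncio
import concurrent.futures


def blocking_job():
    # Stand-in for a memory-hungry blocking task.
    return sum(range(10_000))


async def main():
    loop = asyncio.get_running_loop()
    # Replace asyncio's implicit default executor with one we control.
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
    loop.set_default_executor(executor)
    try:
        # Passing None selects the (now replaced) default executor.
        return await loop.run_in_executor(None, blocking_job)
    finally:
        # Shut the workers down once the jobs are done, releasing
        # any per-thread memory they still reference.
        executor.shutdown(wait=True)


result = asyncio.run(main())
print(result)
```

Calling shutdown again later (as asyncio.run itself does for the default executor on Python 3.9+) is harmless; shutdown is idempotent.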
msg360428 - Author: Andrew Svetlov (asvetlov) * (Python committer) Date: 2020-01-21 19:46
The latest example releases the allocated Python objects.
The memory is returned to the allocator, which, in turn, can hold on to it for a while. The allocator is not controlled by the Python API.

Anyway, if an explicit executor solves your needs -- please just use it.
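As an illustration of the allocator behaviour described above (a sketch, not part of the issue): on glibc-based Linux systems the process can explicitly ask the allocator to hand freed heap pages back to the OS with malloc_trim, reachable from Python via ctypes. The helper name trim_heap is made up here, and malloc_trim is glibc-specific, so the helper degrades to returning False elsewhere.

```python
import ctypes
import ctypes.util


def trim_heap() -> bool:
    """Ask the C allocator to return free heap pages to the OS.

    Illustrative helper; malloc_trim() is glibc-specific, so this
    returns False on platforms where it is unavailable.
    """
    path = ctypes.util.find_library("c")
    if path is None:
        return False
    libc = ctypes.CDLL(path)
    if not hasattr(libc, "malloc_trim"):
        return False
    # malloc_trim(0) returns 1 if any memory was released to the OS.
    return bool(libc.malloc_trim(0))


print(trim_heap())
```

Whether a trim actually releases anything depends on heap fragmentation, which is exactly why resident memory can stay high even after Python objects are freed.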
History
Date User Action Args
2020-01-21 19:46:51  asvetlov     set     messages: + msg360428
2020-01-21 16:08:58  johndoee     set     nosy: + johndoee
                                          messages: + msg360406
                                          versions: + Python 3.6, Python 3.7
2019-09-09 10:52:43  asvetlov     set     status: open -> closed
                                          resolution: not a bug
                                          stage: resolved
2019-08-26 10:51:15  asvetlov     set     messages: + msg350523
2019-08-21 20:58:40  rhettinger   set     nosy: + pablogsal
2019-08-21 20:55:03  Tianshu Gao  create