
classification
Title: RAM consumption too high using concurrent.futures (Python 3.7 / 3.6)
Type: Stage:
Components: Library (Lib) Versions: Python 3.7, Python 3.6
process
Status: open Resolution:
Dependencies: Superseder:
Assigned To: Nosy List: DemGiran, josh.r, tim.peters
Priority: normal Keywords:

Created on 2018-07-20 12:53 by DemGiran, last changed 2022-04-11 14:59 by admin.

Messages (5)
msg322004 - (view) Author: Dem (DemGiran) Date: 2018-07-20 12:53
I have a list of 30 million strings, and I want to run a DNS query against each of them. I do not understand how this operation can become so memory intensive. I would assume that the threads exit once the job is done, and each query also has a timeout of 1 second ({'dns_request_timeout': 1}).

Here is a sneak peek of the machine's resources while running the script:

[![enter image description here][1]][1]

My code is as follows:

    # -*- coding: utf-8 -*-
    import dns.resolver
    import concurrent.futures
    from pprint import pprint
    import json

    
    bucket = json.load(open('30_million_strings.json','r'))
    
    
    def _dns_query(target, **kwargs):
        global bucket
        resolv = dns.resolver.Resolver()
        resolv.timeout = kwargs['function']['dns_request_timeout']
        try:
            resolv.query(target + '.com', kwargs['function']['query_type'])
            with open('out.txt', 'a') as f:
                f.write(target + '\n')
        except Exception:
            pass
    
    
    def run(**kwargs):
        global bucket
        temp_locals = locals()
        pprint({k: v for k, v in temp_locals.items()})
    
        with concurrent.futures.ThreadPoolExecutor(max_workers=kwargs['concurrency']['threads']) as executor:
            future_to_element = dict()
    
            for element in bucket:
                future = executor.submit(kwargs['function']['name'], element, **kwargs)
                future_to_element[future] = element
    
            for future in concurrent.futures.as_completed(future_to_element):
                result = future_to_element[future]
    
    
    run(function={'name': _dns_query, 'dns_request_timeout': 1, 'query_type': 'MX'},
        concurrency={'threads': 15})


  [1]: https://i.stack.imgur.com/686SW.png
msg322005 - (view) Author: Dem (DemGiran) Date: 2018-07-20 12:55
It seems that even without the as_completed call it has the same problem. 


```
# -*- coding: utf-8 -*-
import dns.resolver
import concurrent.futures
from pprint import pprint
import json


bucket = json.load(open('30_million_strings.json','r'))


def _dns_query(target, **kwargs):
    global bucket
    resolv = dns.resolver.Resolver()
    resolv.timeout = kwargs['function']['dns_request_timeout']
    try:
        resolv.query(target + '.com', kwargs['function']['query_type'])
        with open('out.txt', 'a') as f:
            f.write(target + '\n')
    except Exception:
        pass


def run(**kwargs):
    global bucket
    temp_locals = locals()
    pprint({k: v for k, v in temp_locals.items()})

    with concurrent.futures.ThreadPoolExecutor(max_workers=kwargs['concurrency']['threads']) as executor:
        for element in bucket:
            executor.submit(kwargs['function']['name'], element, **kwargs)


run(function={'name': _dns_query, 'dns_request_timeout': 1, 'query_type': 'MX'},
    concurrency={'threads': 15})
```
msg322031 - (view) Author: Tim Peters (tim.peters) * (Python committer) Date: 2018-07-20 18:18
If your `bucket` has 30 million items, then

    for element in bucket:
        executor.submit(kwargs['function']['name'], element, **kwargs)

is going to create 30 million Future objects (and all the under-the-covers objects needed to manage their concurrency) just as fast as the main thread can create them.  Nothing in your code waits for anything to finish until after they've _all_ been created and queued up under the covers.

So your producer is running vastly faster than your consumers can keep up with.  It's the huge backlog of pending work items that consumes the RAM.  To slash RAM, you need to craft a way to interleave creating new work items with giving consumers time to deal with them.
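
One possible way to get that interleaving, as a minimal sketch rather than a recommended recipe: make the producer acquire a semaphore slot before each `submit`, and release the slot from a done-callback. The names `work`, `run_throttled` and `MAX_PENDING` are hypothetical, and `work` is only a stand-in for the real DNS query.

```python
import concurrent.futures as cf
import threading
import time

MAX_PENDING = 100  # hypothetical cap on work items that are queued or running


def work(item):
    # Stand-in for the real DNS query (assumption: any blocking call fits here).
    time.sleep(0.001)
    return item * 2


def run_throttled(items, max_workers=3, max_pending=MAX_PENDING):
    slots = threading.BoundedSemaphore(max_pending)
    results = []
    lock = threading.Lock()

    def on_done(future):
        # Called once the work item has finished, normally in a worker thread.
        try:
            if future.exception() is None:
                with lock:
                    results.append(future.result())
        finally:
            slots.release()  # always free the slot, even if the work failed

    with cf.ThreadPoolExecutor(max_workers) as executor:
        for item in items:
            # Blocks the producer while max_pending items are outstanding.
            slots.acquire()
            executor.submit(work, item).add_done_callback(on_done)
    return results
```

With this shape at most `max_pending` Future objects are alive at once, so memory stays roughly flat no matter how long `items` is. Results arrive in completion order, not input order.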
msg322075 - (view) Author: Tim Peters (tim.peters) * (Python committer) Date: 2018-07-21 05:38
Note that you can consume multiple gigabytes of RAM with this simpler program too, and for the same reasons:

"""
import concurrent.futures as cf

bucket = range(30_000_000)

def _dns_query(target):
    from time import sleep
    sleep(0.1)

def run():
    with cf.ThreadPoolExecutor(3) as executor:
        future_to_element = dict()

        for element in bucket:
            future = executor.submit(_dns_query, element)
            future_to_element[future] = element

        for future in cf.as_completed(future_to_element):
            elt = future_to_element[future]
            print(elt)

run()
"""

The usual way to mediate between producers and consumers that run at very different speeds is to use a bounded queue, so that producers block when putting new work items on the queue until consumers make progress taking work items off.  That's easy enough to do with `multiprocessing` instead, but `concurrent.futures` doesn't support that directly.
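
For illustration only, here is a rough sketch of the bounded-queue idea using plain `threading` and `queue.Queue` (swapped in here instead of `multiprocessing`, since the work is I/O bound). The names `work`, `run_with_bounded_queue` and `_STOP` are made up for this example, and `work` stands in for the real query.

```python
import queue
import threading
import time

_STOP = object()  # hypothetical sentinel telling a worker to exit


def work(item):
    # Stand-in for the real DNS query.
    time.sleep(0.001)
    return item * 2


def run_with_bounded_queue(items, num_workers=3, maxsize=100):
    tasks = queue.Queue(maxsize=maxsize)  # put() blocks while the queue is full
    results = []
    lock = threading.Lock()

    def worker():
        while True:
            item = tasks.get()
            if item is _STOP:
                return
            try:
                value = work(item)
            except Exception:
                continue  # ignore failures, as the original script does
            with lock:
                results.append(value)

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for item in items:
        tasks.put(item)  # the producer stalls here until consumers catch up
    for _ in threads:
        tasks.put(_STOP)  # one sentinel per worker
    for t in threads:
        t.join()
    return results
```

The backlog can never exceed `maxsize` items, at the cost of giving up the Future-based API.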

If you don't mind doing the work in chunks, this straightforward modification allows slashing RAM - the smaller CHUNK is, the less RAM is needed, but also the more often the code waits for the most recent chunk to finish:

"""
CHUNK = 10_000

...

def chunkify(iterable, chunk=CHUNK):
    from itertools import islice
    it = iter(iterable)
    while True:
        piece = list(islice(it, chunk))
        if piece:
            yield piece
        else:
            return

def run():
    with cf.ThreadPoolExecutor(3) as executor:
        for b in chunkify(bucket):
            # essentially the original code just indented
            future_to_element = dict()
            for element in b:
                future = executor.submit(_dns_query, element)
                future_to_element[future] = element

            for future in cf.as_completed(future_to_element):
                elt = future_to_element[future]
                print(elt)
"""
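
A possible variation on the chunked approach, sketched under the same assumptions: keep a sliding window of pending futures and top it up as soon as any of them finishes, so the workers are not left idle while the slowest item of a chunk completes. `work`, `run_windowed` and `window` are hypothetical names; `concurrent.futures.wait` with `FIRST_COMPLETED` is the standard library call doing the waiting.

```python
import concurrent.futures as cf
import itertools
import time


def work(item):
    # Stand-in for the real DNS query.
    time.sleep(0.001)
    return item * 2


def run_windowed(items, max_workers=3, window=100):
    it = iter(items)
    results = []
    with cf.ThreadPoolExecutor(max_workers) as executor:
        # Prime the window with the first `window` work items.
        pending = {executor.submit(work, x) for x in itertools.islice(it, window)}
        while pending:
            # Block until at least one pending future has finished.
            done, pending = cf.wait(pending, return_when=cf.FIRST_COMPLETED)
            for future in done:
                results.append(future.result())
            # Submit one new item for each finished one, if any remain.
            for x in itertools.islice(it, len(done)):
                pending.add(executor.submit(work, x))
    return results
```

No more than `window` futures exist at any moment, and results are collected in completion order, as with `as_completed`.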
msg322373 - (view) Author: Josh Rosenberg (josh.r) * (Python triager) Date: 2018-07-25 17:37
Note: While this particular use case wouldn't be fixed (map returns in order, not as completed), applying the fix from #29842 would make many similar use cases both simpler to implement and more efficient/possible.

That said, no action has been taken on #29842 (no objections, but no action either), so I'm not sure what to do to push it to completion.
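
For context, a small sketch of the behavior that (as I understand it) #29842 is about: `Executor.map` submits every input item before it returns, so it has the same backlog problem as the loop over `submit`. The helper names `pulled_before_first_result`, `counting_source` and `identity` are invented for this demonstration.

```python
import concurrent.futures as cf


def identity(x):
    return x


def pulled_before_first_result(n=1000):
    pulled = 0

    def counting_source():
        # Generator that records how many items have been taken from it.
        nonlocal pulled
        for i in range(n):
            pulled += 1
            yield i

    with cf.ThreadPoolExecutor(2) as executor:
        results = executor.map(identity, counting_source())
        count = pulled  # measured after map() returns, before reading any result
        first = next(results)
    return count, first


# All n items are already consumed before the first result is read.
print(pulled_before_first_result())
```

So for 30 million inputs, `map` with its default arguments would also create 30 million work items up front.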
History
Date User Action Args
2022-04-11 14:59:03 admin set github: 78349
2018-07-25 17:37:22 josh.r set nosy: + josh.r; messages: + msg322373
2018-07-21 05:38:19 tim.peters set messages: + msg322075
2018-07-20 18:18:21 tim.peters set nosy: + tim.peters; messages: + msg322031
2018-07-20 12:55:27 DemGiran set messages: + msg322005
2018-07-20 12:53:50 DemGiran create