This issue tracker has been migrated to GitHub, and is currently read-only.
For more information, see the GitHub FAQs in the Python's Developer Guide.

Author tim.peters
Recipients DemGiran, tim.peters
Date 2018-07-21.05:38:19
SpamBayes Score -1.0
Marked as misclassified Yes
Message-id <1532151499.6.0.56676864532.issue34168@psf.upfronthosting.co.za>
In-reply-to
Content
Note that you can consume multiple gigabytes of RAM with this simpler program too, and for the same reasons:

"""
import concurrent.futures as cf

bucket = range(30_000_000)

def _dns_query(target):
    from time import sleep
    sleep(0.1)

def run():
    with cf.ThreadPoolExecutor(3) as executor:
        future_to_element = dict()

        for element in bucket:
            future = executor.submit(_dns_query, element)
            future_to_element[future] = element

        for future in cf.as_completed(future_to_element):
            elt = future_to_element[future]
            print(elt)

run()
"""

The usual way to mediate between producers and consumers that run at very different speeds is to use a bounded queue, so that producers block when putting new work items on the queue until consumers make progress taking work items off.  That's easy enough to do with `multiprocessing` instead, but `concurrent.futures` doesn't support that directly.

If you don't mind doing the work in chunks, this straightforward modification allows slashing RAM - the smaller CHUNK is, the less RAM is needed, but also the more often the code waits for the most recent chunk to finish:

"""
CHUNK = 10_000

...

def chunkify(iterable, chunk=CHUNK):
    from itertools import islice
    it = iter(iterable)
    while True:
        piece = list(islice(it, chunk))
        if piece:
            yield piece
        else:
            return

def run():
    with cf.ThreadPoolExecutor(3) as executor:
        for b in chunkify(bucket):
            # essentially the original code just indented
            future_to_element = dict()
            for element in b:
                future = executor.submit(_dns_query, element)
                future_to_element[future] = element

            for future in cf.as_completed(future_to_element):
                elt = future_to_element[future]
                print(elt)
"""
History
Date User Action Args
2018-07-21 05:38:19tim.peterssetrecipients: + tim.peters, DemGiran
2018-07-21 05:38:19tim.peterssetmessageid: <1532151499.6.0.56676864532.issue34168@psf.upfronthosting.co.za>
2018-07-21 05:38:19tim.peterslinkissue34168 messages
2018-07-21 05:38:19tim.peterscreate