Issue 34168
This issue tracker has been migrated to GitHub, and is currently read-only.
For more information, see the GitHub FAQs in the Python Developer's Guide.
Created on 2018-07-20 12:53 by DemGiran, last changed 2022-04-11 14:59 by admin.
Messages (5)

msg322004 - (view) | Author: Dem (DemGiran) | Date: 2018-07-20 12:53
I have a list of 30 million strings, and I want to run a DNS query against all of them. I do not understand how this operation can get memory intensive. I would assume that the threads exit after the job is done, and there is also a timeout of 1 minute ({'dns_request_timeout': 1}).

Here is a snapshot of the machine's resources while running the script: https://i.stack.imgur.com/686SW.png

My code is as follows:

```
# -*- coding: utf-8 -*-
import dns.resolver
import concurrent.futures
from pprint import pprint
import json

bucket = json.load(open('30_million_strings.json', 'r'))


def _dns_query(target, **kwargs):
    resolv = dns.resolver.Resolver()
    resolv.timeout = kwargs['function']['dns_request_timeout']
    try:
        resolv.query(target + '.com', kwargs['function']['query_type'])
        with open('out.txt', 'a') as f:
            f.write(target + '\n')
    except Exception:
        pass


def run(**kwargs):
    temp_locals = locals()
    pprint({k: v for k, v in temp_locals.items()})
    with concurrent.futures.ThreadPoolExecutor(
            max_workers=kwargs['concurrency']['threads']) as executor:
        future_to_element = dict()
        for element in bucket:
            future = executor.submit(kwargs['function']['name'], element, **kwargs)
            future_to_element[future] = element
        for future in concurrent.futures.as_completed(future_to_element):
            result = future_to_element[future]


run(function={'name': _dns_query, 'dns_request_timeout': 1, 'query_type': 'MX'},
    concurrency={'threads': 15})
```
msg322005 - (view) | Author: Dem (DemGiran) | Date: 2018-07-20 12:55
It seems that even without the as_completed call it has the same problem:

```
# -*- coding: utf-8 -*-
import dns.resolver
import concurrent.futures
from pprint import pprint
import json

bucket = json.load(open('30_million_strings.json', 'r'))


def _dns_query(target, **kwargs):
    resolv = dns.resolver.Resolver()
    resolv.timeout = kwargs['function']['dns_request_timeout']
    try:
        resolv.query(target + '.com', kwargs['function']['query_type'])
        with open('out.txt', 'a') as f:
            f.write(target + '\n')
    except Exception:
        pass


def run(**kwargs):
    temp_locals = locals()
    pprint({k: v for k, v in temp_locals.items()})
    with concurrent.futures.ThreadPoolExecutor(
            max_workers=kwargs['concurrency']['threads']) as executor:
        for element in bucket:
            executor.submit(kwargs['function']['name'], element, **kwargs)


run(function={'name': _dns_query, 'dns_request_timeout': 1, 'query_type': 'MX'},
    concurrency={'threads': 15})
```
msg322031 - (view) | Author: Tim Peters (tim.peters) * | Date: 2018-07-20 18:18
If your `bucket` has 30 million items, then

```
for element in bucket:
    executor.submit(kwargs['function']['name'], element, **kwargs)
```

is going to create 30 million Future objects (and all the under-the-covers objects needed to manage their concurrency) just as fast as the main thread can create them. Nothing in your code waits for anything to finish until after they've _all_ been created and queued up under the covers.

So your producer is running vastly faster than your consumers can keep up with. It's the huge backlog of pending work items that consumes the RAM. To slash RAM, you need to craft a way to interleave creating new work items with giving consumers time to deal with them.
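[Editor's note] One way to interleave producing work items with consuming them, sketched here as an illustration rather than taken from this thread, is to cap the number of in-flight futures with a `threading.BoundedSemaphore`: the producer acquires the semaphore before each `submit`, and each future releases it when done. The names (`MAX_IN_FLIGHT`, `work`) and the toy workload are assumptions, not the original DNS code:

```python
# Illustrative sketch (not from this thread): throttle submissions so at most
# MAX_IN_FLIGHT futures are queued or running at any moment. The producer
# blocks in sem.acquire() instead of building a 30-million-item backlog.
import concurrent.futures as cf
import threading

MAX_IN_FLIGHT = 8  # illustrative cap; tune for your workload


def work(item):
    return item * 2  # stand-in for the real DNS query


def run(items):
    sem = threading.BoundedSemaphore(MAX_IN_FLIGHT)
    futures = []
    with cf.ThreadPoolExecutor(max_workers=4) as executor:
        for item in items:
            sem.acquire()  # blocks while MAX_IN_FLIGHT items are pending
            fut = executor.submit(work, item)
            fut.add_done_callback(lambda f: sem.release())
            futures.append(fut)
    return [f.result() for f in futures]


print(run(range(10)))  # prints [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

With this pattern peak memory is bounded by `MAX_IN_FLIGHT` pending futures, regardless of how long the input iterable is.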
msg322075 - (view) | Author: Tim Peters (tim.peters) * | Date: 2018-07-21 05:38
Note that you can consume multiple gigabytes of RAM with this simpler program too, and for the same reasons:

```
import concurrent.futures as cf

bucket = range(30_000_000)

def _dns_query(target):
    from time import sleep
    sleep(0.1)

def run():
    with cf.ThreadPoolExecutor(3) as executor:
        future_to_element = dict()
        for element in bucket:
            future = executor.submit(_dns_query, element)
            future_to_element[future] = element
        for future in cf.as_completed(future_to_element):
            elt = future_to_element[future]
            print(elt)

run()
```

The usual way to mediate between producers and consumers that run at very different speeds is to use a bounded queue, so that producers block when putting new work items on the queue until consumers make progress taking work items off. That's easy enough to do with `multiprocessing` instead, but `concurrent.futures` doesn't support that directly.

If you don't mind doing the work in chunks, this straightforward modification allows slashing RAM - the smaller CHUNK is, the less RAM is needed, but also the more often the code waits for the most recent chunk to finish:

```
CHUNK = 10_000
...

def chunkify(iterable, chunk=CHUNK):
    from itertools import islice
    it = iter(iterable)
    while True:
        piece = list(islice(it, chunk))
        if piece:
            yield piece
        else:
            return

def run():
    with cf.ThreadPoolExecutor(3) as executor:
        for b in chunkify(bucket):
            # essentially the original code, just indented
            future_to_element = dict()
            for element in b:
                future = executor.submit(_dns_query, element)
                future_to_element[future] = element
            for future in cf.as_completed(future_to_element):
                elt = future_to_element[future]
                print(elt)
```
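[Editor's note] The bounded-queue pattern described above can also be sketched with plain threads and the standard library's `queue.Queue`, whose `maxsize` argument makes `put()` block until a consumer catches up. This is an illustrative sketch under assumed names (`worker`, `run`), not code from the thread:

```python
# Illustrative sketch (not from this thread): a bounded queue.Queue caps how
# many pending work items can exist, so the producer blocks in put() whenever
# the consumers fall behind, keeping memory use flat.
import queue
import threading


def worker(q, results, lock):
    while True:
        item = q.get()
        if item is None:  # sentinel: no more work for this thread
            return
        with lock:
            results.append(item * 2)  # stand-in for the real work


def run(items, n_workers=3, maxsize=100):
    q = queue.Queue(maxsize=maxsize)  # put() blocks once maxsize items queued
    results, lock = [], threading.Lock()
    threads = [threading.Thread(target=worker, args=(q, results, lock))
               for _ in range(n_workers)]
    for t in threads:
        t.start()
    for item in items:
        q.put(item)   # blocks when the queue is full
    for _ in threads:
        q.put(None)   # one sentinel per worker
    for t in threads:
        t.join()
    return results


print(sorted(run(range(20))))
```

Unlike the chunked variant, this never stalls the producer waiting for a whole chunk to drain; it only blocks while the queue is at capacity.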
msg322373 - (view) | Author: Josh Rosenberg (josh.r) * | Date: 2018-07-25 17:37
Note: While this particular use case wouldn't be fixed (map returns in order, not as completed), applying the fix from #29842 would make many similar use cases both simpler to implement and more efficient/possible. That said, no action has been taken on #29842 (no objections, but no action either), so I'm not sure what to do to push it to completion. |
History

Date | User | Action | Args
---|---|---|---
2022-04-11 14:59:03 | admin | set | github: 78349
2018-07-25 17:37:22 | josh.r | set | nosy: + josh.r; messages: + msg322373
2018-07-21 05:38:19 | tim.peters | set | messages: + msg322075
2018-07-20 18:18:21 | tim.peters | set | nosy: + tim.peters; messages: + msg322031
2018-07-20 12:55:27 | DemGiran | set | messages: + msg322005
2018-07-20 12:53:50 | DemGiran | create |