#!/usr/bin/python3
"""Benchmark multiprocessing.Pool against concurrent.futures process pools.

The script runs the same primality test over range(MIN, MAX) four times:
with multiprocessing.Pool, with the stock ProcessPoolExecutor, and with
FastProcessPoolExecutor (a subclass whose map() batches work items), first
with the default chunk size and then with an explicit one.  It prints the
wall-clock time of each run and checks that all runs agree.
"""
import sys
import time
import math
import itertools
import multiprocessing
from functools import partial
from concurrent import futures

# Half-open interval [MIN, MAX) of integers tested by the benchmark.
MIN = 100000000
MAX = 100100000


def is_prime(n):
    """Return True if the integer *n* is prime, False otherwise.

    Uses trial division by 2 and then by every odd number up to the
    integer square root of *n*.  Values below 2 (including negatives)
    are never prime.
    """
    if n < 2:
        return False
    if n % 2 == 0:
        # 2 is the only even prime.
        return n == 2
    # math.isqrt is exact for arbitrarily large ints, unlike
    # floor(sqrt(n)) on floats, which can miss the largest factor.
    for i in range(3, math.isqrt(n) + 1, 2):
        if n % i == 0:
            return False
    return True


def _get_chunks(*iterables, chunksize):
    """Yield tuples of up to *chunksize* argument tuples.

    The iterables are zipped together, so every element of a chunk is the
    tuple of positional arguments for one call.  Iteration stops as soon
    as the zipped stream is exhausted; the last chunk may be shorter.
    """
    it = zip(*iterables)
    while True:
        chunk = tuple(itertools.islice(it, chunksize))
        if not chunk:
            return
        yield chunk


def _process_chunk(fn, chunk):
    """Call *fn* on every argument tuple in *chunk*; return the results as a list.

    Defined at module level so it can be pickled and sent to workers.
    """
    return [fn(*args) for args in chunk]


class FastProcessPoolExecutor(futures.ProcessPoolExecutor):
    """ProcessPoolExecutor whose map() submits work in batches."""

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Return an iterator equivalent to map(fn, *iterables).

        The argument tuples are grouped into chunks of *chunksize* items and
        each chunk is handed to the parent class's map() as one work item,
        so *fn* is applied to a whole chunk per task.

        Args:
            fn: Picklable callable applied to each argument tuple.
            *iterables: Iterables zipped together to form the arguments.
            timeout: Forwarded unchanged to the parent map().
            chunksize: Number of calls per task; values below 1 are
                treated as 1.

        Returns:
            A generator yielding the individual results, flattened from the
            per-chunk result lists.
        """
        if chunksize < 1:
            chunksize = 1

        results = super().map(
            partial(_process_chunk, fn),
            _get_chunks(*iterables, chunksize=chunksize),
            timeout=timeout,
        )

        def result_iterator():
            # Flatten the per-chunk lists back into a single result stream.
            for chunk in results:
                yield from chunk

        return result_iterator()


def main():
    """Run the four benchmark variants and report timings and agreement."""
    workers = 4
    numbers = range(MIN, MAX)

    # flush=True so the progress text is visible while the run is in flight.
    # The stop time is taken inside each `with` so that pool shutdown is
    # excluded from the measurement.
    print("Starting multiproc...", end="", flush=True)
    start = time.time()
    with multiprocessing.Pool(workers) as pool:
        pool_results = pool.map(is_prime, numbers)
        stop = time.time()
    print("done in {} s.".format(stop - start))

    print("Starting futures...", end="", flush=True)
    start = time.time()
    with futures.ProcessPoolExecutor(workers) as executor:
        futures_results = list(executor.map(is_prime, numbers))
        stop = time.time()
    print("done in {} s.".format(stop - start))

    print('Starting futures "fixed" (no chunking)...', end="", flush=True)
    start = time.time()
    with FastProcessPoolExecutor(workers) as executor:
        fast_results = list(executor.map(is_prime, numbers))
        stop = time.time()
    print("done in {} s.".format(stop - start))

    print('Starting futures "fixed" (with chunking)...', end="", flush=True)
    start = time.time()
    with FastProcessPoolExecutor(workers) as executor:
        # Ceiling division: about four chunks per worker.
        chunksize, extra = divmod(MAX - MIN, workers * 4)
        if extra:
            chunksize += 1
        chunked_results = list(
            executor.map(is_prime, numbers, chunksize=chunksize)
        )
        stop = time.time()
    print("done in {} s.".format(stop - start))

    print(len(pool_results))
    print(len(futures_results))
    print(len(fast_results))
    print(len(chunked_results))
    print("l == l2:", pool_results == futures_results)
    print("l == l3:", pool_results == fast_results)
    # Compare against the chunked run itself, not the unchunked one again.
    print("l == l4:", pool_results == chunked_results)


if __name__ == "__main__":
    main()