"""Process pool executor whose ``map()`` batches work items into chunks."""

from concurrent import futures
import functools


def chunks(*iterables, chunksize):
    """Yield lists of up to ``chunksize`` tuples taken from ``zip(*iterables)``.

    Args:
        *iterables: Iterables consumed in lockstep, exactly as ``zip()`` does
            (iteration stops at the shortest one).
        chunksize: Number of zipped tuples per yielded list. Any value below 1
            results in one-item lists, because a chunk is emitted as soon as
            it holds at least ``chunksize`` items.

    Yields:
        Lists of argument tuples. Only the final list may be shorter than
        ``chunksize``.
    """
    batch = []
    for item in zip(*iterables):
        batch.append(item)
        if len(batch) >= chunksize:
            yield batch
            batch = []
    # Leftover items when the shortest iterable's length is not evenly
    # divisible by chunksize.
    if batch:
        yield batch


def process_chunk(fn, chunk):
    """Apply ``fn`` to every argument tuple of a chunk produced by ``chunks()``.

    Defined at module level so it can be wrapped in ``functools.partial`` and
    handed to the pool as the task callable.

    Args:
        fn: Callable invoked as ``fn(*args)`` for each tuple in ``chunk``.
        chunk: Iterable of argument tuples.

    Returns:
        A list with one result per argument tuple, in order.
    """
    return [fn(*args) for args in chunk]


def _flatten(chunk_results):
    """Yield the individual results out of an iterable of per-chunk lists."""
    for chunk_result in chunk_results:
        for result in chunk_result:
            yield result


class FastProcessPoolExecutor(futures.ProcessPoolExecutor):
    """``ProcessPoolExecutor`` whose ``map()`` ships work in chunks."""

    def _default_chunksize(self, iterables):
        """Guess a chunk size from the input lengths and the worker count.

        Returns 1 when no iterables were given or when at least one of them
        has no ``len()``, since a good size cannot be guessed reliably then.
        """
        if not iterables:
            # min() over nothing would raise ValueError; there is no work.
            return 1
        try:
            shortest = min(len(iterable) for iterable in iterables)
        except TypeError:
            # At least one iterable has no length.
            return 1
        # From multiprocessing.Pool.map(): aim for about four chunks per
        # worker, rounding up so that no item is left out.
        chunksize, extra = divmod(shortest, self._max_workers * 4)
        if extra:
            chunksize += 1
        return max(chunksize, 1)

    def map(self, fn, *iterables, timeout=None, chunksize=None):
        """Return an iterator over ``fn(*args)`` for ``args`` in ``zip(*iterables)``.

        The underlying ``super().map()`` call is made right here, when
        ``map()`` is called, not when the returned iterator is first advanced.

        Args:
            fn: Callable applied to each tuple of zipped arguments.
            *iterables: Input iterables, zipped together.
            timeout: Forwarded unchanged to ``ProcessPoolExecutor.map()``.
            chunksize: Number of calls bundled into one pool task. ``None``
                (the default) derives a value from the input lengths and the
                number of workers; 1 disables this class's chunking.

        Returns:
            A generator yielding the individual results in input order.
        """
        if chunksize is None:
            chunksize = self._default_chunksize(iterables)

        if chunksize == 1:
            return super().map(fn, *iterables, timeout=timeout)

        chunk_results = super().map(
            functools.partial(process_chunk, fn),
            chunks(*iterables, chunksize=chunksize),
            timeout=timeout,
        )
        return _flatten(chunk_results)