diff -r f83adc06f486 Lib/concurrent/futures/process.py --- a/Lib/concurrent/futures/process.py Fri Jul 18 09:14:55 2014 -0500 +++ b/Lib/concurrent/futures/process.py Wed Jul 23 21:56:40 2014 -0400 @@ -55,6 +55,8 @@ from multiprocessing.connection import wait import threading import weakref +from functools import partial +import itertools # Workers are created as daemon threads and processes. This is done to allow the # interpreter to exit when there are still idle processes in a @@ -108,6 +110,23 @@ self.args = args self.kwargs = kwargs +def _get_chunks(*iterables, chunksize): + """ Iterator over zip()ed iterables in chunks. """ + it = [zip(*iterables)] * chunksize + for item in itertools.zip_longest(*it): + yield tuple(filter(None, item)) + +def _process_chunk(fn, chunk): + """ Processes a chunk of an iterable passed to map. + + Runs the function passed to map() on a chunk of the + iterable passed to map. + + This function is run in a separate process. + + """ + return [fn(*args) for args in chunk] + def _process_worker(call_queue, result_queue): """Evaluates calls from call_queue and places the results in result_queue. @@ -411,6 +430,36 @@ return f submit.__doc__ = _base.Executor.submit.__doc__ + def map(self, fn, *iterables, timeout=None, chunksize=1): + """Returns a iterator equivalent to map(fn, iter). + + Args: + fn: A callable that will take as many arguments as there are + passed iterables. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + chunksize: If greater than one, the iterables will be chopped into + chunks of size chunksize and submitted to the process pool. + If set to one, the items in the list will be sent one at a time. + + Returns: + An iterator equivalent to: map(func, *iterables) but the calls may + be evaluated out-of-order. + + Raises: + TimeoutError: If the entire result iterator could not be generated + before the given timeout. + Exception: If fn(*args) raises for any values. + """ + if chunksize > 1: + for chunk in super().map(partial(_process_chunk, fn), + _get_chunks(*iterables, chunksize=chunksize), + timeout=timeout): + yield from chunk + else: + chunksize = 1 + yield from super().map(fn, *iterables, timeout=timeout) + def shutdown(self, wait=True): with self._shutdown_lock: self._shutdown_thread = True