diff -r c26e6beb2e35 Doc/library/concurrent.futures.rst --- a/Doc/library/concurrent.futures.rst Sat Jul 26 19:53:38 2014 +0300 +++ b/Doc/library/concurrent.futures.rst Thu Aug 07 19:24:07 2014 -0400 @@ -38,7 +38,7 @@ future = executor.submit(pow, 323, 1235) print(future.result()) - .. method:: map(func, *iterables, timeout=None) + .. method:: map(func, *iterables, timeout=None, chunksize=1) Equivalent to :func:`map(func, *iterables) ` except *func* is executed asynchronously and several calls to *func* may be made concurrently. The @@ -48,7 +48,13 @@ *timeout* can be an int or a float. If *timeout* is not specified or ``None``, there is no limit to the wait time. If a call raises an exception, then that exception will be raised when its value is - retrieved from the iterator. + retrieved from the iterator. When using :class:`ProcessPoolExecutor`, this + method chops *iterables* into a number of chunks which it submits to the + pool as separate tasks. The (approximate) size of these chunks can be + specified by setting *chunksize* to a positive integer. For very long + iterables, using a large value for *chunksize* can significantly improve + performance compared to the default size of 1. With :class:`ThreadPoolExecutor`, + *chunksize* has no effect. .. method:: shutdown(wait=True) diff -r c26e6beb2e35 Lib/concurrent/futures/_base.py --- a/Lib/concurrent/futures/_base.py Sat Jul 26 19:53:38 2014 +0300 +++ b/Lib/concurrent/futures/_base.py Thu Aug 07 19:24:07 2014 -0400 @@ -520,7 +520,7 @@ """ raise NotImplementedError() - def map(self, fn, *iterables, timeout=None): + def map(self, fn, *iterables, timeout=None, chunksize=1): """Returns a iterator equivalent to map(fn, iter). Args: @@ -528,6 +528,10 @@ passed iterables. timeout: The maximum number of seconds to wait. If None, then there is no limit on the wait time. + chunksize: The size of the chunks the iterable will be broken into + before being passed to a child process. This argument is only + used by ProcessPoolExecutor; it is ignored by + ThreadPoolExecutor. Returns: An iterator equivalent to: map(func, *iterables) but the calls may diff -r c26e6beb2e35 Lib/concurrent/futures/process.py --- a/Lib/concurrent/futures/process.py Sat Jul 26 19:53:38 2014 +0300 +++ b/Lib/concurrent/futures/process.py Thu Aug 07 19:24:07 2014 -0400 @@ -55,6 +55,8 @@ from multiprocessing.connection import wait import threading import weakref +from functools import partial +import itertools # Workers are created as daemon threads and processes. This is done to allow the # interpreter to exit when there are still idle processes in a @@ -108,6 +110,26 @@ self.args = args self.kwargs = kwargs +def _get_chunks(*iterables, chunksize): + """ Iterates over zip()ed iterables in chunks. """ + it = zip(*iterables) + while True: + chunk = tuple(itertools.islice(it, chunksize)) + if not chunk: + return + yield chunk + +def _process_chunk(fn, chunk): + """ Processes a chunk of an iterable passed to map. + + Runs the function passed to map() on a chunk of the + iterable passed to map. + + This function is run in a separate process. + + """ + return [fn(*args) for args in chunk] + def _process_worker(call_queue, result_queue): """Evaluates calls from call_queue and places the results in result_queue. @@ -411,6 +433,39 @@ return f submit.__doc__ = _base.Executor.submit.__doc__ + def map(self, fn, *iterables, timeout=None, chunksize=1): + """Returns a iterator equivalent to map(fn, iter). + + Args: + fn: A callable that will take as many arguments as there are + passed iterables. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + chunksize: If greater than one, the iterables will be chopped into + chunks of size chunksize and submitted to the process pool. + If set to one, the items in the list will be sent one at a time. + + Returns: + An iterator equivalent to: map(func, *iterables) but the calls may + be evaluated out-of-order. + + Raises: + TimeoutError: If the entire result iterator could not be generated + before the given timeout. + Exception: If fn(*args) raises for any values. + """ + if chunksize < 1: + raise ValueError("chunksize must be >= 1.") + + def result_iterator(): + for chunk in results: + yield from chunk + + results = super().map(partial(_process_chunk, fn), + _get_chunks(*iterables, chunksize=chunksize), + timeout=timeout) + return result_iterator() + def shutdown(self, wait=True): with self._shutdown_lock: self._shutdown_thread = True diff -r c26e6beb2e35 Lib/test/test_concurrent_futures.py --- a/Lib/test/test_concurrent_futures.py Sat Jul 26 19:53:38 2014 +0300 +++ b/Lib/test/test_concurrent_futures.py Thu Aug 07 19:24:07 2014 -0400 @@ -458,6 +458,11 @@ # Submitting other jobs fails as well. self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8) + def test_map_chunksize(self): + self.assertEqual( + list(self.executor.map(pow, range(40), range(40), chunksize=6)), + list(map(pow, range(40), range(40)))) + class FutureTests(unittest.TestCase): def test_done_callback_with_result(self):