diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -275,6 +275,22 @@
         return self._map_async(func, iterable, starmapstar, chunksize,
                                callback, error_callback)
 
+    def _guarded_task_generation(self, result_job, func, iterable):
+        '''Provides a generator of tasks for imap and imap_unordered with
+        appropriate handling for iterables which throw exceptions during
+        iteration.'''
+        try:
+            i = -1
+            for i, x in enumerate(iterable):
+                yield (result_job, i, func, (x,), {})
+        except Exception as e:
+            yield (result_job, i+1, self._helper_reraises_exception, (e,), {})
+
+    @staticmethod
+    def _helper_reraises_exception(ex):
+        'Pickle-able helper function for use by _guarded_task_generation.'
+        raise ex
+
     def imap(self, func, iterable, chunksize=1):
         '''
         Equivalent of `map()` -- can be MUCH slower than `Pool.map()`.
@@ -283,15 +299,25 @@
             raise ValueError("Pool not running")
         if chunksize == 1:
             result = IMapIterator(self._cache)
-            self._taskqueue.put((((result._job, i, func, (x,), {})
-                for i, x in enumerate(iterable)), result._set_length))
+            self._taskqueue.put(
+                (
+                    self._guarded_task_generation(result._job,
+                                                  func,
+                                                  iterable),
+                    result._set_length
+                ))
             return result
         else:
             assert chunksize > 1
             task_batches = Pool._get_tasks(func, iterable, chunksize)
             result = IMapIterator(self._cache)
-            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
-                for i, x in enumerate(task_batches)), result._set_length))
+            self._taskqueue.put(
+                (
+                    self._guarded_task_generation(result._job,
+                                                  mapstar,
+                                                  task_batches),
+                    result._set_length
+                ))
             return (item for chunk in result for item in chunk)
 
     def imap_unordered(self, func, iterable, chunksize=1):
@@ -302,15 +328,25 @@
             raise ValueError("Pool not running")
         if chunksize == 1:
             result = IMapUnorderedIterator(self._cache)
-            self._taskqueue.put((((result._job, i, func, (x,), {})
-                for i, x in enumerate(iterable)), result._set_length))
+            self._taskqueue.put(
+                (
+                    self._guarded_task_generation(result._job,
+                                                  func,
+                                                  iterable),
+                    result._set_length
+                ))
             return result
         else:
             assert chunksize > 1
             task_batches = Pool._get_tasks(func, iterable, chunksize)
             result = IMapUnorderedIterator(self._cache)
-            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
-                for i, x in enumerate(task_batches)), result._set_length))
+            self._taskqueue.put(
+                (
+                    self._guarded_task_generation(result._job,
+                                                  mapstar,
+                                                  task_batches),
+                    result._set_length
+                ))
             return (item for chunk in result for item in chunk)
 
     def apply_async(self, func, args=(), kwds={}, callback=None,
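
Below is a minimal usage sketch (not part of the patch) illustrating the behaviour the change above is meant to produce: when the input iterable itself raises during iteration, _guarded_task_generation hands the exception to a worker via _helper_reraises_exception, and it is re-raised in the consumer when the imap result is iterated. The names square and faulty_iterable are hypothetical, chosen only for the example.

import multiprocessing

def square(x):
    return x * x

def faulty_iterable():
    # Yields a couple of items, then fails mid-iteration.
    yield 1
    yield 2
    raise RuntimeError("iterable failed during iteration")

if __name__ == "__main__":
    with multiprocessing.Pool(processes=2) as pool:
        results = pool.imap(square, faulty_iterable())
        try:
            for value in results:
                print(value)              # prints 1, then 4
        except RuntimeError as exc:
            # With the patched pool, the iterable's exception surfaces here
            # in the consuming process rather than being lost in the
            # pool's task-handler thread.
            print("caught:", exc)

The chunksize > 1 branch and imap_unordered take the same guarded path, since the batched generator returned by Pool._get_tasks is also wrapped by _guarded_task_generation.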