diff -r 53f5e16695d0 Lib/multiprocessing/pool.py
--- a/Lib/multiprocessing/pool.py	Thu Nov 17 01:50:16 2016 -0800
+++ b/Lib/multiprocessing/pool.py	Fri Nov 18 01:11:33 2016 +0800
@@ -283,15 +283,19 @@
             raise ValueError("Pool not running")
         if chunksize == 1:
             result = IMapIterator(self._cache)
-            self._taskqueue.put((((result._job, i, func, (x,), {})
-                         for i, x in enumerate(iterable)), result._set_length))
+            self._taskqueue.put((
+                ((result._job, i, func, (x,), {}) for i, x in enumerate(iterable)),
+                result._job, result._set_length
+            ))
             return result
         else:
             assert chunksize > 1
             task_batches = Pool._get_tasks(func, iterable, chunksize)
             result = IMapIterator(self._cache)
-            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
-                     for i, x in enumerate(task_batches)), result._set_length))
+            self._taskqueue.put((
+                ((result._job, i, mapstar, (x,), {}) for i, x in enumerate(task_batches)),
+                result._job, result._set_length
+            ))
             return (item for chunk in result for item in chunk)
 
     def imap_unordered(self, func, iterable, chunksize=1):
@@ -302,15 +306,19 @@
             raise ValueError("Pool not running")
         if chunksize == 1:
             result = IMapUnorderedIterator(self._cache)
-            self._taskqueue.put((((result._job, i, func, (x,), {})
-                         for i, x in enumerate(iterable)), result._set_length))
+            self._taskqueue.put((
+                ((result._job, i, func, (x,), {}) for i, x in enumerate(iterable)),
+                result._job, result._set_length
+            ))
             return result
         else:
             assert chunksize > 1
             task_batches = Pool._get_tasks(func, iterable, chunksize)
             result = IMapUnorderedIterator(self._cache)
-            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
-                     for i, x in enumerate(task_batches)), result._set_length))
+            self._taskqueue.put((
+                ((result._job, i, mapstar, (x,), {}) for i, x in enumerate(task_batches)),
+                result._job, result._set_length
+            ))
             return (item for chunk in result for item in chunk)
 
     def apply_async(self, func, args=(), kwds={}, callback=None,
@@ -321,7 +329,7 @@
         if self._state != RUN:
             raise ValueError("Pool not running")
         result = ApplyResult(self._cache, callback, error_callback)
-        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
+        self._taskqueue.put(([(result._job, None, func, args, kwds)], result._job, None))
         return result
 
     def map_async(self, func, iterable, chunksize=None, callback=None,
@@ -352,8 +360,10 @@
         task_batches = Pool._get_tasks(func, iterable, chunksize)
         result = MapResult(self._cache, chunksize, len(iterable),
                            callback, error_callback=error_callback)
-        self._taskqueue.put((((result._job, i, mapper, (x,), {})
-                              for i, x in enumerate(task_batches)), None))
+        self._taskqueue.put((
+            ((result._job, i, mapper, (x,), {}) for i, x in enumerate(task_batches)),
+            result._job, None
+        ))
         return result
 
     @staticmethod
@@ -373,7 +383,7 @@
     def _handle_tasks(taskqueue, put, outqueue, pool, cache):
         thread = threading.current_thread()
 
-        for taskseq, set_length in iter(taskqueue.get, None):
+        for taskseq, job, set_length in iter(taskqueue.get, None):
             task = None
             i = -1
             try:
@@ -384,7 +394,8 @@
                     try:
                         put(task)
                     except Exception as e:
-                        job, ind = task[:2]
+                        assert task[0] == job
+                        ind = task[1]
                         try:
                             cache[job]._set(ind, (False, e))
                         except KeyError:
@@ -396,12 +407,13 @@
                     continue
                 break
             except Exception as ex:
-                job, ind = task[:2] if task else (0, 0)
+                assert job == task[0] if task else job
+                ind = task[1] if task else 0
                 if job in cache:
-                    cache[job]._set(ind+1, (False, ex))
+                    cache[job]._set(ind+1 if task else ind, (False, ex))
                 if set_length:
                     util.debug('doing set_length()')
-                    set_length(i+1)
+                    set_length(i+1 if task else 1)
         else:
             util.debug('task handler got sentinel')
 
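The patch threads the job id through each taskqueue entry so that _handle_tasks
can fail the correct result object even when the task generator raises before
yielding a single task. In that case task is still None, and the old fallback
`job, ind = task[:2] if task else (0, 0)` guesses job 0, which need not be the
job that is actually waiting. A minimal sketch of the failure mode this targets
(`square` and `failing_iterable` are illustrative names, not part of the patch):

    from multiprocessing import Pool

    def square(x):
        return x * x

    def failing_iterable():
        # Raises before the first item, so _handle_tasks sees task=None.
        raise RuntimeError("broken input")
        yield 1  # never reached

    if __name__ == '__main__':
        with Pool(2) as pool:
            it = pool.imap(square, failing_iterable())
            # Unpatched, the error raised by the iterable can be attributed
            # to the wrong job or dropped, so next(it) stops or blocks
            # instead of raising; with the patch the RuntimeError should
            # propagate here.
            next(it)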