diff -r a824c40e8fc0 Lib/multiprocessing/pool.py --- a/Lib/multiprocessing/pool.py Sat Feb 21 15:26:02 2015 -0800 +++ b/Lib/multiprocessing/pool.py Mon Feb 23 00:02:03 2015 -0600 @@ -374,25 +374,34 @@ thread = threading.current_thread() for taskseq, set_length in iter(taskqueue.get, None): + task = None i = -1 - for i, task in enumerate(taskseq): - if thread._state: - util.debug('task handler found thread._state != RUN') - break - try: - put(task) - except Exception as e: - job, ind = task[:2] + try: + for i, task in enumerate(taskseq): + if thread._state: + util.debug('task handler found thread._state != RUN') + break try: - cache[job]._set(ind, (False, e)) - except KeyError: - pass - else: + put(task) + except Exception as e: + job, ind = task[:2] + try: + cache[job]._set(ind, (False, e)) + except KeyError: + pass + else: + if set_length: + util.debug('doing set_length()') + set_length(i+1) + continue + break + except Exception as ex: + job, ind = task[:2] if task else (0, 0) + if job in cache: + cache[job]._set(ind + 1, (False, ex)) if set_length: util.debug('doing set_length()') set_length(i+1) - continue - break else: util.debug('task handler got sentinel') diff -r a824c40e8fc0 Lib/test/_test_multiprocessing.py --- a/Lib/test/_test_multiprocessing.py Sat Feb 21 15:26:02 2015 -0800 +++ b/Lib/test/_test_multiprocessing.py Mon Feb 23 00:02:03 2015 -0600 @@ -1639,6 +1639,12 @@ def mul(x, y): return x*y +def exception_throwing_generator(total, when): + for i in range(total): + if i == when: + raise ValueError("Somebody said when") + yield i + class _TestPool(BaseTestCase): @classmethod @@ -1737,6 +1743,25 @@ self.assertEqual(next(it), i*i) self.assertRaises(StopIteration, it.__next__) + def test_imap_handle_iterable_exception(self): + if self.TYPE == 'manager': + self.skipTest('test not appropriate for {}'.format(self.TYPE)) + + it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1) + for i in range(3): + self.assertEqual(next(it), i*i) + self.assertRaises(ValueError, it.__next__) + + # ValueError seen at start of problematic chunk's results + it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2) + for i in range(6): + self.assertEqual(next(it), i*i) + self.assertRaises(ValueError, it.__next__) + it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4) + for i in range(4): + self.assertEqual(next(it), i*i) + self.assertRaises(ValueError, it.__next__) + def test_imap_unordered(self): it = self.pool.imap_unordered(sqr, list(range(1000))) self.assertEqual(sorted(it), list(map(sqr, list(range(1000))))) @@ -1744,6 +1769,25 @@ it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53) self.assertEqual(sorted(it), list(map(sqr, list(range(1000))))) + def test_imap_unordered_handle_iterable_exception(self): + if self.TYPE == 'manager': + self.skipTest('test not appropriate for {}'.format(self.TYPE)) + + it = self.pool.imap_unordered(sqr, + exception_throwing_generator(10, 3), + 1) + with self.assertRaises(ValueError): + # imap_unordered makes it difficult to anticipate the ValueError + for i in range(10): + self.assertEqual(next(it), i*i) + + it = self.pool.imap_unordered(sqr, + exception_throwing_generator(20, 7), + 2) + with self.assertRaises(ValueError): + for i in range(20): + self.assertEqual(next(it), i*i) + def test_make_pool(self): self.assertRaises(ValueError, multiprocessing.Pool, -1) self.assertRaises(ValueError, multiprocessing.Pool, 0)