# HG changeset patch # User Ram Rachum # Date 1431638460 -10800 # Branch executor_filter # Node ID 124e2d1fc787a6f477d642fa4d2a6043950911a6 # Parent 1012a8138fcb592a11d7905fea755645d49b4dd6 Add `Executor.filter` diff -r 1012a8138fcb -r 124e2d1fc787 Lib/concurrent/futures/_base.py --- a/Lib/concurrent/futures/_base.py Thu May 14 14:14:02 2015 -0400 +++ b/Lib/concurrent/futures/_base.py Fri May 15 00:21:00 2015 +0300 @@ -554,6 +554,46 @@ future.cancel() return result_iterator() + def filter(self, fn, iterable, timeout=None): + """Get an iterator equivalent to filter(fn, iterable). + + Args: + fn: A filter function that takes one argument + iterable: An iterable of arguments to pass through the filter + function + + Returns: + An iterator equivalent to: filter(fn, iterable) but the calls may + be evaluated out-of-order. + + Raises: + TimeoutError: If the entire result iterator could not be generated + before the given timeout. + Exception: If fn(arg) raises for any values. + """ + if timeout is not None: + end_time = timeout + time.time() + + items_and_futures = [ + (item, self.submit(fn, item)) for item in iterable + ] + + # Yield must be hidden in closure so that the futures are submitted + # before the first iterator value is required. + def result_iterator(): + try: + for item, future in items_and_futures: + if timeout is None: + result = future.result() + else: + result = future.result(end_time - time.time()) + if result: + yield item + finally: + for _, future in items_and_futures: + future.cancel() + return result_iterator() + def shutdown(self, wait=True): """Clean-up the resources associated with the Executor. diff -r 1012a8138fcb -r 124e2d1fc787 Lib/test/test_concurrent_futures.py --- a/Lib/test/test_concurrent_futures.py Thu May 14 14:14:02 2015 -0400 +++ b/Lib/test/test_concurrent_futures.py Fri May 15 00:21:00 2015 +0300 @@ -402,6 +402,35 @@ self.assertEqual([None, None], results) + def test_filter(self): + filter_function = (lambda x: x % 3 == 0) + self.assertEqual( + list(self.executor.filter(filter_function, range(20))), + list(filter(filter_function, range(20)))) + + def test_filter_exception(self): + i = self.executor.filter(lambda x: 3 % x == 0, + [5, 4, 3, 2, 1, 0, -1, -2, -3, -4, -5]) + self.assertEqual(i.__next__(), 5) + self.assertEqual(i.__next__(), 4) + self.assertEqual(i.__next__(), 2) + self.assertEqual(i.__next__(), 1) + self.assertRaises(ZeroDivisionError, i.__next__) + + def test_filter_timeout(self): + results = [] + try: + for i in self.executor.filter(lambda x: [time.sleep(x)], + [0, 0, 6], + timeout=5): + results.append(i) + except futures.TimeoutError: + pass + else: + self.fail('expected TimeoutError') + + self.assertEqual([0, 0], results) + def test_shutdown_race_issue12456(self): # Issue #12456: race condition at shutdown where trying to post a # sentinel in the call queue blocks (the queue is full while processes