diff -r a444464a2e87 Include/opcode.h --- a/Include/opcode.h Thu May 14 18:47:17 2015 -0400 +++ b/Include/opcode.h Thu May 14 19:21:48 2015 -0700 @@ -1,86 +1,86 @@ /* Auto-generated by Tools/scripts/generate_opcode_h.py */ #ifndef Py_OPCODE_H #define Py_OPCODE_H #ifdef __cplusplus extern "C" { #endif /* Instruction opcodes for compiled code */ -#define POP_TOP 1 -#define ROT_TWO 2 -#define ROT_THREE 3 -#define DUP_TOP 4 -#define DUP_TOP_TWO 5 -#define NOP 9 -#define UNARY_POSITIVE 10 -#define UNARY_NEGATIVE 11 -#define UNARY_NOT 12 -#define UNARY_INVERT 15 -#define BINARY_MATRIX_MULTIPLY 16 -#define INPLACE_MATRIX_MULTIPLY 17 -#define BINARY_POWER 19 -#define BINARY_MULTIPLY 20 -#define BINARY_MODULO 22 -#define BINARY_ADD 23 -#define BINARY_SUBTRACT 24 -#define BINARY_SUBSCR 25 -#define BINARY_FLOOR_DIVIDE 26 -#define BINARY_TRUE_DIVIDE 27 -#define INPLACE_FLOOR_DIVIDE 28 -#define INPLACE_TRUE_DIVIDE 29 -#define GET_AITER 50 -#define GET_ANEXT 51 -#define BEFORE_ASYNC_WITH 52 -#define STORE_MAP 54 -#define INPLACE_ADD 55 -#define INPLACE_SUBTRACT 56 -#define INPLACE_MULTIPLY 57 -#define INPLACE_MODULO 59 -#define STORE_SUBSCR 60 -#define DELETE_SUBSCR 61 -#define BINARY_LSHIFT 62 -#define BINARY_RSHIFT 63 -#define BINARY_AND 64 -#define BINARY_XOR 65 -#define BINARY_OR 66 -#define INPLACE_POWER 67 -#define GET_ITER 68 -#define PRINT_EXPR 70 -#define LOAD_BUILD_CLASS 71 -#define YIELD_FROM 72 -#define GET_AWAITABLE 73 -#define INPLACE_LSHIFT 75 -#define INPLACE_RSHIFT 76 -#define INPLACE_AND 77 -#define INPLACE_XOR 78 -#define INPLACE_OR 79 -#define BREAK_LOOP 80 -#define WITH_CLEANUP_START 81 -#define WITH_CLEANUP_FINISH 82 -#define RETURN_VALUE 83 -#define IMPORT_STAR 84 -#define YIELD_VALUE 86 -#define POP_BLOCK 87 -#define END_FINALLY 88 -#define POP_EXCEPT 89 -#define HAVE_ARGUMENT 90 -#define STORE_NAME 90 -#define DELETE_NAME 91 -#define UNPACK_SEQUENCE 92 -#define FOR_ITER 93 -#define UNPACK_EX 94 -#define STORE_ATTR 95 -#define DELETE_ATTR 96 -#define STORE_GLOBAL 97 -#define DELETE_GLOBAL 98 +#define POP_TOP 1 +#define ROT_TWO 2 +#define ROT_THREE 3 +#define DUP_TOP 4 +#define DUP_TOP_TWO 5 +#define NOP 9 +#define UNARY_POSITIVE 10 +#define UNARY_NEGATIVE 11 +#define UNARY_NOT 12 +#define UNARY_INVERT 15 +#define BINARY_MATRIX_MULTIPLY 16 +#define INPLACE_MATRIX_MULTIPLY 17 +#define BINARY_POWER 19 +#define BINARY_MULTIPLY 20 +#define BINARY_MODULO 22 +#define BINARY_ADD 23 +#define BINARY_SUBTRACT 24 +#define BINARY_SUBSCR 25 +#define BINARY_FLOOR_DIVIDE 26 +#define BINARY_TRUE_DIVIDE 27 +#define INPLACE_FLOOR_DIVIDE 28 +#define INPLACE_TRUE_DIVIDE 29 +#define GET_AITER 50 +#define GET_ANEXT 51 +#define BEFORE_ASYNC_WITH 52 +#define STORE_MAP 54 +#define INPLACE_ADD 55 +#define INPLACE_SUBTRACT 56 +#define INPLACE_MULTIPLY 57 +#define INPLACE_MODULO 59 +#define STORE_SUBSCR 60 +#define DELETE_SUBSCR 61 +#define BINARY_LSHIFT 62 +#define BINARY_RSHIFT 63 +#define BINARY_AND 64 +#define BINARY_XOR 65 +#define BINARY_OR 66 +#define INPLACE_POWER 67 +#define GET_ITER 68 +#define PRINT_EXPR 70 +#define LOAD_BUILD_CLASS 71 +#define YIELD_FROM 72 +#define GET_AWAITABLE 73 +#define INPLACE_LSHIFT 75 +#define INPLACE_RSHIFT 76 +#define INPLACE_AND 77 +#define INPLACE_XOR 78 +#define INPLACE_OR 79 +#define BREAK_LOOP 80 +#define WITH_CLEANUP_START 81 +#define WITH_CLEANUP_FINISH 82 +#define RETURN_VALUE 83 +#define IMPORT_STAR 84 +#define YIELD_VALUE 86 +#define POP_BLOCK 87 +#define END_FINALLY 88 +#define POP_EXCEPT 89 +#define HAVE_ARGUMENT 90 +#define STORE_NAME 90 +#define DELETE_NAME 91 +#define UNPACK_SEQUENCE 92 +#define FOR_ITER 93 +#define UNPACK_EX 94 +#define STORE_ATTR 95 +#define DELETE_ATTR 96 +#define STORE_GLOBAL 97 +#define DELETE_GLOBAL 98 #define LOAD_CONST 100 #define LOAD_NAME 101 #define BUILD_TUPLE 102 #define BUILD_LIST 103 #define BUILD_SET 104 #define BUILD_MAP 105 #define LOAD_ATTR 106 #define COMPARE_OP 107 #define IMPORT_NAME 108 #define IMPORT_FROM 109 diff -r a444464a2e87 Lib/concurrent/futures/_base.py --- a/Lib/concurrent/futures/_base.py Thu May 14 18:47:17 2015 -0400 +++ b/Lib/concurrent/futures/_base.py Thu May 14 19:21:48 2015 -0700 @@ -554,20 +554,60 @@ class Executor(object): for future in fs: if timeout is None: yield future.result() else: yield future.result(end_time - time.time()) finally: for future in fs: future.cancel() return result_iterator() + def filter(self, fn, iterable, timeout=None): + """Get an iterator equivalent to filter(fn, iterable). + + Args: + fn: A filter function that takes one argument + iterable: An iterable of arguments to pass through the filter + function + + Returns: + An iterator equivalent to: filter(fn, iterable) but the calls may + be evaluated out-of-order. + + Raises: + TimeoutError: If the entire result iterator could not be generated + before the given timeout. + Exception: If fn(arg) raises for any values. + """ + if timeout is not None: + end_time = timeout + time.time() + + items_and_futures = [ + (item, self.submit(fn, item)) for item in iterable + ] + + # Yield must be hidden in closure so that the futures are submitted + # before the first iterator value is required. + def result_iterator(): + try: + for item, future in items_and_futures: + if timeout is None: + result = future.result() + else: + result = future.result(end_time - time.time()) + if result: + yield item + finally: + for _, future in items_and_futures: + future.cancel() + return result_iterator() + def shutdown(self, wait=True): """Clean-up the resources associated with the Executor. It is safe to call this method several times. Otherwise, no other methods can be called after this one. Args: wait: If True then shutdown will not return until all running futures have finished executing and the resources used by the executor have been reclaimed. diff -r a444464a2e87 Lib/test/test_concurrent_futures.py --- a/Lib/test/test_concurrent_futures.py Thu May 14 18:47:17 2015 -0400 +++ b/Lib/test/test_concurrent_futures.py Thu May 14 19:21:48 2015 -0700 @@ -46,20 +46,30 @@ def mul(x, y): def sleep_and_raise(t): time.sleep(t) raise Exception('this is an exception') def sleep_and_print(t, msg): time.sleep(t) print(msg) sys.stdout.flush() +def mod_3(i): + return i % 3 == 0 + +def mod_to_3(i): + return 3 % i == 0 + +def sleep_and_maybe_timeout(secs): + time.sleep(secs) + return True + class MyObject(object): def my_method(self): pass class ExecutorMixin: worker_count = 5 def setUp(self): @@ -396,20 +406,49 @@ class ExecutorTest: [0, 0, 6], timeout=5): results.append(i) except futures.TimeoutError: pass else: self.fail('expected TimeoutError') self.assertEqual([None, None], results) + def test_filter(self): + self.assertEqual( + list(self.executor.filter(mod_3, range(20))), + list(filter(mod_3, range(20)))) + + def test_filter_exception(self): + i = self.executor.filter( + mod_to_3, + [5, 4, 3, 2, 1, 0, -1, -2, -3, -4, -5]) + #F F T F T e + self.assertEqual(i.__next__(), 3) + self.assertEqual(i.__next__(), 1) + self.assertRaises(ZeroDivisionError, i.__next__) + + def test_filter_timeout(self): + results = [] + try: + for i in self.executor.filter( + sleep_and_maybe_timeout, + [0, 0, 6], + timeout=5): + results.append(i) + except futures.TimeoutError: + pass + else: + self.fail('expected TimeoutError') + + self.assertEqual([0, 0], results) + def test_shutdown_race_issue12456(self): # Issue #12456: race condition at shutdown where trying to post a # sentinel in the call queue blocks (the queue is full while processes # have exited). self.executor.map(str, [2] * (self.worker_count + 1)) self.executor.shutdown() @test.support.cpython_only def test_no_stale_references(self): # Issue #16284: check that the executors don't unnecessarily hang onto