diff -r c2c3b79ba992 Lib/concurrent/futures/_base.py --- a/Lib/concurrent/futures/_base.py Thu Jun 04 11:10:36 2015 -0400 +++ b/Lib/concurrent/futures/_base.py Thu Jun 04 14:59:18 2015 -0400 @@ -506,6 +506,36 @@ self._condition.notify_all() self._invoke_callbacks() + def __await__(self): + # TODO: Resolve circular imports. + import asyncio + + loop = asyncio.get_event_loop() + afut = asyncio.Future(loop=loop) + fut = self + + def sync(): + # This function will be called from asyncio loop thread + if fut.cancelled(): + afut.cancel() + return + try: + result = fut.result() + except Exception as ex: + afut.set_exception(ex) + else: + afut.set_result(result) + + def cancel(): + if not fut.cancelled(): + fut.cancel() + asyncio.Future.cancel(afut) + + afut.cancel = cancel + self.add_done_callback(lambda fut: loop.call_soon_threadsafe(sync)) + return iter(afut) + + class Executor(object): """This is an abstract base class for concrete asynchronous executors.""" diff -r c2c3b79ba992 Lib/test/test_asyncio/test_pep492.py --- a/Lib/test/test_asyncio/test_pep492.py Thu Jun 04 11:10:36 2015 -0400 +++ b/Lib/test/test_asyncio/test_pep492.py Thu Jun 04 14:59:18 2015 -0400 @@ -134,5 +134,97 @@ data = self.loop.run_until_complete(foo()) self.assertEqual(data, 'spam') + +class ConcurrentFuturesIntegrationTests(BaseTest): + + @support.reap_threads + def test_concurrent_futures_1(self): + import time + from concurrent import futures as cf + + executor = cf.ThreadPoolExecutor() + def waiter(): + time.sleep(0.01) + return 'spam' + + async def coro(): + return await executor.submit(waiter) + + self.loop._write_to_self = mock.Mock() + + asyncio.set_event_loop(self.loop) + try: + data = self.loop.run_until_complete(coro()) + self.assertEqual(data, 'spam') + finally: + asyncio.set_event_loop(None) + executor.shutdown() + + @support.reap_threads + def test_concurrent_futures_2(self): + import time + from concurrent import futures as cf + + executor = cf.ThreadPoolExecutor(max_workers=1) + def waiter(): + time.sleep(0.01) + return 'spam' + + async def coro(): + executor.submit(waiter) + # 'fut' won't start execution right away because + # executor will be busy with the first submitted task + fut = executor.submit(waiter) + fut.cancel() + return await fut + + self.loop._write_to_self = mock.Mock() + + asyncio.set_event_loop(self.loop) + try: + with self.assertRaises(asyncio.CancelledError): + self.loop.run_until_complete(coro()) + finally: + asyncio.set_event_loop(None) + executor.shutdown() + + @support.reap_threads + def test_concurrent_futures_3(self): + import time + from concurrent import futures as cf + + N = 0 + + executor = cf.ThreadPoolExecutor(max_workers=1) + def waiter(): + nonlocal N + time.sleep(0.1) + N += 1 + return 'spam' + + async def coro(): + executor.submit(waiter) + await executor.submit(waiter) + + async def killer(task): + task.cancel() + + self.loop._write_to_self = mock.Mock() + + asyncio.set_event_loop(self.loop) + try: + task1 = asyncio.ensure_future(coro()) + task2 = asyncio.ensure_future(killer(task1)) + with self.assertRaises(asyncio.CancelledError): + self.loop.run_until_complete(asyncio.gather(task1, task2)) + finally: + asyncio.set_event_loop(None) + executor.shutdown() + + # N must be 1: second 'executor.submit()' future must be cancelled + # bu the 'killer()' coroutine + self.assertEqual(N, 1) + + if __name__ == '__main__': unittest.main()