diff -r e340fe83c510 Lib/asyncio/tasks.py --- a/Lib/asyncio/tasks.py Sun Aug 02 15:51:46 2015 +0200 +++ b/Lib/asyncio/tasks.py Sun Aug 02 10:08:51 2015 -0400 @@ -269,6 +269,13 @@ 'yield was used instead of yield from for ' 'generator in task {!r} with {}'.format( self, result))) + elif isinstance(result, concurrent.futures.Future): + if self._must_cancel and self._fut_waiter.cancel(): + self._must_cancel = False + result.add_done_callback( + functools.partial( + self._loop.call_soon_threadsafe, self._wakeup)) + self._fut_waiter = result else: # Yielding something else is an error. self._loop.call_soon( diff -r e340fe83c510 Lib/concurrent/futures/_base.py --- a/Lib/concurrent/futures/_base.py Sun Aug 02 15:51:46 2015 +0200 +++ b/Lib/concurrent/futures/_base.py Sun Aug 02 10:08:51 2015 -0400 @@ -506,6 +506,12 @@ self._condition.notify_all() self._invoke_callbacks() + def __await__(self): + if not self.done(): + yield self + assert self.done(), "await wasn't used with future" + return self.result() # May raise too. + class Executor(object): """This is an abstract base class for concrete asynchronous executors.""" diff -r e340fe83c510 Lib/test/test_asyncio/test_pep492.py --- a/Lib/test/test_asyncio/test_pep492.py Sun Aug 02 15:51:46 2015 +0200 +++ b/Lib/test/test_asyncio/test_pep492.py Sun Aug 02 10:08:51 2015 -0400 @@ -183,5 +183,96 @@ self.assertEqual(data, 'spam') +class ConcurrentFuturesIntegrationTests(BaseTest): + + @support.reap_threads + def test_concurrent_futures_1(self): + import time + from concurrent import futures as cf + + executor = cf.ThreadPoolExecutor() + def waiter(): + time.sleep(0.01) + return 'spam' + + async def coro(): + return await executor.submit(waiter) + + self.loop._write_to_self = mock.Mock() + + asyncio.set_event_loop(self.loop) + try: + data = self.loop.run_until_complete(coro()) + self.assertEqual(data, 'spam') + finally: + asyncio.set_event_loop(None) + executor.shutdown() + + @support.reap_threads + def test_concurrent_futures_2(self): + import time + from concurrent import futures as cf + + executor = cf.ThreadPoolExecutor(max_workers=1) + def waiter(): + time.sleep(0.01) + return 'spam' + + async def coro(): + executor.submit(waiter) + # 'fut' won't start execution right away because + # executor will be busy with the first submitted task + fut = executor.submit(waiter) + fut.cancel() + return await fut + + self.loop._write_to_self = mock.Mock() + + asyncio.set_event_loop(self.loop) + try: + with self.assertRaises(asyncio.CancelledError): + self.loop.run_until_complete(coro()) + finally: + asyncio.set_event_loop(None) + executor.shutdown() + + @support.reap_threads + def test_concurrent_futures_3(self): + import time + from concurrent import futures as cf + + N = 0 + + executor = cf.ThreadPoolExecutor(max_workers=1) + def waiter(): + nonlocal N + time.sleep(0.1) + N += 1 + return 'spam' + + async def coro(): + executor.submit(waiter) + await executor.submit(waiter) + + async def killer(task): + task.cancel() + + self.loop._write_to_self = mock.Mock() + + asyncio.set_event_loop(self.loop) + try: + task1 = asyncio.ensure_future(coro()) + task2 = asyncio.ensure_future(killer(task1)) + with self.assertRaises(asyncio.CancelledError): + self.loop.run_until_complete(asyncio.gather(task1, task2)) + finally: + asyncio.set_event_loop(None) + executor.shutdown() + + # N must be 1: second 'executor.submit()' future must be cancelled + # bu the 'killer()' coroutine + self.assertEqual(N, 1) + + if __name__ == '__main__': unittest.main()