diff -r 07161dd8a078 Doc/library/asyncio-dev.rst --- a/Doc/library/asyncio-dev.rst Mon Oct 05 10:35:19 2015 -0700 +++ b/Doc/library/asyncio-dev.rst Tue Oct 06 00:01:28 2015 +0200 @@ -96,10 +96,9 @@ and the event loop executes the next task. To schedule a callback from a different thread, the -:meth:`BaseEventLoop.call_soon_threadsafe` method should be used. Example to -schedule a coroutine from a different thread:: +:meth:`BaseEventLoop.call_soon_threadsafe` method should be used. Example:: - loop.call_soon_threadsafe(asyncio.ensure_future, coro_func()) + loop.call_soon_threadsafe(callback, *args) Most asyncio objects are not thread safe. You should only worry if you access objects outside the event loop. For example, to cancel a future, don't call @@ -107,6 +106,13 @@ loop.call_soon_threadsafe(fut.cancel) +To schedule a coroutine object from a different thread, the +:func:`run_coroutine_threadsafe` should be used. It returns a +:class:`concurrent.futures.Future` to access the result:: + + future = asyncio.run_coroutine_threadsafe(coro_func(), loop) + result = future.result(timeout) # Wait for the result with a timeout + To handle signals and to execute subprocesses, the event loop must be run in the main thread. diff -r 07161dd8a078 Doc/library/asyncio-task.rst --- a/Doc/library/asyncio-task.rst Mon Oct 05 10:35:19 2015 -0700 +++ b/Doc/library/asyncio-task.rst Tue Oct 06 00:01:28 2015 +0200 @@ -683,3 +683,42 @@ .. versionchanged:: 3.4.3 If the wait is cancelled, the future *fut* is now also cancelled. + +.. function:: run_coroutine_threadsafe(coro, loop) + + Submit a :ref:`coroutine object ` to a given event loop. + + Return a :class:`concurrent.futures.Future` to access the result. + + This function is meant to be called from a different thread than the one + where the event loop is running. Usage:: + + # Create a coroutine + coro = asyncio.sleep(1, result=3) + # Submit the coroutine to a given loop + future = asyncio.run_coroutine_threadsafe(coro, loop) + # Wait for the result with an optional timeout argument + assert future.result(timeout) == 3 + + If an exception is raised in the coroutine, the returned future will be + notified. It can also be used to cancel the task in the event loop:: + + try: + result = future.result(timeout) + except asyncio.TimeoutError: + print('The coroutine took too long, cancelling the task...') + future.cancel() + except Exception as exc: + print('The coroutine raised an exception: {!r}'.format(exc)) + else: + print('The coroutine returned: {!r}'.format(result)) + + See the :ref:`concurrency and multithreading ` + section of the documentation. + + .. note:: + + Unlike the functions above, :func:`run_coroutine_threadsafe` requires the + *loop* argument to be passed explicitely. + + .. versionadded:: 3.4.4 diff -r 07161dd8a078 Lib/asyncio/tasks.py --- a/Lib/asyncio/tasks.py Mon Oct 05 10:35:19 2015 -0700 +++ b/Lib/asyncio/tasks.py Tue Oct 06 00:01:28 2015 +0200 @@ -704,7 +704,12 @@ future = concurrent.futures.Future() def callback(): - futures._chain_future(ensure_future(coro, loop=loop), future) + try: + futures._chain_future(ensure_future(coro, loop=loop), future) + except Exception as exc: + if future.set_running_or_notify_cancel(): + future.set_exception(exc) + raise loop.call_soon_threadsafe(callback) return future diff -r 07161dd8a078 Lib/test/test_asyncio/test_tasks.py --- a/Lib/test/test_asyncio/test_tasks.py Mon Oct 05 10:35:19 2015 -0700 +++ b/Lib/test/test_asyncio/test_tasks.py Tue Oct 06 00:01:28 2015 +0200 @@ -2166,6 +2166,27 @@ with self.assertRaises(asyncio.CancelledError): self.loop.run_until_complete(future) + def test_run_coroutine_threadsafe_task_factory_exception(self): + """Test coroutine submission from a tread to an event loop + when the task factory raise an exception.""" + # Clear the time generator + asyncio.ensure_future(self.add(1, 2), loop=self.loop) + # Schedule the target + future = self.loop.run_in_executor(None, self.target) + # Set corrupted task factory + self.loop.set_task_factory(lambda loop, coro: wrong_name) + # Set exception handler + callback = test_utils.MockCallback() + self.loop.set_exception_handler(callback) + # Run event loop + with self.assertRaises(NameError) as exc_context: + self.loop.run_until_complete(future) + # Check exceptions + self.assertIn('wrong_name', exc_context.exception.args[0]) + self.assertEqual(len(callback.call_args_list), 1) + (loop, context), kwargs = callback.call_args + self.assertEqual(context['exception'], exc_context.exception) + if __name__ == '__main__': unittest.main()