diff -r 81c73f964066 Lib/asyncio/base_events.py --- a/Lib/asyncio/base_events.py Wed Dec 17 14:56:47 2014 +0200 +++ b/Lib/asyncio/base_events.py Wed Dec 17 23:55:22 2014 +0100 @@ -22,6 +22,7 @@ import logging import os import socket import subprocess +import threading import time import traceback import sys @@ -168,7 +169,9 @@ class BaseEventLoop(events.AbstractEvent self._scheduled = [] self._default_executor = None self._internal_fds = 0 - self._running = False + # Identifier of the thread running the event loop, or None if the + # event loop is not running + self._owner = None self._clock_resolution = time.get_clock_info('monotonic').resolution self._exception_handler = None self._debug = (not sys.flags.ignore_environment @@ -246,9 +249,9 @@ class BaseEventLoop(events.AbstractEvent def run_forever(self): """Run until stop() is called.""" self._check_closed() - if self._running: + if self.is_running(): raise RuntimeError('Event loop is running.') - self._running = True + self._owner = threading.get_ident() try: while True: try: @@ -256,7 +259,7 @@ class BaseEventLoop(events.AbstractEvent except _StopError: break finally: - self._running = False + self._owner = None def run_until_complete(self, future): """Run until the Future is done. @@ -311,7 +314,7 @@ class BaseEventLoop(events.AbstractEvent The event loop must not be running. """ - if self._running: + if self.is_running(): raise RuntimeError("Cannot close a running event loop") if self._closed: return @@ -331,7 +334,7 @@ class BaseEventLoop(events.AbstractEvent def is_running(self): """Returns True if the event loop is running.""" - return self._running + return (self._owner is not None) def time(self): """Return the time according to the event loop's clock. @@ -373,7 +376,7 @@ class BaseEventLoop(events.AbstractEvent raise TypeError("coroutines cannot be used with call_at()") self._check_closed() if self._debug: - self._assert_is_current_event_loop() + self._check_thread() timer = events.TimerHandle(when, callback, args, self) if timer._source_traceback: del timer._source_traceback[-1] @@ -391,17 +394,17 @@ class BaseEventLoop(events.AbstractEvent Any positional arguments after the callback will be passed to the callback when it is called. """ - handle = self._call_soon(callback, args, check_loop=True) + handle = self._call_soon(callback, args, check_thread=True) if handle._source_traceback: del handle._source_traceback[-1] return handle - def _call_soon(self, callback, args, check_loop): + def _call_soon(self, callback, args, check_thread): if (coroutines.iscoroutine(callback) or coroutines.iscoroutinefunction(callback)): raise TypeError("coroutines cannot be used with call_soon()") - if self._debug and check_loop: - self._assert_is_current_event_loop() + if self._debug and check_thread: + self._check_thread() self._check_closed() handle = events.Handle(callback, args, self) if handle._source_traceback: @@ -409,8 +412,8 @@ class BaseEventLoop(events.AbstractEvent self._ready.append(handle) return handle - def _assert_is_current_event_loop(self): - """Asserts that this event loop is the current event loop. + def _check_thread(self): + """Check that the current thread is the thread running the event loop. Non-thread-safe methods of this class make this assumption and will likely behave incorrectly when the assumption is violated. @@ -418,18 +421,17 @@ class BaseEventLoop(events.AbstractEvent Should only be called when (self._debug == True). The caller is responsible for checking this condition for performance reasons. """ - try: - current = events.get_event_loop() - except AssertionError: + if self._owner is None: return - if current is not self: + thread_id = threading.get_ident() + if thread_id != self._owner: raise RuntimeError( "Non-thread-safe operation invoked on an event loop other " "than the current one") def call_soon_threadsafe(self, callback, *args): """Like call_soon(), but thread-safe.""" - handle = self._call_soon(callback, args, check_loop=False) + handle = self._call_soon(callback, args, check_thread=False) if handle._source_traceback: del handle._source_traceback[-1] self._write_to_self() diff -r 81c73f964066 Lib/asyncio/proactor_events.py --- a/Lib/asyncio/proactor_events.py Wed Dec 17 14:56:47 2014 +0200 +++ b/Lib/asyncio/proactor_events.py Wed Dec 17 23:55:22 2014 +0100 @@ -383,7 +383,7 @@ class BaseProactorEventLoop(base_events. sock, protocol, waiter, extra) def close(self): - if self._running: + if self.is_running(): raise RuntimeError("Cannot close a running event loop") if self.is_closed(): return @@ -434,7 +434,7 @@ class BaseProactorEventLoop(base_events. self._internal_fds += 1 # don't check the current loop because _make_self_pipe() is called # from the event loop constructor - self._call_soon(self._loop_self_reading, (), check_loop=False) + self._call_soon(self._loop_self_reading, (), check_thread=False) def _loop_self_reading(self, f=None): try: diff -r 81c73f964066 Lib/asyncio/selector_events.py --- a/Lib/asyncio/selector_events.py Wed Dec 17 14:56:47 2014 +0200 +++ b/Lib/asyncio/selector_events.py Wed Dec 17 23:55:22 2014 +0100 @@ -68,7 +68,7 @@ class BaseSelectorEventLoop(base_events. address, waiter, extra) def close(self): - if self._running: + if self.is_running(): raise RuntimeError("Cannot close a running event loop") if self.is_closed(): return diff -r 81c73f964066 Lib/test/test_asyncio/test_base_events.py --- a/Lib/test/test_asyncio/test_base_events.py Wed Dec 17 14:56:47 2014 +0200 +++ b/Lib/test/test_asyncio/test_base_events.py Wed Dec 17 23:55:22 2014 +0100 @@ -5,6 +5,7 @@ import logging import math import socket import sys +import threading import time import unittest from unittest import mock @@ -144,28 +145,71 @@ class BaseEventLoopTests(test_utils.Test # are really slow self.assertLessEqual(dt, 0.9, dt) - def test_assert_is_current_event_loop(self): + def check_thread(self, loop, debug): def cb(): pass - other_loop = base_events.BaseEventLoop() - other_loop._selector = mock.Mock() - asyncio.set_event_loop(other_loop) + loop.set_debug(debug) + if debug: + msg = ("Non-thread-safe operation invoked on an event loop other " + "than the current one") + with self.assertRaisesRegex(RuntimeError, msg): + loop.call_soon(cb) + with self.assertRaisesRegex(RuntimeError, msg): + loop.call_later(60, cb) + with self.assertRaisesRegex(RuntimeError, msg): + loop.call_at(loop.time() + 60, cb) + else: + loop.call_soon(cb) + loop.call_later(60, cb) + loop.call_at(loop.time() + 60, cb) - # raise RuntimeError if the event loop is different in debug mode - self.loop.set_debug(True) - with self.assertRaises(RuntimeError): - self.loop.call_soon(cb) - with self.assertRaises(RuntimeError): - self.loop.call_later(60, cb) - with self.assertRaises(RuntimeError): - self.loop.call_at(self.loop.time() + 60, cb) + def test_check_thread(self): + def check_in_thread(loop, event, debug, create_loop, fut): + # wait until the event loop is running + event.wait() + + try: + if create_loop: + loop2 = base_events.BaseEventLoop() + try: + asyncio.set_event_loop(loop2) + self.check_thread(loop, debug) + finally: + asyncio.set_event_loop(None) + loop2.close() + else: + self.check_thread(loop, debug) + except Exception as exc: + loop.call_soon_threadsafe(fut.set_exception, exc) + else: + loop.call_soon_threadsafe(fut.set_result, None) + + def test_thread(loop, debug, create_loop=False): + event = threading.Event() + fut = asyncio.Future(loop=loop) + loop.call_soon(event.set) + args = (loop, event, debug, create_loop, fut) + thread = threading.Thread(target=check_in_thread, args=args) + thread.start() + loop.run_until_complete(fut) + thread.join() + + self.loop._process_events = mock.Mock() + self.loop._write_to_self = mock.Mock() + + # raise RuntimeError if the thread has no event loop + test_thread(self.loop, True) # check disabled if debug mode is disabled - self.loop.set_debug(False) - self.loop.call_soon(cb) - self.loop.call_later(60, cb) - self.loop.call_at(self.loop.time() + 60, cb) + test_thread(self.loop, False) + + # raise RuntimeError if the event loop of the thread is not the called + # event loop + test_thread(self.loop, True, create_loop=True) + + # check disabled if debug mode is disabled + test_thread(self.loop, False, create_loop=True) def test_run_once_in_executor_handle(self): def cb(): diff -r 81c73f964066 Lib/test/test_asyncio/test_proactor_events.py --- a/Lib/test/test_asyncio/test_proactor_events.py Wed Dec 17 14:56:47 2014 +0200 +++ b/Lib/test/test_asyncio/test_proactor_events.py Wed Dec 17 23:55:22 2014 +0100 @@ -450,7 +450,7 @@ class BaseProactorEventLoopTests(test_ut self.assertIs(loop._csock, csock) self.assertEqual(loop._internal_fds, 1) _call_soon.assert_called_with(loop._loop_self_reading, (), - check_loop=False) + check_thread=False) def test_close_self_pipe(self): self.loop._close_self_pipe() diff -r 81c73f964066 Lib/test/test_asyncio/test_subprocess.py --- a/Lib/test/test_asyncio/test_subprocess.py Wed Dec 17 14:56:47 2014 +0200 +++ b/Lib/test/test_asyncio/test_subprocess.py Wed Dec 17 23:55:22 2014 +0100 @@ -229,19 +229,12 @@ if sys.platform != 'win32': def setUp(self): policy = asyncio.get_event_loop_policy() self.loop = policy.new_event_loop() - - # ensure that the event loop is passed explicitly in asyncio - policy.set_event_loop(None) + self.set_event_loop(self.loop) watcher = self.Watcher() watcher.attach_loop(self.loop) policy.set_child_watcher(watcher) - - def tearDown(self): - policy = asyncio.get_event_loop_policy() - policy.set_child_watcher(None) - self.loop.close() - super().tearDown() + self.addCleanup(policy.set_child_watcher, None) class SubprocessSafeWatcherTests(SubprocessWatcherMixin, test_utils.TestCase): @@ -258,17 +251,8 @@ else: class SubprocessProactorTests(SubprocessMixin, test_utils.TestCase): def setUp(self): - policy = asyncio.get_event_loop_policy() self.loop = asyncio.ProactorEventLoop() - - # ensure that the event loop is passed explicitly in asyncio - policy.set_event_loop(None) - - def tearDown(self): - policy = asyncio.get_event_loop_policy() - self.loop.close() - policy.set_event_loop(None) - super().tearDown() + self.set_event_loop(self.loop) if __name__ == '__main__':