diff -r 4d3066d4a5df Lib/asyncio/base_events.py --- a/Lib/asyncio/base_events.py Sat Dec 20 20:58:28 2014 +0200 +++ b/Lib/asyncio/base_events.py Sun Dec 21 00:18:47 2014 +0100 @@ -22,6 +22,7 @@ import logging import os import socket import subprocess +import threading import time import traceback import sys @@ -168,7 +169,9 @@ class BaseEventLoop(events.AbstractEvent self._scheduled = [] self._default_executor = None self._internal_fds = 0 - self._running = False + # Identifier of the thread running the event loop, or None if the + # event loop is not running + self._owner = None self._clock_resolution = time.get_clock_info('monotonic').resolution self._exception_handler = None self._debug = (not sys.flags.ignore_environment @@ -246,9 +249,9 @@ class BaseEventLoop(events.AbstractEvent def run_forever(self): """Run until stop() is called.""" self._check_closed() - if self._running: + if self.is_running(): raise RuntimeError('Event loop is running.') - self._running = True + self._owner = threading.get_ident() try: while True: try: @@ -256,7 +259,7 @@ class BaseEventLoop(events.AbstractEvent except _StopError: break finally: - self._running = False + self._owner = None def run_until_complete(self, future): """Run until the Future is done. @@ -311,7 +314,7 @@ class BaseEventLoop(events.AbstractEvent The event loop must not be running. """ - if self._running: + if self.is_running(): raise RuntimeError("Cannot close a running event loop") if self._closed: return @@ -331,7 +334,7 @@ class BaseEventLoop(events.AbstractEvent def is_running(self): """Returns True if the event loop is running.""" - return self._running + return (self._owner is not None) def time(self): """Return the time according to the event loop's clock. @@ -372,8 +375,9 @@ class BaseEventLoop(events.AbstractEvent or coroutines.iscoroutinefunction(callback)): raise TypeError("coroutines cannot be used with call_at()") self._check_closed() - if self._debug: - self._assert_is_current_event_loop() + if self._owner is not None and threading.get_ident() != self._owner: + raise RuntimeError("call_at() must only be called from " + "the thread running the event loop") timer = events.TimerHandle(when, callback, args, self) if timer._source_traceback: del timer._source_traceback[-1] @@ -391,17 +395,18 @@ class BaseEventLoop(events.AbstractEvent Any positional arguments after the callback will be passed to the callback when it is called. """ - handle = self._call_soon(callback, args, check_loop=True) + if self._owner is not None and threading.get_ident() != self._owner: + raise RuntimeError("call_soon() must only be called from " + "the thread running the event loop") + handle = self._call_soon(callback, args) if handle._source_traceback: del handle._source_traceback[-1] return handle - def _call_soon(self, callback, args, check_loop): + def _call_soon(self, callback, args): if (coroutines.iscoroutine(callback) or coroutines.iscoroutinefunction(callback)): raise TypeError("coroutines cannot be used with call_soon()") - if self._debug and check_loop: - self._assert_is_current_event_loop() self._check_closed() handle = events.Handle(callback, args, self) if handle._source_traceback: @@ -409,27 +414,9 @@ class BaseEventLoop(events.AbstractEvent self._ready.append(handle) return handle - def _assert_is_current_event_loop(self): - """Asserts that this event loop is the current event loop. - - Non-thread-safe methods of this class make this assumption and will - likely behave incorrectly when the assumption is violated. - - Should only be called when (self._debug == True). The caller is - responsible for checking this condition for performance reasons. - """ - try: - current = events.get_event_loop() - except RuntimeError: - return - if current is not self: - raise RuntimeError( - "Non-thread-safe operation invoked on an event loop other " - "than the current one") - def call_soon_threadsafe(self, callback, *args): """Like call_soon(), but thread-safe.""" - handle = self._call_soon(callback, args, check_loop=False) + handle = self._call_soon(callback, args) if handle._source_traceback: del handle._source_traceback[-1] self._write_to_self() diff -r 4d3066d4a5df Lib/asyncio/proactor_events.py --- a/Lib/asyncio/proactor_events.py Sat Dec 20 20:58:28 2014 +0200 +++ b/Lib/asyncio/proactor_events.py Sun Dec 21 00:18:47 2014 +0100 @@ -383,7 +383,7 @@ class BaseProactorEventLoop(base_events. sock, protocol, waiter, extra) def close(self): - if self._running: + if self.is_running(): raise RuntimeError("Cannot close a running event loop") if self.is_closed(): return @@ -432,9 +432,7 @@ class BaseProactorEventLoop(base_events. self._ssock.setblocking(False) self._csock.setblocking(False) self._internal_fds += 1 - # don't check the current loop because _make_self_pipe() is called - # from the event loop constructor - self._call_soon(self._loop_self_reading, (), check_loop=False) + self._call_soon(self._loop_self_reading, ()) def _loop_self_reading(self, f=None): try: diff -r 4d3066d4a5df Lib/asyncio/selector_events.py --- a/Lib/asyncio/selector_events.py Sat Dec 20 20:58:28 2014 +0200 +++ b/Lib/asyncio/selector_events.py Sun Dec 21 00:18:47 2014 +0100 @@ -68,7 +68,7 @@ class BaseSelectorEventLoop(base_events. address, waiter, extra) def close(self): - if self._running: + if self.is_running(): raise RuntimeError("Cannot close a running event loop") if self.is_closed(): return diff -r 4d3066d4a5df Lib/test/test_asyncio/test_base_events.py --- a/Lib/test/test_asyncio/test_base_events.py Sat Dec 20 20:58:28 2014 +0200 +++ b/Lib/test/test_asyncio/test_base_events.py Sun Dec 21 00:18:47 2014 +0100 @@ -5,6 +5,7 @@ import logging import math import socket import sys +import threading import time import unittest from unittest import mock @@ -148,28 +149,64 @@ class BaseEventLoopTests(test_utils.Test # are really slow self.assertLessEqual(dt, 0.9, dt) - def test_assert_is_current_event_loop(self): + def check_thread(self, loop, debug): def cb(): pass - other_loop = base_events.BaseEventLoop() - other_loop._selector = mock.Mock() - asyncio.set_event_loop(other_loop) + loop.set_debug(debug) + msg = (r"call_soon\(\) must only be called from the thread " + "running the event loop") + with self.assertRaisesRegex(RuntimeError, msg): + loop.call_soon(cb) - # raise RuntimeError if the event loop is different in debug mode - self.loop.set_debug(True) - with self.assertRaises(RuntimeError): - self.loop.call_soon(cb) - with self.assertRaises(RuntimeError): - self.loop.call_later(60, cb) - with self.assertRaises(RuntimeError): - self.loop.call_at(self.loop.time() + 60, cb) + msg = (r"call_at\(\) must only be called from the thread " + "running the event loop") + with self.assertRaisesRegex(RuntimeError, msg): + loop.call_later(60, cb) + with self.assertRaisesRegex(RuntimeError, msg): + loop.call_at(loop.time() + 60, cb) - # check disabled if debug mode is disabled - self.loop.set_debug(False) - self.loop.call_soon(cb) - self.loop.call_later(60, cb) - self.loop.call_at(self.loop.time() + 60, cb) + def test_check_thread(self): + def check_in_thread(loop, event, debug, create_loop, fut): + # wait until the event loop is running + event.wait() + + try: + if create_loop: + loop2 = base_events.BaseEventLoop() + try: + asyncio.set_event_loop(loop2) + self.check_thread(loop, debug) + finally: + asyncio.set_event_loop(None) + loop2.close() + else: + self.check_thread(loop, debug) + except Exception as exc: + loop.call_soon_threadsafe(fut.set_exception, exc) + else: + loop.call_soon_threadsafe(fut.set_result, None) + + def test_thread(loop, debug, create_loop=False): + event = threading.Event() + fut = asyncio.Future(loop=loop) + loop.call_soon(event.set) + args = (loop, event, debug, create_loop, fut) + thread = threading.Thread(target=check_in_thread, args=args) + thread.start() + loop.run_until_complete(fut) + thread.join() + + self.loop._process_events = mock.Mock() + self.loop._write_to_self = mock.Mock() + + # thread without event loop + test_thread(self.loop, True) + test_thread(self.loop, False) + + # thread with an event loop + test_thread(self.loop, True, create_loop=True) + test_thread(self.loop, False, create_loop=True) def test_run_once_in_executor_handle(self): def cb(): diff -r 4d3066d4a5df Lib/test/test_asyncio/test_proactor_events.py --- a/Lib/test/test_asyncio/test_proactor_events.py Sat Dec 20 20:58:28 2014 +0200 +++ b/Lib/test/test_asyncio/test_proactor_events.py Sun Dec 21 00:18:47 2014 +0100 @@ -449,8 +449,7 @@ class BaseProactorEventLoopTests(test_ut self.assertIs(loop._ssock, ssock) self.assertIs(loop._csock, csock) self.assertEqual(loop._internal_fds, 1) - _call_soon.assert_called_with(loop._loop_self_reading, (), - check_loop=False) + _call_soon.assert_called_with(loop._loop_self_reading, ()) def test_close_self_pipe(self): self.loop._close_self_pipe() diff -r 4d3066d4a5df Lib/test/test_asyncio/test_subprocess.py --- a/Lib/test/test_asyncio/test_subprocess.py Sat Dec 20 20:58:28 2014 +0200 +++ b/Lib/test/test_asyncio/test_subprocess.py Sun Dec 21 00:18:47 2014 +0100 @@ -233,19 +233,12 @@ if sys.platform != 'win32': def setUp(self): policy = asyncio.get_event_loop_policy() self.loop = policy.new_event_loop() - - # ensure that the event loop is passed explicitly in asyncio - policy.set_event_loop(None) + self.set_event_loop(self.loop) watcher = self.Watcher() watcher.attach_loop(self.loop) policy.set_child_watcher(watcher) - - def tearDown(self): - policy = asyncio.get_event_loop_policy() - policy.set_child_watcher(None) - self.loop.close() - super().tearDown() + self.addCleanup(policy.set_child_watcher, None) class SubprocessSafeWatcherTests(SubprocessWatcherMixin, test_utils.TestCase): @@ -262,17 +255,8 @@ else: class SubprocessProactorTests(SubprocessMixin, test_utils.TestCase): def setUp(self): - policy = asyncio.get_event_loop_policy() self.loop = asyncio.ProactorEventLoop() - - # ensure that the event loop is passed explicitly in asyncio - policy.set_event_loop(None) - - def tearDown(self): - policy = asyncio.get_event_loop_policy() - self.loop.close() - policy.set_event_loop(None) - super().tearDown() + self.set_event_loop(self.loop) if __name__ == '__main__':