diff -r deb3e5857d8c Lib/asyncio/base_events.py --- a/Lib/asyncio/base_events.py Thu Oct 27 19:30:10 2016 +0900 +++ b/Lib/asyncio/base_events.py Fri Oct 28 00:12:34 2016 -0400 @@ -57,7 +57,7 @@ def _format_handle(handle): cb = handle._callback - if inspect.ismethod(cb) and isinstance(cb.__self__, tasks.Task): + if isinstance(getattr(cb, '__self__', None), tasks.Task): # format the task return repr(cb.__self__) else: diff -r deb3e5857d8c Lib/asyncio/base_futures.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Lib/asyncio/base_futures.py Fri Oct 28 00:12:34 2016 -0400 @@ -0,0 +1,70 @@ +__all__ = [] + +import concurrent.futures._base +import reprlib + +from . import events + +Error = concurrent.futures._base.Error +CancelledError = concurrent.futures.CancelledError +TimeoutError = concurrent.futures.TimeoutError + + +class InvalidStateError(Error): + """The operation is not allowed in this state.""" + + +# States for Future. +_PENDING = 'PENDING' +_CANCELLED = 'CANCELLED' +_FINISHED = 'FINISHED' + + +def isfuture(obj): + """Check for a Future. + + This returns True when obj is a Future instance or is advertising + itself as duck-type compatible by setting _asyncio_future_blocking. + See comment in Future for more details. + """ + return getattr(obj, '_asyncio_future_blocking', None) is not None + + +def _format_callbacks(cb): + """helper function for Future.__repr__""" + size = len(cb) + if not size: + cb = '' + + def format_cb(callback): + return events._format_callback_source(callback, ()) + + if size == 1: + cb = format_cb(cb[0]) + elif size == 2: + cb = '{}, {}'.format(format_cb(cb[0]), format_cb(cb[1])) + elif size > 2: + cb = '{}, <{} more>, {}'.format(format_cb(cb[0]), + size - 2, + format_cb(cb[-1])) + return 'cb=[%s]' % cb + + +def _future_repr_info(future): + # (Future) -> str + """helper function for Future.__repr__""" + info = [future._state.lower()] + if future._state == _FINISHED: + if future._exception is not None: + info.append('exception={!r}'.format(future._exception)) + else: + # use reprlib to limit the length of the output, especially + # for very long strings + result = reprlib.repr(future._result) + info.append('result={}'.format(result)) + if future._callbacks: + info.append(_format_callbacks(future._callbacks)) + if future._source_traceback: + frame = future._source_traceback[-1] + info.append('created at %s:%s' % (frame[0], frame[1])) + return info diff -r deb3e5857d8c Lib/asyncio/base_tasks.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Lib/asyncio/base_tasks.py Fri Oct 28 00:12:34 2016 -0400 @@ -0,0 +1,76 @@ +import linecache +import traceback + +from . import base_futures +from . import coroutines + + +def _task_repr_info(task): + info = base_futures._future_repr_info(task) + + if task._must_cancel: + # replace status + info[0] = 'cancelling' + + coro = coroutines._format_coroutine(task._coro) + info.insert(1, 'coro=<%s>' % coro) + + if task._fut_waiter is not None: + info.insert(2, 'wait_for=%r' % task._fut_waiter) + return info + + +def _task_get_stack(task, limit): + frames = [] + try: + # 'async def' coroutines + f = task._coro.cr_frame + except AttributeError: + f = task._coro.gi_frame + if f is not None: + while f is not None: + if limit is not None: + if limit <= 0: + break + limit -= 1 + frames.append(f) + f = f.f_back + frames.reverse() + elif task._exception is not None: + tb = task._exception.__traceback__ + while tb is not None: + if limit is not None: + if limit <= 0: + break + limit -= 1 + frames.append(tb.tb_frame) + tb = tb.tb_next + return frames + + +def _task_print_stack(task, limit, file): + extracted_list = [] + checked = set() + for f in task.get_stack(limit=limit): + lineno = f.f_lineno + co = f.f_code + filename = co.co_filename + name = co.co_name + if filename not in checked: + checked.add(filename) + linecache.checkcache(filename) + line = linecache.getline(filename, lineno, f.f_globals) + extracted_list.append((filename, lineno, name, line)) + exc = task._exception + if not extracted_list: + print('No stack for %r' % task, file=file) + elif exc is not None: + print('Traceback for %r (most recent call last):' % task, + file=file) + else: + print('Stack for %r (most recent call last):' % task, + file=file) + traceback.print_list(extracted_list, file=file) + if exc is not None: + for line in traceback.format_exception_only(exc.__class__, exc): + print(line, file=file, end='') diff -r deb3e5857d8c Lib/asyncio/coroutines.py --- a/Lib/asyncio/coroutines.py Thu Oct 27 19:30:10 2016 +0900 +++ b/Lib/asyncio/coroutines.py Fri Oct 28 00:12:34 2016 -0400 @@ -11,7 +11,7 @@ from . import compat from . import events -from . import futures +from . import base_futures from .log import logger @@ -204,7 +204,7 @@ @functools.wraps(func) def coro(*args, **kw): res = func(*args, **kw) - if (futures.isfuture(res) or inspect.isgenerator(res) or + if (base_futures.isfuture(res) or inspect.isgenerator(res) or isinstance(res, CoroWrapper)): res = yield from res elif _AwaitableABC is not None: diff -r deb3e5857d8c Lib/asyncio/futures.py --- a/Lib/asyncio/futures.py Thu Oct 27 19:30:10 2016 +0900 +++ b/Lib/asyncio/futures.py Fri Oct 28 00:12:34 2016 -0400 @@ -1,35 +1,32 @@ """A Future class similar to the one in PEP 3148.""" -__all__ = ['CancelledError', 'TimeoutError', - 'InvalidStateError', - 'Future', 'wrap_future', 'isfuture' - ] +__all__ = ['CancelledError', 'TimeoutError', 'InvalidStateError', + 'Future', 'wrap_future', 'isfuture'] -import concurrent.futures._base +import concurrent.futures import logging -import reprlib import sys import traceback +from . import base_futures from . import compat from . import events -# States for Future. -_PENDING = 'PENDING' -_CANCELLED = 'CANCELLED' -_FINISHED = 'FINISHED' -Error = concurrent.futures._base.Error -CancelledError = concurrent.futures.CancelledError -TimeoutError = concurrent.futures.TimeoutError +CancelledError = base_futures.CancelledError +InvalidStateError = base_futures.InvalidStateError +TimeoutError = base_futures.TimeoutError +isfuture = base_futures.isfuture + + +_PENDING = base_futures._PENDING +_CANCELLED = base_futures._CANCELLED +_FINISHED = base_futures._FINISHED + STACK_DEBUG = logging.DEBUG - 1 # heavy-duty debugging -class InvalidStateError(Error): - """The operation is not allowed in this state.""" - - class _TracebackLogger: """Helper to log a traceback upon destruction if not cleared. @@ -110,56 +107,6 @@ self.loop.call_exception_handler({'message': msg}) -def isfuture(obj): - """Check for a Future. - - This returns True when obj is a Future instance or is advertising - itself as duck-type compatible by setting _asyncio_future_blocking. - See comment in Future for more details. - """ - return getattr(obj, '_asyncio_future_blocking', None) is not None - - -def _format_callbacks(cb): - """helper function for Future.__repr__""" - size = len(cb) - if not size: - cb = '' - - def format_cb(callback): - return events._format_callback_source(callback, ()) - - if size == 1: - cb = format_cb(cb[0]) - elif size == 2: - cb = '{}, {}'.format(format_cb(cb[0]), format_cb(cb[1])) - elif size > 2: - cb = '{}, <{} more>, {}'.format(format_cb(cb[0]), - size-2, - format_cb(cb[-1])) - return 'cb=[%s]' % cb - - -def _future_repr_info(future): - # (Future) -> str - """helper function for Future.__repr__""" - info = [future._state.lower()] - if future._state == _FINISHED: - if future._exception is not None: - info.append('exception={!r}'.format(future._exception)) - else: - # use reprlib to limit the length of the output, especially - # for very long strings - result = reprlib.repr(future._result) - info.append('result={}'.format(result)) - if future._callbacks: - info.append(_format_callbacks(future._callbacks)) - if future._source_traceback: - frame = future._source_traceback[-1] - info.append('created at %s:%s' % (frame[0], frame[1])) - return info - - class Future: """This class is *almost* compatible with concurrent.futures.Future. @@ -212,7 +159,7 @@ if self._loop.get_debug(): self._source_traceback = traceback.extract_stack(sys._getframe(1)) - _repr_info = _future_repr_info + _repr_info = base_futures._future_repr_info def __repr__(self): return '<%s %s>' % (self.__class__.__name__, ' '.join(self._repr_info())) diff -r deb3e5857d8c Lib/asyncio/tasks.py --- a/Lib/asyncio/tasks.py Thu Oct 27 19:30:10 2016 +0900 +++ b/Lib/asyncio/tasks.py Fri Oct 28 00:12:34 2016 -0400 @@ -9,11 +9,10 @@ import concurrent.futures import functools import inspect -import linecache -import traceback import warnings import weakref +from . import base_tasks from . import compat from . import coroutines from . import events @@ -93,18 +92,7 @@ futures.Future.__del__(self) def _repr_info(self): - info = super()._repr_info() - - if self._must_cancel: - # replace status - info[0] = 'cancelling' - - coro = coroutines._format_coroutine(self._coro) - info.insert(1, 'coro=<%s>' % coro) - - if self._fut_waiter is not None: - info.insert(2, 'wait_for=%r' % self._fut_waiter) - return info + return base_tasks._task_repr_info(self) def get_stack(self, *, limit=None): """Return the list of stack frames for this task's coroutine. @@ -127,31 +115,7 @@ For reasons beyond our control, only one stack frame is returned for a suspended coroutine. """ - frames = [] - try: - # 'async def' coroutines - f = self._coro.cr_frame - except AttributeError: - f = self._coro.gi_frame - if f is not None: - while f is not None: - if limit is not None: - if limit <= 0: - break - limit -= 1 - frames.append(f) - f = f.f_back - frames.reverse() - elif self._exception is not None: - tb = self._exception.__traceback__ - while tb is not None: - if limit is not None: - if limit <= 0: - break - limit -= 1 - frames.append(tb.tb_frame) - tb = tb.tb_next - return frames + return base_tasks._task_get_stack(self, limit) def print_stack(self, *, limit=None, file=None): """Print the stack or traceback for this task's coroutine. @@ -162,31 +126,7 @@ to which the output is written; by default output is written to sys.stderr. """ - extracted_list = [] - checked = set() - for f in self.get_stack(limit=limit): - lineno = f.f_lineno - co = f.f_code - filename = co.co_filename - name = co.co_name - if filename not in checked: - checked.add(filename) - linecache.checkcache(filename) - line = linecache.getline(filename, lineno, f.f_globals) - extracted_list.append((filename, lineno, name, line)) - exc = self._exception - if not extracted_list: - print('No stack for %r' % self, file=file) - elif exc is not None: - print('Traceback for %r (most recent call last):' % self, - file=file) - else: - print('Stack for %r (most recent call last):' % self, - file=file) - traceback.print_list(extracted_list, file=file) - if exc is not None: - for line in traceback.format_exception_only(exc.__class__, exc): - print(line, file=file, end='') + return base_tasks._task_print_stack(self, limit, file) def cancel(self): """Request that this task cancel itself. @@ -316,6 +256,18 @@ self = None # Needed to break cycles when an exception occurs. +_PyTask = Task + + +try: + import _asyncio +except ImportError: + pass +else: + # _CTask is needed for tests. + Task = _CTask = _asyncio.Task + + # wait() and as_completed() similar to those in PEP 3148. FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED diff -r deb3e5857d8c Lib/test/test_asyncio/test_tasks.py --- a/Lib/test/test_asyncio/test_tasks.py Thu Oct 27 19:30:10 2016 +0900 +++ b/Lib/test/test_asyncio/test_tasks.py Fri Oct 28 00:12:34 2016 -0400 @@ -14,6 +14,8 @@ import asyncio from asyncio import coroutines +from asyncio import futures +from asyncio import tasks from asyncio import test_utils try: from test import support @@ -72,14 +74,25 @@ pass -class TaskTests(test_utils.TestCase): +class BaseTaskTests: + + Task = None + Future = None + + def new_task(self, loop, coro): + return self.__class__.Task(coro, loop=loop) + + def new_future(self, loop): + return self.__class__.Future(loop=loop) def setUp(self): self.loop = self.new_test_loop() + self.loop.set_task_factory(self.new_task) + self.loop.create_future = lambda: self.new_future(self.loop) def test_other_loop_future(self): other_loop = asyncio.new_event_loop() - fut = asyncio.Future(loop=other_loop) + fut = self.new_future(other_loop) @asyncio.coroutine def run(fut): @@ -107,7 +120,7 @@ @asyncio.coroutine def notmuch(): return 'ok' - t = asyncio.Task(notmuch(), loop=self.loop) + t = self.new_task(self.loop, notmuch()) self.loop.run_until_complete(t) self.assertTrue(t.done()) self.assertEqual(t.result(), 'ok') @@ -115,7 +128,7 @@ loop = asyncio.new_event_loop() self.set_event_loop(loop) - t = asyncio.Task(notmuch(), loop=loop) + t = self.new_task(loop, notmuch()) self.assertIs(t._loop, loop) loop.run_until_complete(t) loop.close() @@ -138,7 +151,7 @@ loop.close() def test_ensure_future_future(self): - f_orig = asyncio.Future(loop=self.loop) + f_orig = self.new_future(self.loop) f_orig.set_result('ko') f = asyncio.ensure_future(f_orig) @@ -162,7 +175,7 @@ @asyncio.coroutine def notmuch(): return 'ok' - t_orig = asyncio.Task(notmuch(), loop=self.loop) + t_orig = self.new_task(self.loop, notmuch()) t = asyncio.ensure_future(t_orig) self.loop.run_until_complete(t) self.assertTrue(t.done()) @@ -203,7 +216,7 @@ asyncio.ensure_future('ok') def test_async_warning(self): - f = asyncio.Future(loop=self.loop) + f = self.new_future(self.loop) with self.assertWarnsRegex(DeprecationWarning, 'function is deprecated, use ensure_'): self.assertIs(f, asyncio.async(f)) @@ -250,8 +263,8 @@ # test coroutine function self.assertEqual(notmuch.__name__, 'notmuch') if PY35: - self.assertEqual(notmuch.__qualname__, - 'TaskTests.test_task_repr..notmuch') + self.assertRegex(notmuch.__qualname__, + r'\w+.test_task_repr..notmuch') self.assertEqual(notmuch.__module__, __name__) filename, lineno = test_utils.get_function_source(notmuch) @@ -260,7 +273,7 @@ # test coroutine object gen = notmuch() if coroutines._DEBUG or PY35: - coro_qualname = 'TaskTests.test_task_repr..notmuch' + coro_qualname = 'BaseTaskTests.test_task_repr..notmuch' else: coro_qualname = 'notmuch' self.assertEqual(gen.__name__, 'notmuch') @@ -269,7 +282,7 @@ coro_qualname) # test pending Task - t = asyncio.Task(gen, loop=self.loop) + t = self.new_task(self.loop, gen) t.add_done_callback(Dummy()) coro = format_coroutine(coro_qualname, 'running', src, @@ -291,7 +304,7 @@ '' % coro) # test finished Task - t = asyncio.Task(notmuch(), loop=self.loop) + t = self.new_task(self.loop, notmuch()) self.loop.run_until_complete(t) coro = format_coroutine(coro_qualname, 'done', src, t._source_traceback) @@ -310,9 +323,9 @@ # test coroutine function self.assertEqual(notmuch.__name__, 'notmuch') if PY35: - self.assertEqual(notmuch.__qualname__, - 'TaskTests.test_task_repr_coro_decorator' - '..notmuch') + self.assertRegex(notmuch.__qualname__, + r'\w+.test_task_repr_coro_decorator' + r'\.\.notmuch') self.assertEqual(notmuch.__module__, __name__) # test coroutine object @@ -322,7 +335,7 @@ # function, as expected, and have a qualified name (__qualname__ # attribute). coro_name = 'notmuch' - coro_qualname = ('TaskTests.test_task_repr_coro_decorator' + coro_qualname = ('BaseTaskTests.test_task_repr_coro_decorator' '..notmuch') else: # On Python < 3.5, generators inherit the name of the code, not of @@ -350,7 +363,7 @@ self.assertEqual(repr(gen), '' % coro) # test pending Task - t = asyncio.Task(gen, loop=self.loop) + t = self.new_task(self.loop, gen) t.add_done_callback(Dummy()) # format the coroutine object @@ -373,8 +386,8 @@ def wait_for(fut): return (yield from fut) - fut = asyncio.Future(loop=self.loop) - task = asyncio.Task(wait_for(fut), loop=self.loop) + fut = self.new_future(self.loop) + task = self.new_task(self.loop, wait_for(fut)) test_utils.run_briefly(self.loop) self.assertRegex(repr(task), '' % re.escape(repr(fut))) @@ -400,10 +413,11 @@ self.addCleanup(task._coro.close) coro_repr = repr(task._coro) - expected = ('.func(1)() running, ') - self.assertTrue(coro_repr.startswith(expected), - coro_repr) + expected = ( + r'\.func\(1\)\(\) running, ' + ) + self.assertRegex(coro_repr, expected) def test_task_basics(self): @asyncio.coroutine @@ -437,7 +451,7 @@ yield from asyncio.sleep(10.0, loop=loop) return 12 - t = asyncio.Task(task(), loop=loop) + t = self.new_task(loop, task()) loop.call_soon(t.cancel) with self.assertRaises(asyncio.CancelledError): loop.run_until_complete(t) @@ -452,7 +466,7 @@ yield return 12 - t = asyncio.Task(task(), loop=self.loop) + t = self.new_task(self.loop, task()) test_utils.run_briefly(self.loop) # start coro t.cancel() self.assertRaises( @@ -462,14 +476,14 @@ self.assertFalse(t.cancel()) def test_cancel_inner_future(self): - f = asyncio.Future(loop=self.loop) + f = self.new_future(self.loop) @asyncio.coroutine def task(): yield from f return 12 - t = asyncio.Task(task(), loop=self.loop) + t = self.new_task(self.loop, task()) test_utils.run_briefly(self.loop) # start task f.cancel() with self.assertRaises(asyncio.CancelledError): @@ -478,14 +492,14 @@ self.assertTrue(t.cancelled()) def test_cancel_both_task_and_inner_future(self): - f = asyncio.Future(loop=self.loop) + f = self.new_future(self.loop) @asyncio.coroutine def task(): yield from f return 12 - t = asyncio.Task(task(), loop=self.loop) + t = self.new_task(self.loop, task()) test_utils.run_briefly(self.loop) f.cancel() @@ -499,8 +513,8 @@ self.assertTrue(t.cancelled()) def test_cancel_task_catching(self): - fut1 = asyncio.Future(loop=self.loop) - fut2 = asyncio.Future(loop=self.loop) + fut1 = self.new_future(self.loop) + fut2 = self.new_future(self.loop) @asyncio.coroutine def task(): @@ -510,7 +524,7 @@ except asyncio.CancelledError: return 42 - t = asyncio.Task(task(), loop=self.loop) + t = self.new_task(self.loop, task()) test_utils.run_briefly(self.loop) self.assertIs(t._fut_waiter, fut1) # White-box test. fut1.set_result(None) @@ -523,9 +537,9 @@ self.assertFalse(t.cancelled()) def test_cancel_task_ignoring(self): - fut1 = asyncio.Future(loop=self.loop) - fut2 = asyncio.Future(loop=self.loop) - fut3 = asyncio.Future(loop=self.loop) + fut1 = self.new_future(self.loop) + fut2 = self.new_future(self.loop) + fut3 = self.new_future(self.loop) @asyncio.coroutine def task(): @@ -537,7 +551,7 @@ res = yield from fut3 return res - t = asyncio.Task(task(), loop=self.loop) + t = self.new_task(self.loop, task()) test_utils.run_briefly(self.loop) self.assertIs(t._fut_waiter, fut1) # White-box test. fut1.set_result(None) @@ -565,7 +579,7 @@ yield from asyncio.sleep(100, loop=loop) return 12 - t = asyncio.Task(task(), loop=loop) + t = self.new_task(loop, task()) self.assertRaises( asyncio.CancelledError, loop.run_until_complete, t) self.assertTrue(t.done()) @@ -598,7 +612,7 @@ if x == 2: loop.stop() - t = asyncio.Task(task(), loop=loop) + t = self.new_task(loop, task()) with self.assertRaises(RuntimeError) as cm: loop.run_until_complete(t) self.assertEqual(str(cm.exception), @@ -636,7 +650,7 @@ foo_running = False return 'done' - fut = asyncio.Task(foo(), loop=loop) + fut = self.new_task(loop, foo()) with self.assertRaises(asyncio.TimeoutError): loop.run_until_complete(asyncio.wait_for(fut, 0.1, loop=loop)) @@ -676,7 +690,7 @@ asyncio.set_event_loop(loop) try: - fut = asyncio.Task(foo(), loop=loop) + fut = self.new_task(loop, foo()) with self.assertRaises(asyncio.TimeoutError): loop.run_until_complete(asyncio.wait_for(fut, 0.01)) finally: @@ -695,7 +709,7 @@ loop = self.new_test_loop(gen) - fut = asyncio.Future(loop=loop) + fut = self.new_future(loop) task = asyncio.wait_for(fut, timeout=0.2, loop=loop) loop.call_later(0.1, fut.set_result, "ok") res = loop.run_until_complete(task) @@ -712,8 +726,8 @@ loop = self.new_test_loop(gen) - a = asyncio.Task(asyncio.sleep(0.1, loop=loop), loop=loop) - b = asyncio.Task(asyncio.sleep(0.15, loop=loop), loop=loop) + a = self.new_task(loop, asyncio.sleep(0.1, loop=loop)) + b = self.new_task(loop, asyncio.sleep(0.15, loop=loop)) @asyncio.coroutine def foo(): @@ -722,12 +736,12 @@ self.assertEqual(pending, set()) return 42 - res = loop.run_until_complete(asyncio.Task(foo(), loop=loop)) + res = loop.run_until_complete(self.new_task(loop, foo())) self.assertEqual(res, 42) self.assertAlmostEqual(0.15, loop.time()) # Doing it again should take no time and exercise a different path. - res = loop.run_until_complete(asyncio.Task(foo(), loop=loop)) + res = loop.run_until_complete(self.new_task(loop, foo())) self.assertAlmostEqual(0.15, loop.time()) self.assertEqual(res, 42) @@ -742,8 +756,8 @@ loop = self.new_test_loop(gen) - a = asyncio.Task(asyncio.sleep(0.01, loop=loop), loop=loop) - b = asyncio.Task(asyncio.sleep(0.015, loop=loop), loop=loop) + a = self.new_task(loop, asyncio.sleep(0.01, loop=loop)) + b = self.new_task(loop, asyncio.sleep(0.015, loop=loop)) @asyncio.coroutine def foo(): @@ -754,7 +768,7 @@ asyncio.set_event_loop(loop) res = loop.run_until_complete( - asyncio.Task(foo(), loop=loop)) + self.new_task(loop, foo())) self.assertEqual(res, 42) @@ -764,9 +778,9 @@ return s c = coro('test') - task = asyncio.Task( - asyncio.wait([c, c, coro('spam')], loop=self.loop), - loop=self.loop) + task =self.new_task( + self.loop, + asyncio.wait([c, c, coro('spam')], loop=self.loop)) done, pending = self.loop.run_until_complete(task) @@ -797,12 +811,12 @@ loop = self.new_test_loop(gen) - a = asyncio.Task(asyncio.sleep(10.0, loop=loop), loop=loop) - b = asyncio.Task(asyncio.sleep(0.1, loop=loop), loop=loop) - task = asyncio.Task( + a = self.new_task(loop, asyncio.sleep(10.0, loop=loop)) + b = self.new_task(loop, asyncio.sleep(0.1, loop=loop)) + task = self.new_task( + loop, asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED, - loop=loop), - loop=loop) + loop=loop)) done, pending = loop.run_until_complete(task) self.assertEqual({b}, done) @@ -829,12 +843,12 @@ yield yield - a = asyncio.Task(coro1(), loop=self.loop) - b = asyncio.Task(coro2(), loop=self.loop) - task = asyncio.Task( + a = self.new_task(self.loop, coro1()) + b = self.new_task(self.loop, coro2()) + task = self.new_task( + self.loop, asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED, - loop=self.loop), - loop=self.loop) + loop=self.loop)) done, pending = self.loop.run_until_complete(task) self.assertEqual({a, b}, done) @@ -853,17 +867,17 @@ loop = self.new_test_loop(gen) # first_exception, task already has exception - a = asyncio.Task(asyncio.sleep(10.0, loop=loop), loop=loop) + a = self.new_task(loop, asyncio.sleep(10.0, loop=loop)) @asyncio.coroutine def exc(): raise ZeroDivisionError('err') - b = asyncio.Task(exc(), loop=loop) - task = asyncio.Task( + b = self.new_task(loop, exc()) + task = self.new_task( + loop, asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION, - loop=loop), - loop=loop) + loop=loop)) done, pending = loop.run_until_complete(task) self.assertEqual({b}, done) @@ -886,14 +900,14 @@ loop = self.new_test_loop(gen) # first_exception, exception during waiting - a = asyncio.Task(asyncio.sleep(10.0, loop=loop), loop=loop) + a = self.new_task(loop, asyncio.sleep(10.0, loop=loop)) @asyncio.coroutine def exc(): yield from asyncio.sleep(0.01, loop=loop) raise ZeroDivisionError('err') - b = asyncio.Task(exc(), loop=loop) + b = self.new_task(loop, exc()) task = asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION, loop=loop) @@ -917,14 +931,14 @@ loop = self.new_test_loop(gen) - a = asyncio.Task(asyncio.sleep(0.1, loop=loop), loop=loop) + a = self.new_task(loop, asyncio.sleep(0.1, loop=loop)) @asyncio.coroutine def sleeper(): yield from asyncio.sleep(0.15, loop=loop) raise ZeroDivisionError('really') - b = asyncio.Task(sleeper(), loop=loop) + b = self.new_task(loop, sleeper()) @asyncio.coroutine def foo(): @@ -934,10 +948,10 @@ errors = set(f for f in done if f.exception() is not None) self.assertEqual(len(errors), 1) - loop.run_until_complete(asyncio.Task(foo(), loop=loop)) + loop.run_until_complete(self.new_task(loop, foo())) self.assertAlmostEqual(0.15, loop.time()) - loop.run_until_complete(asyncio.Task(foo(), loop=loop)) + loop.run_until_complete(self.new_task(loop, foo())) self.assertAlmostEqual(0.15, loop.time()) def test_wait_with_timeout(self): @@ -953,8 +967,8 @@ loop = self.new_test_loop(gen) - a = asyncio.Task(asyncio.sleep(0.1, loop=loop), loop=loop) - b = asyncio.Task(asyncio.sleep(0.15, loop=loop), loop=loop) + a = self.new_task(loop, asyncio.sleep(0.1, loop=loop)) + b = self.new_task(loop, asyncio.sleep(0.15, loop=loop)) @asyncio.coroutine def foo(): @@ -963,7 +977,7 @@ self.assertEqual(done, set([a])) self.assertEqual(pending, set([b])) - loop.run_until_complete(asyncio.Task(foo(), loop=loop)) + loop.run_until_complete(self.new_task(loop, foo())) self.assertAlmostEqual(0.11, loop.time()) # move forward to close generator @@ -983,8 +997,8 @@ loop = self.new_test_loop(gen) - a = asyncio.Task(asyncio.sleep(0.1, loop=loop), loop=loop) - b = asyncio.Task(asyncio.sleep(0.15, loop=loop), loop=loop) + a = self.new_task(loop, asyncio.sleep(0.1, loop=loop)) + b = self.new_task(loop, asyncio.sleep(0.15, loop=loop)) done, pending = loop.run_until_complete( asyncio.wait([b, a], timeout=0.1, loop=loop)) @@ -1032,14 +1046,14 @@ values.append((yield from f)) return values - res = loop.run_until_complete(asyncio.Task(foo(), loop=loop)) + res = loop.run_until_complete(self.new_task(loop, foo())) self.assertAlmostEqual(0.15, loop.time()) self.assertTrue('a' in res[:2]) self.assertTrue('b' in res[:2]) self.assertEqual(res[2], 'c') # Doing it again should take no time and exercise a different path. - res = loop.run_until_complete(asyncio.Task(foo(), loop=loop)) + res = loop.run_until_complete(self.new_task(loop, foo())) self.assertAlmostEqual(0.15, loop.time()) def test_as_completed_with_timeout(self): @@ -1068,7 +1082,7 @@ values.append((2, exc)) return values - res = loop.run_until_complete(asyncio.Task(foo(), loop=loop)) + res = loop.run_until_complete(self.new_task(loop, foo())) self.assertEqual(len(res), 2, res) self.assertEqual(res[0], (1, 'a')) self.assertEqual(res[1][0], 2) @@ -1096,7 +1110,7 @@ v = yield from f self.assertEqual(v, 'a') - loop.run_until_complete(asyncio.Task(foo(), loop=loop)) + loop.run_until_complete(self.new_task(loop, foo())) def test_as_completed_reverse_wait(self): @@ -1156,7 +1170,7 @@ result.append((yield from f)) return result - fut = asyncio.Task(runner(), loop=self.loop) + fut = self.new_task(self.loop, runner()) self.loop.run_until_complete(fut) result = fut.result() self.assertEqual(set(result), {'ham', 'spam'}) @@ -1179,7 +1193,7 @@ res = yield from asyncio.sleep(dt/2, arg, loop=loop) return res - t = asyncio.Task(sleeper(0.1, 'yeah'), loop=loop) + t = self.new_task(loop, sleeper(0.1, 'yeah')) loop.run_until_complete(t) self.assertTrue(t.done()) self.assertEqual(t.result(), 'yeah') @@ -1194,8 +1208,7 @@ loop = self.new_test_loop(gen) - t = asyncio.Task(asyncio.sleep(10.0, 'yeah', loop=loop), - loop=loop) + t = self.new_task(loop, asyncio.sleep(10.0, 'yeah', loop=loop)) handle = None orig_call_later = loop.call_later @@ -1231,7 +1244,7 @@ @asyncio.coroutine def doit(): - sleeper = asyncio.Task(sleep(5000), loop=loop) + sleeper = self.new_task(loop, sleep(5000)) loop.call_later(0.1, sleeper.cancel) try: yield from sleeper @@ -1245,13 +1258,13 @@ self.assertAlmostEqual(0.1, loop.time()) def test_task_cancel_waiter_future(self): - fut = asyncio.Future(loop=self.loop) + fut = self.new_future(self.loop) @asyncio.coroutine def coro(): yield from fut - task = asyncio.Task(coro(), loop=self.loop) + task = self.new_task(self.loop, coro()) test_utils.run_briefly(self.loop) self.assertIs(task._fut_waiter, fut) @@ -1268,7 +1281,7 @@ return 'ko' gen = notmuch() - task = asyncio.Task(gen, loop=self.loop) + task = self.new_task(self.loop, gen) task.set_result('ok') self.assertRaises(AssertionError, task._step) @@ -1304,7 +1317,7 @@ nonlocal result result = yield from fut - t = asyncio.Task(wait_for_future(), loop=self.loop) + t = self.new_task(self.loop, wait_for_future()) test_utils.run_briefly(self.loop) self.assertTrue(fut.cb_added) @@ -1320,7 +1333,7 @@ def notmutch(): raise BaseException() - task = asyncio.Task(notmutch(), loop=self.loop) + task = self.new_task(self.loop, notmutch()) self.assertRaises(BaseException, task._step) self.assertTrue(task.done()) @@ -1348,7 +1361,7 @@ except asyncio.CancelledError: raise base_exc - task = asyncio.Task(notmutch(), loop=loop) + task = self.new_task(loop, notmutch()) test_utils.run_briefly(loop) task.cancel() @@ -1376,7 +1389,7 @@ self.assertTrue(asyncio.iscoroutinefunction(fn2)) def test_yield_vs_yield_from(self): - fut = asyncio.Future(loop=self.loop) + fut = self.new_future(self.loop) @asyncio.coroutine def wait_for_future(): @@ -1420,7 +1433,7 @@ self.assertEqual(res, 'test') def test_coroutine_non_gen_function_return_future(self): - fut = asyncio.Future(loop=self.loop) + fut = self.new_future(self.loop) @asyncio.coroutine def func(): @@ -1430,49 +1443,53 @@ def coro(): fut.set_result('test') - t1 = asyncio.Task(func(), loop=self.loop) - t2 = asyncio.Task(coro(), loop=self.loop) + t1 = self.new_task(self.loop, func()) + t2 = self.new_task(self.loop, coro()) res = self.loop.run_until_complete(t1) self.assertEqual(res, 'test') self.assertIsNone(t2.result()) def test_current_task(self): - self.assertIsNone(asyncio.Task.current_task(loop=self.loop)) + Task = self.__class__.Task + + self.assertIsNone(Task.current_task(loop=self.loop)) @asyncio.coroutine def coro(loop): - self.assertTrue(asyncio.Task.current_task(loop=loop) is task) - - task = asyncio.Task(coro(self.loop), loop=self.loop) + self.assertTrue(Task.current_task(loop=loop) is task) + + task = self.new_task(self.loop, coro(self.loop)) self.loop.run_until_complete(task) - self.assertIsNone(asyncio.Task.current_task(loop=self.loop)) + self.assertIsNone(Task.current_task(loop=self.loop)) def test_current_task_with_interleaving_tasks(self): - self.assertIsNone(asyncio.Task.current_task(loop=self.loop)) - - fut1 = asyncio.Future(loop=self.loop) - fut2 = asyncio.Future(loop=self.loop) + Task = self.__class__.Task + + self.assertIsNone(Task.current_task(loop=self.loop)) + + fut1 = self.new_future(self.loop) + fut2 = self.new_future(self.loop) @asyncio.coroutine def coro1(loop): - self.assertTrue(asyncio.Task.current_task(loop=loop) is task1) + self.assertTrue(Task.current_task(loop=loop) is task1) yield from fut1 - self.assertTrue(asyncio.Task.current_task(loop=loop) is task1) + self.assertTrue(Task.current_task(loop=loop) is task1) fut2.set_result(True) @asyncio.coroutine def coro2(loop): - self.assertTrue(asyncio.Task.current_task(loop=loop) is task2) + self.assertTrue(Task.current_task(loop=loop) is task2) fut1.set_result(True) yield from fut2 - self.assertTrue(asyncio.Task.current_task(loop=loop) is task2) - - task1 = asyncio.Task(coro1(self.loop), loop=self.loop) - task2 = asyncio.Task(coro2(self.loop), loop=self.loop) + self.assertTrue(Task.current_task(loop=loop) is task2) + + task1 = self.new_task(self.loop, coro1(self.loop)) + task2 = self.new_task(self.loop, coro2(self.loop)) self.loop.run_until_complete(asyncio.wait((task1, task2), loop=self.loop)) - self.assertIsNone(asyncio.Task.current_task(loop=self.loop)) + self.assertIsNone(Task.current_task(loop=self.loop)) # Some thorough tests for cancellation propagation through # coroutines, tasks and wait(). @@ -1480,7 +1497,7 @@ def test_yield_future_passes_cancel(self): # Cancelling outer() cancels inner() cancels waiter. proof = 0 - waiter = asyncio.Future(loop=self.loop) + waiter = self.new_future(self.loop) @asyncio.coroutine def inner(): @@ -1514,7 +1531,7 @@ # Cancelling outer() makes wait() return early, leaves inner() # running. proof = 0 - waiter = asyncio.Future(loop=self.loop) + waiter = self.new_future(self.loop) @asyncio.coroutine def inner(): @@ -1538,14 +1555,14 @@ self.assertEqual(proof, 1) def test_shield_result(self): - inner = asyncio.Future(loop=self.loop) + inner = self.new_future(self.loop) outer = asyncio.shield(inner) inner.set_result(42) res = self.loop.run_until_complete(outer) self.assertEqual(res, 42) def test_shield_exception(self): - inner = asyncio.Future(loop=self.loop) + inner = self.new_future(self.loop) outer = asyncio.shield(inner) test_utils.run_briefly(self.loop) exc = RuntimeError('expected') @@ -1554,7 +1571,7 @@ self.assertIs(outer.exception(), exc) def test_shield_cancel(self): - inner = asyncio.Future(loop=self.loop) + inner = self.new_future(self.loop) outer = asyncio.shield(inner) test_utils.run_briefly(self.loop) inner.cancel() @@ -1562,7 +1579,7 @@ self.assertTrue(outer.cancelled()) def test_shield_shortcut(self): - fut = asyncio.Future(loop=self.loop) + fut = self.new_future(self.loop) fut.set_result(42) res = self.loop.run_until_complete(asyncio.shield(fut)) self.assertEqual(res, 42) @@ -1570,7 +1587,7 @@ def test_shield_effect(self): # Cancelling outer() does not affect inner(). proof = 0 - waiter = asyncio.Future(loop=self.loop) + waiter = self.new_future(self.loop) @asyncio.coroutine def inner(): @@ -1594,8 +1611,8 @@ self.assertEqual(proof, 1) def test_shield_gather(self): - child1 = asyncio.Future(loop=self.loop) - child2 = asyncio.Future(loop=self.loop) + child1 = self.new_future(self.loop) + child2 = self.new_future(self.loop) parent = asyncio.gather(child1, child2, loop=self.loop) outer = asyncio.shield(parent, loop=self.loop) test_utils.run_briefly(self.loop) @@ -1608,8 +1625,8 @@ self.assertEqual(parent.result(), [1, 2]) def test_gather_shield(self): - child1 = asyncio.Future(loop=self.loop) - child2 = asyncio.Future(loop=self.loop) + child1 = self.new_future(self.loop) + child2 = self.new_future(self.loop) inner1 = asyncio.shield(child1, loop=self.loop) inner2 = asyncio.shield(child2, loop=self.loop) parent = asyncio.gather(inner1, inner2, loop=self.loop) @@ -1625,7 +1642,7 @@ test_utils.run_briefly(self.loop) def test_as_completed_invalid_args(self): - fut = asyncio.Future(loop=self.loop) + fut = self.new_future(self.loop) # as_completed() expects a list of futures, not a future instance self.assertRaises(TypeError, self.loop.run_until_complete, @@ -1636,7 +1653,7 @@ coro.close() def test_wait_invalid_args(self): - fut = asyncio.Future(loop=self.loop) + fut = self.new_future(self.loop) # wait() expects a list of futures, not a future instance self.assertRaises(TypeError, self.loop.run_until_complete, @@ -1663,7 +1680,7 @@ yield from fut # A completed Future used to run the coroutine. - fut = asyncio.Future(loop=self.loop) + fut = self.new_future(self.loop) fut.set_result(None) # Call the coroutine. @@ -1697,15 +1714,15 @@ @asyncio.coroutine def t2(): - f = asyncio.Future(loop=self.loop) - asyncio.Task(t3(f), loop=self.loop) + f = self.new_future(self.loop) + self.new_task(self.loop, t3(f)) return (yield from f) @asyncio.coroutine def t3(f): f.set_result((1, 2, 3)) - task = asyncio.Task(t1(), loop=self.loop) + task = self.new_task(self.loop, t1()) val = self.loop.run_until_complete(task) self.assertEqual(val, (1, 2, 3)) @@ -1768,9 +1785,11 @@ @unittest.skipUnless(PY34, 'need python 3.4 or later') def test_log_destroyed_pending_task(self): + Task = self.__class__.Task + @asyncio.coroutine def kill_me(loop): - future = asyncio.Future(loop=loop) + future = self.new_future(loop) yield from future # at this point, the only reference to kill_me() task is # the Task._wakeup() method in future._callbacks @@ -1783,7 +1802,7 @@ # schedule the task coro = kill_me(self.loop) task = asyncio.ensure_future(coro, loop=self.loop) - self.assertEqual(asyncio.Task.all_tasks(loop=self.loop), {task}) + self.assertEqual(Task.all_tasks(loop=self.loop), {task}) # execute the task so it waits for future self.loop._run_once() @@ -1798,7 +1817,7 @@ # no more reference to kill_me() task: the task is destroyed by the GC support.gc_collect() - self.assertEqual(asyncio.Task.all_tasks(loop=self.loop), set()) + self.assertEqual(Task.all_tasks(loop=self.loop), set()) mock_handler.assert_called_with(self.loop, { 'message': 'Task was destroyed but it is pending!', @@ -1863,10 +1882,10 @@ def test_task_source_traceback(self): self.loop.set_debug(True) - task = asyncio.Task(coroutine_function(), loop=self.loop) + task = self.new_task(self.loop, coroutine_function()) lineno = sys._getframe().f_lineno - 1 self.assertIsInstance(task._source_traceback, list) - self.assertEqual(task._source_traceback[-1][:3], + self.assertEqual(task._source_traceback[-2][:3], (__file__, lineno, 'test_task_source_traceback')) @@ -1878,7 +1897,7 @@ @asyncio.coroutine def blocking_coroutine(): - fut = asyncio.Future(loop=loop) + fut = self.new_future(loop) # Block: fut result is never set yield from fut @@ -1905,7 +1924,7 @@ loop = asyncio.new_event_loop() self.addCleanup(loop.close) - fut = asyncio.Future(loop=loop) + fut = self.new_future(loop) # The indirection fut->child_coro is needed since otherwise the # gathering task is done at the same time as the child future def child_coro(): @@ -1929,6 +1948,53 @@ self.assertFalse(gather_task.cancelled()) self.assertEqual(gather_task.result(), [42]) + @mock.patch('asyncio.base_events.logger') + def test_error_in_call_soon(self, m_log): + def call_soon(callback, *args): + raise ValueError + self.loop.call_soon = call_soon + + @asyncio.coroutine + def coro(): + pass + + self.assertFalse(m_log.error.called) + + with self.assertRaises(ValueError): + self.new_task(self.loop, coro()) + + self.assertTrue(m_log.error.called) + message = m_log.error.call_args[0][0] + self.assertIn('Task was destroyed but it is pending', message) + + self.assertEqual(self.Task.all_tasks(self.loop), set()) + + +@unittest.skipUnless(hasattr(futures, '_CFuture'), + 'requires the C _asyncio module') +class CTask_CFuture_Tests(BaseTaskTests, test_utils.TestCase): + Task = getattr(tasks, '_CTask', None) + Future = getattr(futures, '_CFuture', None) + + +@unittest.skipUnless(hasattr(futures, '_CFuture'), + 'requires the C _asyncio module') +class CTask_PyFuture_Tests(BaseTaskTests, test_utils.TestCase): + Task = getattr(tasks, '_CTask', None) + Future = futures._PyFuture + + +@unittest.skipUnless(hasattr(futures, '_CFuture'), + 'requires the C _asyncio module') +class PyTask_CFuture_Tests(BaseTaskTests, test_utils.TestCase): + Task = tasks._PyTask + Future = getattr(futures, '_CFuture', None) + + +class PyTask_PyFuture_Tests(BaseTaskTests, test_utils.TestCase): + Task = tasks._PyTask + Future = futures._PyFuture + class GatherTestsBase: diff -r deb3e5857d8c Modules/_asynciomodule.c --- a/Modules/_asynciomodule.c Thu Oct 27 19:30:10 2016 +0900 +++ b/Modules/_asynciomodule.c Fri Oct 28 00:12:34 2016 -0400 @@ -2,20 +2,32 @@ #include "structmember.h" +/*[clinic input] +module _asyncio +[clinic start generated code]*/ +/*[clinic end generated code: output=da39a3ee5e6b4b0d input=8fd17862aa989c69]*/ + + /* identifiers used from some functions */ _Py_IDENTIFIER(call_soon); +_Py_IDENTIFIER(cancel); +_Py_IDENTIFIER(send); +_Py_IDENTIFIER(throw); +_Py_IDENTIFIER(add_done_callback); /* State of the _asyncio module */ +static PyObject *all_tasks; +static PyDictObject *current_tasks; static PyObject *traceback_extract_stack; static PyObject *asyncio_get_event_loop; -static PyObject *asyncio_repr_info_func; +static PyObject *asyncio_future_repr_info_func; +static PyObject *asyncio_task_repr_info_func; +static PyObject *asyncio_task_get_stack_func; +static PyObject *asyncio_task_print_stack_func; static PyObject *asyncio_InvalidStateError; static PyObject *asyncio_CancelledError; - - -/* Get FutureIter from Future */ -static PyObject* new_future_iter(PyObject *fut); +static PyObject *inspect_isgenerator; typedef enum { @@ -24,24 +36,56 @@ STATE_FINISHED } fut_state; +#define FutureObj_HEAD(prefix) \ + PyObject_HEAD \ + PyObject *prefix##_loop; \ + PyObject *prefix##_callbacks; \ + PyObject *prefix##_exception; \ + PyObject *prefix##_result; \ + PyObject *prefix##_source_tb; \ + fut_state prefix##_state; \ + int prefix##_log_tb; \ + int prefix##_blocking; \ + PyObject *dict; \ + PyObject *prefix##_weakreflist; + +typedef struct { + FutureObj_HEAD(fut) +} FutureObj; + +typedef struct { + FutureObj_HEAD(task) + PyObject *task_fut_waiter; + PyObject *task_coro; + int task_must_cancel; + int task_log_destroy_pending; +} TaskObj; typedef struct { PyObject_HEAD - PyObject *fut_loop; - PyObject *fut_callbacks; - PyObject *fut_exception; - PyObject *fut_result; - PyObject *fut_source_tb; - fut_state fut_state; - int fut_log_tb; - int fut_blocking; - PyObject *dict; - PyObject *fut_weakreflist; -} FutureObj; + TaskObj *sw_task; + PyObject *sw_arg; +} TaskSendMethWrapper; +typedef struct { + PyObject_HEAD + TaskObj *ww_task; +} TaskWakeupMethWrapper; + + +#include "clinic/_asynciomodule.c.h" + + +/*[clinic input] +class _asyncio.Future "FutureObj *" "&Future_Type" +[clinic start generated code]*/ +/*[clinic end generated code: output=da39a3ee5e6b4b0d input=00d3e4abca711e0f]*/ + +/* Get FutureIter from Future */ +static PyObject* future_new_iter(PyObject *fut); static int -_schedule_callbacks(FutureObj *fut) +future_schedule_callbacks(FutureObj *fut) { Py_ssize_t len; PyObject* iters; @@ -87,16 +131,11 @@ } static int -FutureObj_init(FutureObj *fut, PyObject *args, PyObject *kwds) +future_init(FutureObj *fut, PyObject *loop) { - static char *kwlist[] = {"loop", NULL}; - PyObject *loop = NULL; PyObject *res = NULL; _Py_IDENTIFIER(get_debug); - if (!PyArg_ParseTupleAndKeywords(args, kwds, "|$O", kwlist, &loop)) { - return -1; - } if (loop == NULL || loop == Py_None) { loop = PyObject_CallObject(asyncio_get_event_loop, NULL); if (loop == NULL) { @@ -128,106 +167,12 @@ if (fut->fut_callbacks == NULL) { return -1; } + return 0; } -static int -FutureObj_clear(FutureObj *fut) -{ - Py_CLEAR(fut->fut_loop); - Py_CLEAR(fut->fut_callbacks); - Py_CLEAR(fut->fut_result); - Py_CLEAR(fut->fut_exception); - Py_CLEAR(fut->fut_source_tb); - Py_CLEAR(fut->dict); - return 0; -} - -static int -FutureObj_traverse(FutureObj *fut, visitproc visit, void *arg) -{ - Py_VISIT(fut->fut_loop); - Py_VISIT(fut->fut_callbacks); - Py_VISIT(fut->fut_result); - Py_VISIT(fut->fut_exception); - Py_VISIT(fut->fut_source_tb); - Py_VISIT(fut->dict); - return 0; -} - -PyDoc_STRVAR(pydoc_result, - "Return the result this future represents.\n" - "\n" - "If the future has been cancelled, raises CancelledError. If the\n" - "future's result isn't yet available, raises InvalidStateError. If\n" - "the future is done and has an exception set, this exception is raised." -); - static PyObject * -FutureObj_result(FutureObj *fut, PyObject *arg) -{ - if (fut->fut_state == STATE_CANCELLED) { - PyErr_SetString(asyncio_CancelledError, ""); - return NULL; - } - - if (fut->fut_state != STATE_FINISHED) { - PyErr_SetString(asyncio_InvalidStateError, "Result is not ready."); - return NULL; - } - - fut->fut_log_tb = 0; - if (fut->fut_exception != NULL) { - PyObject *type = NULL; - type = PyExceptionInstance_Class(fut->fut_exception); - PyErr_SetObject(type, fut->fut_exception); - return NULL; - } - - Py_INCREF(fut->fut_result); - return fut->fut_result; -} - -PyDoc_STRVAR(pydoc_exception, - "Return the exception that was set on this future.\n" - "\n" - "The exception (or None if no exception was set) is returned only if\n" - "the future is done. If the future has been cancelled, raises\n" - "CancelledError. If the future isn't done yet, raises\n" - "InvalidStateError." -); - -static PyObject * -FutureObj_exception(FutureObj *fut, PyObject *arg) -{ - if (fut->fut_state == STATE_CANCELLED) { - PyErr_SetString(asyncio_CancelledError, ""); - return NULL; - } - - if (fut->fut_state != STATE_FINISHED) { - PyErr_SetString(asyncio_InvalidStateError, "Result is not ready."); - return NULL; - } - - if (fut->fut_exception != NULL) { - fut->fut_log_tb = 0; - Py_INCREF(fut->fut_exception); - return fut->fut_exception; - } - - Py_RETURN_NONE; -} - -PyDoc_STRVAR(pydoc_set_result, - "Mark the future done and set its result.\n" - "\n" - "If the future is already done when this method is called, raises\n" - "InvalidStateError." -); - -static PyObject * -FutureObj_set_result(FutureObj *fut, PyObject *res) +future_set_result(FutureObj *fut, PyObject *res) { if (fut->fut_state != STATE_PENDING) { PyErr_SetString(asyncio_InvalidStateError, "invalid state"); @@ -238,21 +183,14 @@ fut->fut_result = res; fut->fut_state = STATE_FINISHED; - if (_schedule_callbacks(fut) == -1) { + if (future_schedule_callbacks(fut) == -1) { return NULL; } Py_RETURN_NONE; } -PyDoc_STRVAR(pydoc_set_exception, - "Mark the future done and set an exception.\n" - "\n" - "If the future is already done when this method is called, raises\n" - "InvalidStateError." -); - static PyObject * -FutureObj_set_exception(FutureObj *fut, PyObject *exc) +future_set_exception(FutureObj *fut, PyObject *exc) { PyObject *exc_val = NULL; @@ -287,7 +225,7 @@ fut->fut_exception = exc_val; fut->fut_state = STATE_FINISHED; - if (_schedule_callbacks(fut) == -1) { + if (future_schedule_callbacks(fut) == -1) { return NULL; } @@ -295,16 +233,50 @@ Py_RETURN_NONE; } -PyDoc_STRVAR(pydoc_add_done_callback, - "Add a callback to be run when the future becomes done.\n" - "\n" - "The callback is called with a single argument - the future object. If\n" - "the future is already done when this is called, the callback is\n" - "scheduled with call_soon."; -); +static int +future_get_result(FutureObj *fut, PyObject **result) +{ + PyObject *exc; + + if (fut->fut_state == STATE_CANCELLED) { + exc = _PyObject_CallNoArg(asyncio_CancelledError); + if (exc == NULL) { + return -1; + } + *result = exc; + return 1; + } + + if (fut->fut_state != STATE_FINISHED) { + PyObject *msg = PyUnicode_FromString("Result is not ready."); + if (msg == NULL) { + return -1; + } + + exc = _PyObject_CallArg1(asyncio_InvalidStateError, msg); + Py_DECREF(msg); + if (exc == NULL) { + return -1; + } + + *result = exc; + return 1; + } + + fut->fut_log_tb = 0; + if (fut->fut_exception != NULL) { + Py_INCREF(fut->fut_exception); + *result = fut->fut_exception; + return 1; + } + + Py_INCREF(fut->fut_result); + *result = fut->fut_result; + return 0; +} static PyObject * -FutureObj_add_done_callback(FutureObj *fut, PyObject *arg) +future_add_done_callback(FutureObj *fut, PyObject *arg) { if (fut->fut_state != STATE_PENDING) { PyObject *handle = _PyObject_CallMethodId( @@ -326,19 +298,215 @@ Py_RETURN_NONE; } -PyDoc_STRVAR(pydoc_remove_done_callback, - "Remove all instances of a callback from the \"call when done\" list.\n" - "\n" - "Returns the number of callbacks removed." -); +static PyObject * +future_cancel(FutureObj *fut) +{ + if (fut->fut_state != STATE_PENDING) { + Py_RETURN_FALSE; + } + fut->fut_state = STATE_CANCELLED; + + if (future_schedule_callbacks(fut) == -1) { + return NULL; + } + + Py_RETURN_TRUE; +} + +/*[clinic input] +_asyncio.Future.__init__ + + * + loop: 'O' = NULL + +This class is *almost* compatible with concurrent.futures.Future. + + Differences: + + - result() and exception() do not take a timeout argument and + raise an exception when the future isn't done yet. + + - Callbacks registered with add_done_callback() are always called + via the event loop's call_soon_threadsafe(). + + - This class is not compatible with the wait() and as_completed() + methods in the concurrent.futures package. +[clinic start generated code]*/ +static int +_asyncio_Future___init___impl(FutureObj *self, PyObject *loop) +/*[clinic end generated code: output=9ed75799eaccb5d6 input=8e1681f23605be2d]*/ + +{ + return future_init(self, loop); +} + +static int +FutureObj_clear(FutureObj *fut) +{ + Py_CLEAR(fut->fut_loop); + Py_CLEAR(fut->fut_callbacks); + Py_CLEAR(fut->fut_result); + Py_CLEAR(fut->fut_exception); + Py_CLEAR(fut->fut_source_tb); + Py_CLEAR(fut->dict); + return 0; +} + +static int +FutureObj_traverse(FutureObj *fut, visitproc visit, void *arg) +{ + Py_VISIT(fut->fut_loop); + Py_VISIT(fut->fut_callbacks); + Py_VISIT(fut->fut_result); + Py_VISIT(fut->fut_exception); + Py_VISIT(fut->fut_source_tb); + Py_VISIT(fut->dict); + return 0; +} + +/*[clinic input] +_asyncio.Future.result + +Return the result this future represents. + +If the future has been cancelled, raises CancelledError. If the +future's result isn't yet available, raises InvalidStateError. If +the future is done and has an exception set, this exception is raised. +[clinic start generated code]*/ static PyObject * -FutureObj_remove_done_callback(FutureObj *fut, PyObject *arg) +_asyncio_Future_result_impl(FutureObj *self) +/*[clinic end generated code: output=f35f940936a4b1e5 input=49ecf9cf5ec50dc5]*/ +{ + PyObject *result; + int res = future_get_result(self, &result); + + if (res == -1) { + return NULL; + } + + if (res == 0) { + return result; + } + + assert(res == 1); + + PyErr_SetObject(PyExceptionInstance_Class(result), result); + Py_DECREF(result); + return NULL; +} + +/*[clinic input] +_asyncio.Future.exception + +Return the exception that was set on this future. + +The exception (or None if no exception was set) is returned only if +the future is done. If the future has been cancelled, raises +CancelledError. If the future isn't done yet, raises +InvalidStateError. +[clinic start generated code]*/ + +static PyObject * +_asyncio_Future_exception_impl(FutureObj *self) +/*[clinic end generated code: output=88b20d4f855e0710 input=733547a70c841c68]*/ +{ + if (self->fut_state == STATE_CANCELLED) { + PyErr_SetString(asyncio_CancelledError, ""); + return NULL; + } + + if (self->fut_state != STATE_FINISHED) { + PyErr_SetString(asyncio_InvalidStateError, "Result is not ready."); + return NULL; + } + + if (self->fut_exception != NULL) { + self->fut_log_tb = 0; + Py_INCREF(self->fut_exception); + return self->fut_exception; + } + + Py_RETURN_NONE; +} + +/*[clinic input] +_asyncio.Future.set_result + + res: 'O' + / + +Mark the future done and set its result. + +If the future is already done when this method is called, raises +InvalidStateError. +[clinic start generated code]*/ + +static PyObject * +_asyncio_Future_set_result(FutureObj *self, PyObject *res) +/*[clinic end generated code: output=a620abfc2796bfb6 input=8619565e0503357e]*/ +{ + return future_set_result(self, res); +} + +/*[clinic input] +_asyncio.Future.set_exception + + exception: 'O' + / + +Mark the future done and set an exception. + +If the future is already done when this method is called, raises +InvalidStateError. +[clinic start generated code]*/ + +static PyObject * +_asyncio_Future_set_exception(FutureObj *self, PyObject *exception) +/*[clinic end generated code: output=f1c1b0cd321be360 input=1377dbe15e6ea186]*/ +{ + return future_set_exception(self, exception); +} + +/*[clinic input] +_asyncio.Future.add_done_callback + + fn: 'O' + / + +Add a callback to be run when the future becomes done. + +The callback is called with a single argument - the future object. If +the future is already done when this is called, the callback is +scheduled with call_soon. +[clinic start generated code]*/ + +static PyObject * +_asyncio_Future_add_done_callback(FutureObj *self, PyObject *fn) +/*[clinic end generated code: output=819e09629b2ec2b5 input=8cce187e32cec6a8]*/ +{ + return future_add_done_callback(self, fn); +} + +/*[clinic input] +_asyncio.Future.remove_done_callback + + fn: 'O' + / + +Remove all instances of a callback from the "call when done" list. + +Returns the number of callbacks removed. +[clinic start generated code]*/ + +static PyObject * +_asyncio_Future_remove_done_callback(FutureObj *self, PyObject *fn) +/*[clinic end generated code: output=5ab1fb52b24ef31f input=3fedb73e1409c31c]*/ { PyObject *newlist; Py_ssize_t len, i, j=0; - len = PyList_GET_SIZE(fut->fut_callbacks); + len = PyList_GET_SIZE(self->fut_callbacks); if (len == 0) { return PyLong_FromSsize_t(0); } @@ -350,9 +518,9 @@ for (i = 0; i < len; i++) { int ret; - PyObject *item = PyList_GET_ITEM(fut->fut_callbacks, i); + PyObject *item = PyList_GET_ITEM(self->fut_callbacks, i); - if ((ret = PyObject_RichCompareBool(arg, item, Py_EQ)) < 0) { + if ((ret = PyObject_RichCompareBool(fn, item, Py_EQ)) < 0) { goto fail; } if (ret == 0) { @@ -365,7 +533,7 @@ if (PyList_SetSlice(newlist, j, len, NULL) < 0) { goto fail; } - if (PyList_SetSlice(fut->fut_callbacks, 0, len, newlist) < 0) { + if (PyList_SetSlice(self->fut_callbacks, 0, len, newlist) < 0) { goto fail; } Py_DECREF(newlist); @@ -376,35 +544,34 @@ return NULL; } -PyDoc_STRVAR(pydoc_cancel, - "Cancel the future and schedule callbacks.\n" - "\n" - "If the future is already done or cancelled, return False. Otherwise,\n" - "change the future's state to cancelled, schedule the callbacks and\n" - "return True." -); +/*[clinic input] +_asyncio.Future.cancel + +Cancel the future and schedule callbacks. + +If the future is already done or cancelled, return False. Otherwise, +change the future's state to cancelled, schedule the callbacks and +return True. +[clinic start generated code]*/ static PyObject * -FutureObj_cancel(FutureObj *fut, PyObject *arg) +_asyncio_Future_cancel_impl(FutureObj *self) +/*[clinic end generated code: output=e45b932ba8bd68a1 input=515709a127995109]*/ { - if (fut->fut_state != STATE_PENDING) { - Py_RETURN_FALSE; - } - fut->fut_state = STATE_CANCELLED; - - if (_schedule_callbacks(fut) == -1) { - return NULL; - } - - Py_RETURN_TRUE; + return future_cancel(self); } -PyDoc_STRVAR(pydoc_cancelled, "Return True if the future was cancelled."); +/*[clinic input] +_asyncio.Future.cancelled + +Return True if the future was cancelled. +[clinic start generated code]*/ static PyObject * -FutureObj_cancelled(FutureObj *fut, PyObject *arg) +_asyncio_Future_cancelled_impl(FutureObj *self) +/*[clinic end generated code: output=145197ced586357d input=943ab8b7b7b17e45]*/ { - if (fut->fut_state == STATE_CANCELLED) { + if (self->fut_state == STATE_CANCELLED) { Py_RETURN_TRUE; } else { @@ -412,17 +579,20 @@ } } -PyDoc_STRVAR(pydoc_done, - "Return True if the future is done.\n" - "\n" - "Done means either that a result / exception are available, or that the\n" - "future was cancelled." -); +/*[clinic input] +_asyncio.Future.done + +Return True if the future is done. + +Done means either that a result / exception are available, or that the +future was cancelled. +[clinic start generated code]*/ static PyObject * -FutureObj_done(FutureObj *fut, PyObject *arg) +_asyncio_Future_done_impl(FutureObj *self) +/*[clinic end generated code: output=244c5ac351145096 input=28d7b23fdb65d2ac]*/ { - if (fut->fut_state == STATE_PENDING) { + if (self->fut_state == STATE_PENDING) { Py_RETURN_FALSE; } else { @@ -541,10 +711,8 @@ static PyObject* FutureObj__repr_info(FutureObj *fut) { - if (asyncio_repr_info_func == NULL) { - return PyList_New(0); - } - return PyObject_CallFunctionObjArgs(asyncio_repr_info_func, fut, NULL); + return PyObject_CallFunctionObjArgs( + asyncio_future_repr_info_func, fut, NULL); } static PyObject * @@ -661,43 +829,38 @@ static PyAsyncMethods FutureType_as_async = { - (unaryfunc)new_future_iter, /* am_await */ + (unaryfunc)future_new_iter, /* am_await */ 0, /* am_aiter */ 0 /* am_anext */ }; static PyMethodDef FutureType_methods[] = { + _ASYNCIO_FUTURE_RESULT_METHODDEF + _ASYNCIO_FUTURE_EXCEPTION_METHODDEF + _ASYNCIO_FUTURE_SET_RESULT_METHODDEF + _ASYNCIO_FUTURE_SET_EXCEPTION_METHODDEF + _ASYNCIO_FUTURE_ADD_DONE_CALLBACK_METHODDEF + _ASYNCIO_FUTURE_REMOVE_DONE_CALLBACK_METHODDEF + _ASYNCIO_FUTURE_CANCEL_METHODDEF + _ASYNCIO_FUTURE_CANCELLED_METHODDEF + _ASYNCIO_FUTURE_DONE_METHODDEF {"_repr_info", (PyCFunction)FutureObj__repr_info, METH_NOARGS, NULL}, - {"add_done_callback", - (PyCFunction)FutureObj_add_done_callback, - METH_O, pydoc_add_done_callback}, - {"remove_done_callback", - (PyCFunction)FutureObj_remove_done_callback, - METH_O, pydoc_remove_done_callback}, - {"set_result", - (PyCFunction)FutureObj_set_result, METH_O, pydoc_set_result}, - {"set_exception", - (PyCFunction)FutureObj_set_exception, METH_O, pydoc_set_exception}, - {"cancel", (PyCFunction)FutureObj_cancel, METH_NOARGS, pydoc_cancel}, - {"cancelled", - (PyCFunction)FutureObj_cancelled, METH_NOARGS, pydoc_cancelled}, - {"done", (PyCFunction)FutureObj_done, METH_NOARGS, pydoc_done}, - {"result", (PyCFunction)FutureObj_result, METH_NOARGS, pydoc_result}, - {"exception", - (PyCFunction)FutureObj_exception, METH_NOARGS, pydoc_exception}, {NULL, NULL} /* Sentinel */ }; +#define FUTURE_COMMON_GETSETLIST \ + {"_state", (getter)FutureObj_get_state, NULL, NULL}, \ + {"_asyncio_future_blocking", (getter)FutureObj_get_blocking, \ + (setter)FutureObj_set_blocking, NULL}, \ + {"_loop", (getter)FutureObj_get_loop, NULL, NULL}, \ + {"_callbacks", (getter)FutureObj_get_callbacks, NULL, NULL}, \ + {"_result", (getter)FutureObj_get_result, NULL, NULL}, \ + {"_exception", (getter)FutureObj_get_exception, NULL, NULL}, \ + {"_log_traceback", (getter)FutureObj_get_log_traceback, NULL, NULL}, \ + {"_source_traceback", (getter)FutureObj_get_source_traceback, NULL, NULL}, + static PyGetSetDef FutureType_getsetlist[] = { - {"_state", (getter)FutureObj_get_state, NULL, NULL}, - {"_asyncio_future_blocking", (getter)FutureObj_get_blocking, - (setter)FutureObj_set_blocking, NULL}, - {"_loop", (getter)FutureObj_get_loop, NULL, NULL}, - {"_callbacks", (getter)FutureObj_get_callbacks, NULL, NULL}, - {"_result", (getter)FutureObj_get_result, NULL, NULL}, - {"_exception", (getter)FutureObj_get_exception, NULL, NULL}, - {"_log_traceback", (getter)FutureObj_get_log_traceback, NULL, NULL}, - {"_source_traceback", (getter)FutureObj_get_source_traceback, NULL, NULL}, + FUTURE_COMMON_GETSETLIST {NULL} /* Sentinel */ }; @@ -712,25 +875,27 @@ .tp_repr = (reprfunc)FutureObj_repr, .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_FINALIZE, - .tp_doc = "Fast asyncio.Future implementation.", + .tp_doc = _asyncio_Future___init____doc__, .tp_traverse = (traverseproc)FutureObj_traverse, .tp_clear = (inquiry)FutureObj_clear, .tp_weaklistoffset = offsetof(FutureObj, fut_weakreflist), - .tp_iter = (getiterfunc)new_future_iter, + .tp_iter = (getiterfunc)future_new_iter, .tp_methods = FutureType_methods, .tp_getset = FutureType_getsetlist, .tp_dictoffset = offsetof(FutureObj, dict), - .tp_init = (initproc)FutureObj_init, + .tp_init = (initproc)_asyncio_Future___init__, .tp_new = PyType_GenericNew, .tp_finalize = (destructor)FutureObj_finalize, }; +#define Future_CheckExact(obj) (Py_TYPE(obj) == &FutureType) + static void FutureObj_dealloc(PyObject *self) { FutureObj *fut = (FutureObj *)self; - if (Py_TYPE(fut) == &FutureType) { + if (Future_CheckExact(fut)) { /* When fut is subclass of Future, finalizer is called from * subtype_dealloc. */ @@ -744,7 +909,7 @@ PyObject_ClearWeakRefs(self); } - FutureObj_clear(fut); + (void)FutureObj_clear(fut); Py_TYPE(fut)->tp_free(fut); } @@ -759,7 +924,7 @@ static void FutureIter_dealloc(futureiterobject *it) { - _PyObject_GC_UNTRACK(it); + PyObject_GC_UnTrack(it); Py_XDECREF(it->future); PyObject_GC_Del(it); } @@ -785,7 +950,7 @@ return NULL; } - res = FutureObj_result(fut, NULL); + res = _asyncio_Future_result_impl(fut); if (res != NULL) { /* The result of the Future is not an exception. @@ -884,37 +1049,19 @@ static PyTypeObject FutureIterType = { PyVarObject_HEAD_INIT(0, 0) "_asyncio.FutureIter", - sizeof(futureiterobject), /* tp_basicsize */ - 0, /* tp_itemsize */ - (destructor)FutureIter_dealloc, /* tp_dealloc */ - 0, /* tp_print */ - 0, /* tp_getattr */ - 0, /* tp_setattr */ - 0, /* tp_as_async */ - 0, /* tp_repr */ - 0, /* tp_as_number */ - 0, /* tp_as_sequence */ - 0, /* tp_as_mapping */ - 0, /* tp_hash */ - 0, /* tp_call */ - 0, /* tp_str */ - PyObject_GenericGetAttr, /* tp_getattro */ - 0, /* tp_setattro */ - 0, /* tp_as_buffer */ - Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC, /* tp_flags */ - 0, /* tp_doc */ - (traverseproc)FutureIter_traverse, /* tp_traverse */ - 0, /* tp_clear */ - 0, /* tp_richcompare */ - 0, /* tp_weaklistoffset */ - PyObject_SelfIter, /* tp_iter */ - (iternextfunc)FutureIter_iternext, /* tp_iternext */ - FutureIter_methods, /* tp_methods */ - 0, /* tp_members */ + .tp_basicsize = sizeof(futureiterobject), + .tp_itemsize = 0, + .tp_dealloc = (destructor)FutureIter_dealloc, + .tp_getattro = PyObject_GenericGetAttr, + .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC, + .tp_traverse = (traverseproc)FutureIter_traverse, + .tp_iter = PyObject_SelfIter, + .tp_iternext = (iternextfunc)FutureIter_iternext, + .tp_methods = FutureIter_methods, }; static PyObject * -new_future_iter(PyObject *fut) +future_new_iter(PyObject *fut) { futureiterobject *it; @@ -932,69 +1079,1240 @@ return (PyObject*)it; } -/*********************** Module **************************/ + +/*********************** Task **************************/ + + +/*[clinic input] +class _asyncio.Task "TaskObj *" "&Task_Type" +[clinic start generated code]*/ +/*[clinic end generated code: output=da39a3ee5e6b4b0d input=719dcef0fcc03b37]*/ + +static PyObject * task_wakeup(TaskObj *, PyObject *); +static PyObject * task_step(TaskObj *, PyObject *); + +/* ----- Task._step wrapper */ static int -init_module(void) +TaskSendMethWrapper_clear(TaskSendMethWrapper *o) { - PyObject *module = NULL; + Py_CLEAR(o->sw_task); + Py_CLEAR(o->sw_arg); + return 0; +} - module = PyImport_ImportModule("traceback"); - if (module == NULL) { +static void +TaskSendMethWrapper_dealloc(TaskSendMethWrapper *o) +{ + PyObject_GC_UnTrack(o); + (void)TaskSendMethWrapper_clear(o); + Py_TYPE(o)->tp_free(o); +} + +static PyObject * +TaskSendMethWrapper_call(TaskSendMethWrapper *o, + PyObject *args, PyObject *kwds) +{ + return task_step(o->sw_task, o->sw_arg); +} + +static int +TaskSendMethWrapper_traverse(TaskSendMethWrapper *o, + visitproc visit, void *arg) +{ + Py_VISIT(o->sw_task); + Py_VISIT(o->sw_arg); + return 0; +} + +static PyObject * +TaskSendMethWrapper_get___self__(TaskSendMethWrapper *o) +{ + if (o->sw_task) { + Py_INCREF(o->sw_task); + return (PyObject*)o->sw_task; + } + Py_RETURN_NONE; +} + +static PyGetSetDef TaskSendMethWrapper_getsetlist[] = { + {"__self__", (getter)TaskSendMethWrapper_get___self__, NULL, NULL}, + {NULL} /* Sentinel */ +}; + +PyTypeObject TaskSendMethWrapper_Type = { + PyVarObject_HEAD_INIT(&PyType_Type, 0) + "TaskSendMethWrapper", + .tp_basicsize = sizeof(TaskSendMethWrapper), + .tp_itemsize = 0, + .tp_getset = TaskSendMethWrapper_getsetlist, + .tp_dealloc = (destructor)TaskSendMethWrapper_dealloc, + .tp_call = (ternaryfunc)TaskSendMethWrapper_call, + .tp_getattro = PyObject_GenericGetAttr, + .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC, + .tp_traverse = (traverseproc)TaskSendMethWrapper_traverse, + .tp_clear = (inquiry)TaskSendMethWrapper_clear, +}; + +static PyObject * +TaskSendMethWrapper_new(TaskObj *task, PyObject *arg) +{ + TaskSendMethWrapper *o; + o = PyObject_GC_New(TaskSendMethWrapper, &TaskSendMethWrapper_Type); + if (o == NULL) { + return NULL; + } + + Py_INCREF(task); + o->sw_task = task; + + Py_XINCREF(arg); + o->sw_arg = arg; + + PyObject_GC_Track(o); + return (PyObject*) o; +} + +/* ----- Task._wakeup wrapper */ + +static PyObject * +TaskWakeupMethWrapper_call(TaskWakeupMethWrapper *o, + PyObject *args, PyObject *kwds) +{ + PyObject *fut; + + if (!PyArg_ParseTuple(args, "O|", &fut)) { + return NULL; + } + + return task_wakeup(o->ww_task, fut); +} + +static int +TaskWakeupMethWrapper_clear(TaskWakeupMethWrapper *o) +{ + Py_CLEAR(o->ww_task); + return 0; +} + +static int +TaskWakeupMethWrapper_traverse(TaskWakeupMethWrapper *o, + visitproc visit, void *arg) +{ + Py_VISIT(o->ww_task); + return 0; +} + +static void +TaskWakeupMethWrapper_dealloc(TaskWakeupMethWrapper *o) +{ + PyObject_GC_UnTrack(o); + (void)TaskWakeupMethWrapper_clear(o); + Py_TYPE(o)->tp_free(o); +} + +PyTypeObject TaskWakeupMethWrapper_Type = { + PyVarObject_HEAD_INIT(&PyType_Type, 0) + "TaskWakeupMethWrapper", + .tp_basicsize = sizeof(TaskWakeupMethWrapper), + .tp_itemsize = 0, + .tp_dealloc = (destructor)TaskWakeupMethWrapper_dealloc, + .tp_call = (ternaryfunc)TaskWakeupMethWrapper_call, + .tp_getattro = PyObject_GenericGetAttr, + .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC, + .tp_traverse = (traverseproc)TaskWakeupMethWrapper_traverse, + .tp_clear = (inquiry)TaskWakeupMethWrapper_clear, +}; + +static PyObject * +TaskWakeupMethWrapper_new(TaskObj *task) +{ + TaskWakeupMethWrapper *o; + o = PyObject_GC_New(TaskWakeupMethWrapper, &TaskWakeupMethWrapper_Type); + if (o == NULL) { + return NULL; + } + + Py_INCREF(task); + o->ww_task = task; + + PyObject_GC_Track(o); + return (PyObject*) o; +} + +/* ----- Task */ + +static int +task_call_step_soon(TaskObj *task, PyObject *arg) +{ + PyObject *handle; + PyObject *cb = TaskSendMethWrapper_new(task, arg); + + if (cb == NULL) { return -1; } - // new reference - traceback_extract_stack = PyObject_GetAttrString(module, "extract_stack"); - if (traceback_extract_stack == NULL) { - goto fail; + + handle = _PyObject_CallMethodId( + task->task_loop, &PyId_call_soon, "O", cb, NULL); + + Py_DECREF(cb); + + if (handle == NULL) { + return -1; } - Py_DECREF(module); - module = PyImport_ImportModule("asyncio.events"); - if (module == NULL) { - goto fail; + Py_DECREF(handle); + return 0; +} + +/*[clinic input] +_asyncio.Task.__init__ + + coro: 'O' + * + loop: 'O' = NULL + +A coroutine wrapped in a Future. +[clinic start generated code]*/ + +static int +_asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop) +/*[clinic end generated code: output=9f24774c2287fc2f input=71d8d28c201a18cd]*/ +{ + PyObject *res; + _Py_IDENTIFIER(add); + + if (future_init((FutureObj*)self, loop)) { + return -1; } - asyncio_get_event_loop = PyObject_GetAttrString(module, "get_event_loop"); - if (asyncio_get_event_loop == NULL) { - goto fail; + + self->task_fut_waiter = NULL; + self->task_must_cancel = 0; + self->task_log_destroy_pending = 1; + + Py_INCREF(coro); + self->task_coro = coro; + + if (task_call_step_soon(self, NULL)) { + return -1; } - Py_DECREF(module); - module = PyImport_ImportModule("asyncio.futures"); - if (module == NULL) { - goto fail; + res = _PyObject_CallMethodId(all_tasks, &PyId_add, "O", self, NULL); + if (res == NULL) { + return -1; } - asyncio_repr_info_func = PyObject_GetAttrString(module, - "_future_repr_info"); - if (asyncio_repr_info_func == NULL) { + Py_DECREF(res); + + return 0; +} + +static int +TaskObj_clear(TaskObj *task) +{ + (void)FutureObj_clear((FutureObj*) task); + Py_CLEAR(task->task_coro); + Py_CLEAR(task->task_fut_waiter); + return 0; +} + +static int +TaskObj_traverse(TaskObj *task, visitproc visit, void *arg) +{ + Py_VISIT(task->task_coro); + Py_VISIT(task->task_fut_waiter); + (void)FutureObj_traverse((FutureObj*) task, visit, arg); + return 0; +} + +static PyObject * +TaskObj_get_log_destroy_pending(TaskObj *task) +{ + if (task->task_log_destroy_pending) { + Py_RETURN_TRUE; + } + else { + Py_RETURN_FALSE; + } +} + +static int +TaskObj_set_log_destroy_pending(TaskObj *task, PyObject *val) +{ + int is_true = PyObject_IsTrue(val); + if (is_true < 0) { + return -1; + } + task->task_log_destroy_pending = is_true; + return 0; +} + +static PyObject * +TaskObj_get_must_cancel(TaskObj *task) +{ + if (task->task_must_cancel) { + Py_RETURN_TRUE; + } + else { + Py_RETURN_FALSE; + } +} + +static PyObject * +TaskObj_get_coro(TaskObj *task) +{ + if (task->task_coro) { + Py_INCREF(task->task_coro); + return task->task_coro; + } + + Py_RETURN_NONE; +} + +static PyObject * +TaskObj_get_fut_waiter(TaskObj *task) +{ + if (task->task_fut_waiter) { + Py_INCREF(task->task_fut_waiter); + return task->task_fut_waiter; + } + + Py_RETURN_NONE; +} + +/*[clinic input] +@classmethod +_asyncio.Task.current_task + + loop: 'O' = NULL + +Return the currently running task in an event loop or None. + +By default the current task for the current event loop is returned. + +None is returned when called not in the context of a Task. +[clinic start generated code]*/ + +static PyObject * +_asyncio_Task_current_task_impl(PyTypeObject *type, PyObject *loop) +/*[clinic end generated code: output=99fbe7332c516e03 input=cd784537f02cf833]*/ +{ + PyObject *res; + + if (loop == NULL) { + loop = PyObject_CallObject(asyncio_get_event_loop, NULL); + if (loop == NULL) { + return NULL; + } + + res = PyDict_GetItem((PyObject*)current_tasks, loop); + Py_DECREF(loop); + } + else { + res = PyDict_GetItem((PyObject*)current_tasks, loop); + } + + if (res == NULL) { + Py_RETURN_NONE; + } + else { + Py_INCREF(res); + return res; + } +} + +static PyObject * +task_all_tasks(PyObject *loop) +{ + PyObject *task; + PyObject *task_loop; + PyObject *set; + PyObject *iter; + + assert(loop != NULL); + + set = PySet_New(NULL); + if (set == NULL) { + return NULL; + } + + iter = PyObject_GetIter(all_tasks); + if (iter == NULL) { goto fail; } - asyncio_InvalidStateError = PyObject_GetAttrString(module, - "InvalidStateError"); - if (asyncio_InvalidStateError == NULL) { + while ((task = PyIter_Next(iter))) { + task_loop = PyObject_GetAttrString(task, "_loop"); + if (task_loop == NULL) { + Py_DECREF(task); + goto fail; + } + if (task_loop == loop) { + if (PySet_Add(set, task) == -1) { + Py_DECREF(task_loop); + Py_DECREF(task); + goto fail; + } + } + Py_DECREF(task_loop); + Py_DECREF(task); + } + + Py_DECREF(iter); + return set; + +fail: + Py_XDECREF(set); + Py_XDECREF(iter); + return NULL; +} + +/*[clinic input] +@classmethod +_asyncio.Task.all_tasks + + loop: 'O' = NULL + +Return a set of all tasks for an event loop. + +By default all tasks for the current event loop are returned. +[clinic start generated code]*/ + +static PyObject * +_asyncio_Task_all_tasks_impl(PyTypeObject *type, PyObject *loop) +/*[clinic end generated code: output=11f9b20749ccca5d input=cd64aa5f88bd5c49]*/ +{ + PyObject *res; + + if (loop == NULL) { + loop = PyObject_CallObject(asyncio_get_event_loop, NULL); + if (loop == NULL) { + return NULL; + } + + res = task_all_tasks(loop); + Py_DECREF(loop); + } + else { + res = task_all_tasks(loop); + } + + return res; +} + +static PyObject * +TaskObj__repr_info(FutureObj *fut) +{ + return PyObject_CallFunctionObjArgs( + asyncio_task_repr_info_func, fut, NULL); +} + +/*[clinic input] +_asyncio.Task.cancel + +Request that this task cancel itself. + +This arranges for a CancelledError to be thrown into the +wrapped coroutine on the next cycle through the event loop. +The coroutine then has a chance to clean up or even deny +the request using try/except/finally. + +Unlike Future.cancel, this does not guarantee that the +task will be cancelled: the exception might be caught and +acted upon, delaying cancellation of the task or preventing +cancellation completely. The task may also return a value or +raise a different exception. + +Immediately after this method is called, Task.cancelled() will +not return True (unless the task was already cancelled). A +task will be marked as cancelled when the wrapped coroutine +terminates with a CancelledError exception (even if cancel() +was not called). +[clinic start generated code]*/ + +static PyObject * +_asyncio_Task_cancel_impl(TaskObj *self) +/*[clinic end generated code: output=6bfc0479da9d5757 input=13f9bf496695cb52]*/ +{ + if (self->task_state != STATE_PENDING) { + Py_RETURN_FALSE; + } + + if (self->task_fut_waiter) { + PyObject *res; + int is_true; + + res = _PyObject_CallMethodId( + self->task_fut_waiter, &PyId_cancel, NULL); + if (res == NULL) { + return NULL; + } + + is_true = PyObject_IsTrue(res); + Py_DECREF(res); + if (is_true < 0) { + return NULL; + } + + if (is_true) { + Py_RETURN_TRUE; + } + } + + self->task_must_cancel = 1; + Py_RETURN_TRUE; +} + +/*[clinic input] +_asyncio.Task.get_stack + + * + limit: 'O' = None + +Return the list of stack frames for this task's coroutine. + +If the coroutine is not done, this returns the stack where it is +suspended. If the coroutine has completed successfully or was +cancelled, this returns an empty list. If the coroutine was +terminated by an exception, this returns the list of traceback +frames. + +The frames are always ordered from oldest to newest. + +The optional limit gives the maximum number of frames to +return; by default all available frames are returned. Its +meaning differs depending on whether a stack or a traceback is +returned: the newest frames of a stack are returned, but the +oldest frames of a traceback are returned. (This matches the +behavior of the traceback module.) + +For reasons beyond our control, only one stack frame is +returned for a suspended coroutine. +[clinic start generated code]*/ + +static PyObject * +_asyncio_Task_get_stack_impl(TaskObj *self, PyObject *limit) +/*[clinic end generated code: output=c9aeeeebd1e18118 input=b1920230a766d17a]*/ +{ + return PyObject_CallFunctionObjArgs( + asyncio_task_get_stack_func, self, limit, NULL); +} + +/*[clinic input] +_asyncio.Task.print_stack + + * + limit: 'O' = None + file: 'O' = None + +Print the stack or traceback for this task's coroutine. + +This produces output similar to that of the traceback module, +for the frames retrieved by get_stack(). The limit argument +is passed to get_stack(). The file argument is an I/O stream +to which the output is written; by default output is written +to sys.stderr. +[clinic start generated code]*/ + +static PyObject * +_asyncio_Task_print_stack_impl(TaskObj *self, PyObject *limit, + PyObject *file) +/*[clinic end generated code: output=7339e10314cd3f4d input=19f1e99ab5400bc3]*/ +{ + return PyObject_CallFunctionObjArgs( + asyncio_task_print_stack_func, self, limit, file, NULL); +} + +static PyObject * +TaskObj_step(TaskObj *task, PyObject *args, PyObject *kwds) +{ + static char *kwlist[] = {"exc", NULL}; + PyObject *exc = NULL; + + if (!PyArg_ParseTupleAndKeywords(args, kwds, "|O", kwlist, &exc)) { + return NULL; + } + + return task_step(task, exc); +} + +static PyObject * +TaskObj_wakeup(TaskObj *task, PyObject *arg) +{ + return task_wakeup(task, arg); +} + +static void +TaskObj_finalize(TaskObj *task) +{ + _Py_IDENTIFIER(call_exception_handler); + _Py_IDENTIFIER(task); + _Py_IDENTIFIER(message); + _Py_IDENTIFIER(source_traceback); + + PyObject *message = NULL; + PyObject *context = NULL; + PyObject *func = NULL; + PyObject *res = NULL; + + PyObject *error_type, *error_value, *error_traceback; + + if (task->task_state != STATE_PENDING || !task->task_log_destroy_pending) { + goto done; + } + + /* Save the current exception, if any. */ + PyErr_Fetch(&error_type, &error_value, &error_traceback); + + context = PyDict_New(); + if (context == NULL) { + goto finally; + } + + message = PyUnicode_FromString("Task was destroyed but it is pending!"); + if (message == NULL) { + goto finally; + } + + if (_PyDict_SetItemId(context, &PyId_message, message) < 0 || + _PyDict_SetItemId(context, &PyId_task, (PyObject*)task) < 0) + { + goto finally; + } + + if (task->task_source_tb != NULL) { + if (_PyDict_SetItemId(context, &PyId_source_traceback, + task->task_source_tb) < 0) + { + goto finally; + } + } + + func = _PyObject_GetAttrId(task->task_loop, &PyId_call_exception_handler); + if (func != NULL) { + res = _PyObject_CallArg1(func, context); + if (res == NULL) { + PyErr_WriteUnraisable(func); + } + } + +finally: + Py_CLEAR(context); + Py_CLEAR(message); + Py_CLEAR(func); + Py_CLEAR(res); + + /* Restore the saved exception. */ + PyErr_Restore(error_type, error_value, error_traceback); + +done: + FutureObj_finalize((FutureObj*)task); +} + +static void TaskObj_dealloc(PyObject *); /* Needs Task_CheckExact */ + +static PyMethodDef TaskType_methods[] = { + _ASYNCIO_FUTURE_RESULT_METHODDEF + _ASYNCIO_FUTURE_EXCEPTION_METHODDEF + _ASYNCIO_FUTURE_SET_RESULT_METHODDEF + _ASYNCIO_FUTURE_SET_EXCEPTION_METHODDEF + _ASYNCIO_FUTURE_ADD_DONE_CALLBACK_METHODDEF + _ASYNCIO_FUTURE_REMOVE_DONE_CALLBACK_METHODDEF + _ASYNCIO_FUTURE_CANCELLED_METHODDEF + _ASYNCIO_FUTURE_DONE_METHODDEF + _ASYNCIO_TASK_CURRENT_TASK_METHODDEF + _ASYNCIO_TASK_ALL_TASKS_METHODDEF + _ASYNCIO_TASK_CANCEL_METHODDEF + _ASYNCIO_TASK_GET_STACK_METHODDEF + _ASYNCIO_TASK_PRINT_STACK_METHODDEF + {"_repr_info", (PyCFunction)TaskObj__repr_info, METH_NOARGS, NULL}, + {"_step", (PyCFunction)TaskObj_step, METH_VARARGS | METH_KEYWORDS, NULL}, + {"_wakeup", (PyCFunction)TaskObj_wakeup, METH_O, NULL}, + {NULL, NULL} /* Sentinel */ +}; + +static PyGetSetDef TaskType_getsetlist[] = { + FUTURE_COMMON_GETSETLIST + {"_log_destroy_pending", (getter)TaskObj_get_log_destroy_pending, + (setter)TaskObj_set_log_destroy_pending, NULL}, + {"_must_cancel", (getter)TaskObj_get_must_cancel, NULL, NULL}, + {"_coro", (getter)TaskObj_get_coro, NULL, NULL}, + {"_fut_waiter", (getter)TaskObj_get_fut_waiter, NULL, NULL}, + {NULL} /* Sentinel */ +}; + +static PyTypeObject TaskType = { + PyVarObject_HEAD_INIT(0, 0) + "_asyncio.Task", + sizeof(TaskObj), /* tp_basicsize */ + .tp_base = &FutureType, + .tp_dealloc = TaskObj_dealloc, + .tp_as_async = &FutureType_as_async, + .tp_repr = (reprfunc)FutureObj_repr, + .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC | Py_TPFLAGS_BASETYPE + | Py_TPFLAGS_HAVE_FINALIZE, + .tp_doc = _asyncio_Task___init____doc__, + .tp_traverse = (traverseproc)TaskObj_traverse, + .tp_clear = (inquiry)TaskObj_clear, + .tp_weaklistoffset = offsetof(TaskObj, task_weakreflist), + .tp_iter = (getiterfunc)future_new_iter, + .tp_methods = TaskType_methods, + .tp_getset = TaskType_getsetlist, + .tp_dictoffset = offsetof(TaskObj, dict), + .tp_init = (initproc)_asyncio_Task___init__, + .tp_new = PyType_GenericNew, + .tp_finalize = (destructor)TaskObj_finalize, +}; + +#define Task_CheckExact(obj) (Py_TYPE(obj) == &TaskType) + +static void +TaskObj_dealloc(PyObject *self) +{ + TaskObj *task = (TaskObj *)self; + + if (Task_CheckExact(self)) { + /* When fut is subclass of Task, finalizer is called from + * subtype_dealloc. + */ + if (PyObject_CallFinalizerFromDealloc(self) < 0) { + // resurrected. + return; + } + } + + if (task->task_weakreflist != NULL) { + PyObject_ClearWeakRefs(self); + } + + (void)TaskObj_clear(task); + Py_TYPE(task)->tp_free(task); +} + +static PyObject * +task_set_error_soon(TaskObj *task, PyObject *et, const char *format, ...) +{ + PyObject* msg; + + va_list vargs; +#ifdef HAVE_STDARG_PROTOTYPES + va_start(vargs, format); +#else + va_start(vargs); +#endif + msg = PyUnicode_FromFormatV(format, vargs); + va_end(vargs); + + if (msg == NULL) { + return NULL; + } + + PyObject *e = PyObject_CallFunctionObjArgs(et, msg, NULL); + Py_DECREF(msg); + if (e == NULL) { + return NULL; + } + + if (task_call_step_soon(task, e) == -1) { + Py_DECREF(e); + return NULL; + } + + Py_DECREF(e); + Py_RETURN_NONE; +} + +static PyObject * +task_step_impl(TaskObj *task, PyObject *exc) +{ + int res; + int clear_exc = 0; + PyObject *result = NULL; + PyObject *coro = task->task_coro; + PyObject *o; + + if (task->task_state != STATE_PENDING) { + PyErr_Format(PyExc_AssertionError, + "_step(): already done: %R %R", + task, + exc ? exc : Py_None); goto fail; } - asyncio_CancelledError = PyObject_GetAttrString(module, "CancelledError"); - if (asyncio_CancelledError == NULL) { + if (task->task_must_cancel) { + assert(exc != Py_None); + + if (exc) { + /* Check if exc is a CancelledError */ + res = PyObject_IsInstance(exc, asyncio_CancelledError); + if (res == -1) { + /* An error occurred, abort */ + goto fail; + } + if (res == 0) { + /* exc is not CancelledError; reset it to NULL */ + exc = NULL; + } + } + + if (!exc) { + /* exc was not a CancelledError */ + exc = PyObject_CallFunctionObjArgs(asyncio_CancelledError, NULL); + if (!exc) { + goto fail; + } + clear_exc = 1; + } + + task->task_must_cancel = 0; + } + + Py_CLEAR(task->task_fut_waiter); + + if (exc == NULL) { + if (PyGen_CheckExact(coro) || PyCoro_CheckExact(coro)) { + result = _PyGen_Send((PyGenObject*)coro, Py_None); + } + else { + result = _PyObject_CallMethodIdObjArgs( + coro, &PyId_send, Py_None, NULL); + } + } else { + result = _PyObject_CallMethodIdObjArgs( + coro, &PyId_throw, exc, NULL); + if (clear_exc) { + /* We created 'exc' during this call */ + Py_CLEAR(exc); + } + } + + if (result == NULL) { + PyObject *et, *ev, *tb; + + if (_PyGen_FetchStopIterationValue(&o) == 0) { + /* The error is StopIteration and that means that + the underlying coroutine has resolved */ + PyObject *res = future_set_result((FutureObj*)task, o); + Py_DECREF(o); + if (res == NULL) { + return NULL; + } + Py_DECREF(res); + Py_RETURN_NONE; + } + + if (PyErr_ExceptionMatches(asyncio_CancelledError)) { + /* CancelledError */ + PyErr_Clear(); + return future_cancel((FutureObj*)task); + } + + /* Some other exception; pop it and call Task.set_exception() */ + PyErr_Fetch(&et, &ev, &tb); + assert(et); + if (!ev || !PyObject_TypeCheck(ev, (PyTypeObject *) et)) { + PyErr_NormalizeException(&et, &ev, &tb); + } + o = future_set_exception((FutureObj*)task, ev); + if (!o) { + /* An exception in Task.set_exception() */ + Py_XDECREF(et); + Py_XDECREF(tb); + Py_XDECREF(ev); + goto fail; + } + assert(o == Py_None); + Py_CLEAR(o); + + if (!PyErr_GivenExceptionMatches(et, PyExc_Exception)) { + /* We've got a BaseException; re-raise it */ + PyErr_Restore(et, ev, tb); + goto fail; + } + + Py_XDECREF(et); + Py_XDECREF(tb); + Py_XDECREF(ev); + + Py_RETURN_NONE; + } + + /* Check if `result` is FutureObj or TaskObj (and not a subclass) */ + if (Future_CheckExact(result) || Task_CheckExact(result)) { + PyObject *wrapper; + PyObject *res; + + /* Check if `result` future is attached to a different loop */ + if (((FutureObj*)result)->fut_loop != task->task_loop) { + goto different_loop; + } + + if (result == (PyObject*)task) { + /* We have a task that wants to await on itself */ + PyObject *res; + res = task_set_error_soon( + task, PyExc_RuntimeError, + "Task cannot await on itself: %R", task); + Py_DECREF(result); + return res; + } + + FutureObj *fut = (FutureObj*)result; + + if (fut->fut_blocking) { + fut->fut_blocking = 0; + + /* result.add_done_callback(task._wakeup) */ + wrapper = TaskWakeupMethWrapper_new(task); + if (wrapper == NULL) { + goto fail; + } + res = future_add_done_callback((FutureObj*)result, wrapper); + Py_DECREF(wrapper); + if (res == NULL) { + goto fail; + } + Py_DECREF(res); + + /* task._fut_waiter = result */ + task->task_fut_waiter = result; /* no incref is necessary */ + + if (task->task_must_cancel) { + PyObject *r; + r = future_cancel(fut); + if (r == NULL) { + return NULL; + } + if (r == Py_True) { + task->task_must_cancel = 0; + } + Py_DECREF(r); + } + + Py_RETURN_NONE; + } + else { + goto yield_insteadof_yf; + } + } + + /* Check if `result` is a Future-compatible object */ + o = PyObject_GetAttrString(result, "_asyncio_future_blocking"); + if (o == NULL) { + if (PyErr_ExceptionMatches(PyExc_AttributeError)) { + PyErr_Clear(); + } + else { + goto fail; + } + } + else { + if (o == Py_None) { + Py_CLEAR(o); + } + else { + /* `result` is a Future-compatible object */ + PyObject *wrapper; + PyObject *res; + + int blocking = PyObject_IsTrue(o); + Py_CLEAR(o); + if (blocking < 0) { + goto fail; + } + + /* Check if `result` future is attached to a different loop */ + PyObject *oloop = PyObject_GetAttrString(result, "_loop"); + if (oloop == NULL) { + goto fail; + } + if (oloop != task->task_loop) { + Py_DECREF(oloop); + goto different_loop; + } else { + Py_DECREF(oloop); + } + + if (blocking) { + /* result._asyncio_future_blocking = False */ + if (PyObject_SetAttrString( + result, "_asyncio_future_blocking", Py_False) == -1) { + goto fail; + } + + /* result.add_done_callback(task._wakeup) */ + wrapper = TaskWakeupMethWrapper_new(task); + if (wrapper == NULL) { + goto fail; + } + res = _PyObject_CallMethodId( + result, &PyId_add_done_callback, "O", wrapper, NULL); + Py_DECREF(wrapper); + if (res == NULL) { + goto fail; + } + Py_DECREF(res); + + /* task._fut_waiter = result */ + task->task_fut_waiter = result; /* no incref is necessary */ + + if (task->task_must_cancel) { + PyObject *r; + int is_true; + r = _PyObject_CallMethodId(result, &PyId_cancel, NULL); + if (r == NULL) { + return NULL; + } + is_true = PyObject_IsTrue(r); + Py_DECREF(r); + if (is_true < 0) { + return NULL; + } + else if (is_true) { + task->task_must_cancel = 0; + } + } + + Py_RETURN_NONE; + } + else { + goto yield_insteadof_yf; + } + } + } + + /* Check if `result` is None */ + if (result == Py_None) { + /* Bare yield relinquishes control for one event loop iteration. */ + if (task_call_step_soon(task, NULL)) { + goto fail; + } + Py_DECREF(result); + Py_RETURN_NONE; + } + + /* Check if `result` is a generator */ + o = PyObject_CallFunctionObjArgs(inspect_isgenerator, result, NULL); + if (o == NULL) { + /* An exception in inspect.isgenerator */ + goto fail; + } + res = PyObject_IsTrue(o); + Py_CLEAR(o); + if (res == -1) { + /* An exception while checking if 'val' is True */ + goto fail; + } + if (res == 1) { + /* `result` is a generator */ + PyObject *ret; + ret = task_set_error_soon( + task, PyExc_RuntimeError, + "yield was used instead of yield from for " + "generator in task %R with %S", task, result); + Py_DECREF(result); + return ret; + } + + /* The `result` is none of the above */ + Py_DECREF(result); + return task_set_error_soon( + task, PyExc_RuntimeError, "Task got bad yield: %R", result); + +yield_insteadof_yf: + o = task_set_error_soon( + task, PyExc_RuntimeError, + "yield was used instead of yield from " + "in task %R with %R", + task, result); + Py_DECREF(result); + return o; + +different_loop: + o = task_set_error_soon( + task, PyExc_RuntimeError, + "Task %R got Future %R attached to a different loop", + task, result); + Py_DECREF(result); + return o; + +fail: + Py_XDECREF(result); + return NULL; +} + +static PyObject * +task_step(TaskObj *task, PyObject *exc) +{ + PyObject *res; + PyObject *ot; + + if (PyDict_SetItem((PyObject *)current_tasks, + task->task_loop, (PyObject*)task) == -1) + { + return NULL; + } + + res = task_step_impl(task, exc); + + if (res == NULL) { + PyObject *et, *ev, *tb; + PyErr_Fetch(&et, &ev, &tb); + ot = _PyDict_Pop(current_tasks, task->task_loop, NULL); + if (ot == NULL) { + Py_XDECREF(et); + Py_XDECREF(tb); + Py_XDECREF(ev); + return NULL; + } + Py_DECREF(ot); + PyErr_Restore(et, ev, tb); + return NULL; + } + else { + ot = _PyDict_Pop(current_tasks, task->task_loop, NULL); + if (ot == NULL) { + Py_DECREF(res); + return NULL; + } + else { + Py_DECREF(ot); + return res; + } + } +} + +static PyObject * +task_wakeup(TaskObj *task, PyObject *o) +{ + assert(o); + + if (Future_CheckExact(o) || Task_CheckExact(o)) { + PyObject *fut_result = NULL; + int res = future_get_result((FutureObj*)o, &fut_result); + PyObject *result; + + switch(res) { + case -1: + assert(fut_result == NULL); + return NULL; + case 0: + Py_DECREF(fut_result); + return task_step(task, NULL); + default: + assert(res == 1); + result = task_step(task, fut_result); + Py_DECREF(fut_result); + return result; + } + } + + PyObject *fut_result = PyObject_CallMethod(o, "result", NULL); + if (fut_result == NULL) { + PyObject *et, *ev, *tb; + PyObject *res; + + PyErr_Fetch(&et, &ev, &tb); + if (!ev || !PyObject_TypeCheck(ev, (PyTypeObject *) et)) { + PyErr_NormalizeException(&et, &ev, &tb); + } + + res = task_step(task, ev); + + Py_XDECREF(et); + Py_XDECREF(tb); + Py_XDECREF(ev); + + return res; + } + else { + Py_DECREF(fut_result); + return task_step(task, NULL); + } +} + + +/*********************** Module **************************/ + + +static void +module_free(void *m) +{ + Py_CLEAR(current_tasks); + Py_CLEAR(all_tasks); + Py_CLEAR(traceback_extract_stack); + Py_CLEAR(asyncio_get_event_loop); + Py_CLEAR(asyncio_future_repr_info_func); + Py_CLEAR(asyncio_task_repr_info_func); + Py_CLEAR(asyncio_task_get_stack_func); + Py_CLEAR(asyncio_task_print_stack_func); + Py_CLEAR(asyncio_InvalidStateError); + Py_CLEAR(asyncio_CancelledError); + Py_CLEAR(inspect_isgenerator); +} + +static int +module_init(void) +{ + PyObject *module = NULL; + PyObject *cls; + +#define WITH_MOD(NAME) \ + Py_CLEAR(module); \ + module = PyImport_ImportModule(NAME); \ + if (module == NULL) { \ + return -1; \ + } + +#define GET_MOD_ATTR(VAR, NAME) \ + VAR = PyObject_GetAttrString(module, NAME); \ + if (VAR == NULL) { \ + goto fail; \ + } + + WITH_MOD("asyncio.events") + GET_MOD_ATTR(asyncio_get_event_loop, "get_event_loop") + + WITH_MOD("asyncio.base_futures") + GET_MOD_ATTR(asyncio_future_repr_info_func, "_future_repr_info") + GET_MOD_ATTR(asyncio_InvalidStateError, "InvalidStateError") + GET_MOD_ATTR(asyncio_CancelledError, "CancelledError") + + WITH_MOD("asyncio.base_tasks") + GET_MOD_ATTR(asyncio_task_repr_info_func, "_task_repr_info") + GET_MOD_ATTR(asyncio_task_get_stack_func, "_task_get_stack") + GET_MOD_ATTR(asyncio_task_print_stack_func, "_task_print_stack") + + WITH_MOD("inspect") + GET_MOD_ATTR(inspect_isgenerator, "isgenerator") + + WITH_MOD("traceback") + GET_MOD_ATTR(traceback_extract_stack, "extract_stack") + + WITH_MOD("weakref") + GET_MOD_ATTR(cls, "WeakSet") + all_tasks = PyObject_CallObject(cls, NULL); + Py_CLEAR(cls); + if (all_tasks == NULL) { goto fail; } - Py_DECREF(module); + current_tasks = (PyDictObject *)PyDict_New(); + if (current_tasks == NULL) { + goto fail; + } + + Py_CLEAR(module); return 0; fail: - Py_CLEAR(traceback_extract_stack); - Py_CLEAR(asyncio_get_event_loop); - Py_CLEAR(asyncio_repr_info_func); - Py_CLEAR(asyncio_InvalidStateError); - Py_CLEAR(asyncio_CancelledError); Py_CLEAR(module); + module_free(NULL); return -1; + +#undef WITH_MOD +#undef GET_MOD_ATTR } - PyDoc_STRVAR(module_doc, "Accelerator module for asyncio"); static struct PyModuleDef _asynciomodule = { @@ -1006,14 +2324,14 @@ NULL, /* m_slots */ NULL, /* m_traverse */ NULL, /* m_clear */ - NULL, /* m_free */ + (freefunc)module_free /* m_free */ }; PyMODINIT_FUNC PyInit__asyncio(void) { - if (init_module() < 0) { + if (module_init() < 0) { return NULL; } if (PyType_Ready(&FutureType) < 0) { @@ -1022,6 +2340,15 @@ if (PyType_Ready(&FutureIterType) < 0) { return NULL; } + if (PyType_Ready(&TaskSendMethWrapper_Type) < 0) { + return NULL; + } + if(PyType_Ready(&TaskWakeupMethWrapper_Type) < 0) { + return NULL; + } + if (PyType_Ready(&TaskType) < 0) { + return NULL; + } PyObject *m = PyModule_Create(&_asynciomodule); if (m == NULL) { @@ -1034,5 +2361,11 @@ return NULL; } + Py_INCREF(&TaskType); + if (PyModule_AddObject(m, "Task", (PyObject *)&TaskType) < 0) { + Py_DECREF(&TaskType); + return NULL; + } + return m; } diff -r deb3e5857d8c Modules/clinic/_asynciomodule.c.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Modules/clinic/_asynciomodule.c.h Fri Oct 28 00:12:34 2016 -0400 @@ -0,0 +1,411 @@ +/*[clinic input] +preserve +[clinic start generated code]*/ + +PyDoc_STRVAR(_asyncio_Future___init____doc__, +"Future(*, loop=None)\n" +"--\n" +"\n" +"This class is *almost* compatible with concurrent.futures.Future.\n" +"\n" +" Differences:\n" +"\n" +" - result() and exception() do not take a timeout argument and\n" +" raise an exception when the future isn\'t done yet.\n" +"\n" +" - Callbacks registered with add_done_callback() are always called\n" +" via the event loop\'s call_soon_threadsafe().\n" +"\n" +" - This class is not compatible with the wait() and as_completed()\n" +" methods in the concurrent.futures package."); + +static int +_asyncio_Future___init___impl(FutureObj *self, PyObject *loop); + +static int +_asyncio_Future___init__(PyObject *self, PyObject *args, PyObject *kwargs) +{ + int return_value = -1; + static const char * const _keywords[] = {"loop", NULL}; + static _PyArg_Parser _parser = {"|$O:Future", _keywords, 0}; + PyObject *loop = NULL; + + if (!_PyArg_ParseTupleAndKeywordsFast(args, kwargs, &_parser, + &loop)) { + goto exit; + } + return_value = _asyncio_Future___init___impl((FutureObj *)self, loop); + +exit: + return return_value; +} + +PyDoc_STRVAR(_asyncio_Future_result__doc__, +"result($self, /)\n" +"--\n" +"\n" +"Return the result this future represents.\n" +"\n" +"If the future has been cancelled, raises CancelledError. If the\n" +"future\'s result isn\'t yet available, raises InvalidStateError. If\n" +"the future is done and has an exception set, this exception is raised."); + +#define _ASYNCIO_FUTURE_RESULT_METHODDEF \ + {"result", (PyCFunction)_asyncio_Future_result, METH_NOARGS, _asyncio_Future_result__doc__}, + +static PyObject * +_asyncio_Future_result_impl(FutureObj *self); + +static PyObject * +_asyncio_Future_result(FutureObj *self, PyObject *Py_UNUSED(ignored)) +{ + return _asyncio_Future_result_impl(self); +} + +PyDoc_STRVAR(_asyncio_Future_exception__doc__, +"exception($self, /)\n" +"--\n" +"\n" +"Return the exception that was set on this future.\n" +"\n" +"The exception (or None if no exception was set) is returned only if\n" +"the future is done. If the future has been cancelled, raises\n" +"CancelledError. If the future isn\'t done yet, raises\n" +"InvalidStateError."); + +#define _ASYNCIO_FUTURE_EXCEPTION_METHODDEF \ + {"exception", (PyCFunction)_asyncio_Future_exception, METH_NOARGS, _asyncio_Future_exception__doc__}, + +static PyObject * +_asyncio_Future_exception_impl(FutureObj *self); + +static PyObject * +_asyncio_Future_exception(FutureObj *self, PyObject *Py_UNUSED(ignored)) +{ + return _asyncio_Future_exception_impl(self); +} + +PyDoc_STRVAR(_asyncio_Future_set_result__doc__, +"set_result($self, res, /)\n" +"--\n" +"\n" +"Mark the future done and set its result.\n" +"\n" +"If the future is already done when this method is called, raises\n" +"InvalidStateError."); + +#define _ASYNCIO_FUTURE_SET_RESULT_METHODDEF \ + {"set_result", (PyCFunction)_asyncio_Future_set_result, METH_O, _asyncio_Future_set_result__doc__}, + +PyDoc_STRVAR(_asyncio_Future_set_exception__doc__, +"set_exception($self, exception, /)\n" +"--\n" +"\n" +"Mark the future done and set an exception.\n" +"\n" +"If the future is already done when this method is called, raises\n" +"InvalidStateError."); + +#define _ASYNCIO_FUTURE_SET_EXCEPTION_METHODDEF \ + {"set_exception", (PyCFunction)_asyncio_Future_set_exception, METH_O, _asyncio_Future_set_exception__doc__}, + +PyDoc_STRVAR(_asyncio_Future_add_done_callback__doc__, +"add_done_callback($self, fn, /)\n" +"--\n" +"\n" +"Add a callback to be run when the future becomes done.\n" +"\n" +"The callback is called with a single argument - the future object. If\n" +"the future is already done when this is called, the callback is\n" +"scheduled with call_soon."); + +#define _ASYNCIO_FUTURE_ADD_DONE_CALLBACK_METHODDEF \ + {"add_done_callback", (PyCFunction)_asyncio_Future_add_done_callback, METH_O, _asyncio_Future_add_done_callback__doc__}, + +PyDoc_STRVAR(_asyncio_Future_remove_done_callback__doc__, +"remove_done_callback($self, fn, /)\n" +"--\n" +"\n" +"Remove all instances of a callback from the \"call when done\" list.\n" +"\n" +"Returns the number of callbacks removed."); + +#define _ASYNCIO_FUTURE_REMOVE_DONE_CALLBACK_METHODDEF \ + {"remove_done_callback", (PyCFunction)_asyncio_Future_remove_done_callback, METH_O, _asyncio_Future_remove_done_callback__doc__}, + +PyDoc_STRVAR(_asyncio_Future_cancel__doc__, +"cancel($self, /)\n" +"--\n" +"\n" +"Cancel the future and schedule callbacks.\n" +"\n" +"If the future is already done or cancelled, return False. Otherwise,\n" +"change the future\'s state to cancelled, schedule the callbacks and\n" +"return True."); + +#define _ASYNCIO_FUTURE_CANCEL_METHODDEF \ + {"cancel", (PyCFunction)_asyncio_Future_cancel, METH_NOARGS, _asyncio_Future_cancel__doc__}, + +static PyObject * +_asyncio_Future_cancel_impl(FutureObj *self); + +static PyObject * +_asyncio_Future_cancel(FutureObj *self, PyObject *Py_UNUSED(ignored)) +{ + return _asyncio_Future_cancel_impl(self); +} + +PyDoc_STRVAR(_asyncio_Future_cancelled__doc__, +"cancelled($self, /)\n" +"--\n" +"\n" +"Return True if the future was cancelled."); + +#define _ASYNCIO_FUTURE_CANCELLED_METHODDEF \ + {"cancelled", (PyCFunction)_asyncio_Future_cancelled, METH_NOARGS, _asyncio_Future_cancelled__doc__}, + +static PyObject * +_asyncio_Future_cancelled_impl(FutureObj *self); + +static PyObject * +_asyncio_Future_cancelled(FutureObj *self, PyObject *Py_UNUSED(ignored)) +{ + return _asyncio_Future_cancelled_impl(self); +} + +PyDoc_STRVAR(_asyncio_Future_done__doc__, +"done($self, /)\n" +"--\n" +"\n" +"Return True if the future is done.\n" +"\n" +"Done means either that a result / exception are available, or that the\n" +"future was cancelled."); + +#define _ASYNCIO_FUTURE_DONE_METHODDEF \ + {"done", (PyCFunction)_asyncio_Future_done, METH_NOARGS, _asyncio_Future_done__doc__}, + +static PyObject * +_asyncio_Future_done_impl(FutureObj *self); + +static PyObject * +_asyncio_Future_done(FutureObj *self, PyObject *Py_UNUSED(ignored)) +{ + return _asyncio_Future_done_impl(self); +} + +PyDoc_STRVAR(_asyncio_Task___init____doc__, +"Task(coro, *, loop=None)\n" +"--\n" +"\n" +"A coroutine wrapped in a Future."); + +static int +_asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop); + +static int +_asyncio_Task___init__(PyObject *self, PyObject *args, PyObject *kwargs) +{ + int return_value = -1; + static const char * const _keywords[] = {"coro", "loop", NULL}; + static _PyArg_Parser _parser = {"O|$O:Task", _keywords, 0}; + PyObject *coro; + PyObject *loop = NULL; + + if (!_PyArg_ParseTupleAndKeywordsFast(args, kwargs, &_parser, + &coro, &loop)) { + goto exit; + } + return_value = _asyncio_Task___init___impl((TaskObj *)self, coro, loop); + +exit: + return return_value; +} + +PyDoc_STRVAR(_asyncio_Task_current_task__doc__, +"current_task($type, /, loop=None)\n" +"--\n" +"\n" +"Return the currently running task in an event loop or None.\n" +"\n" +"By default the current task for the current event loop is returned.\n" +"\n" +"None is returned when called not in the context of a Task."); + +#define _ASYNCIO_TASK_CURRENT_TASK_METHODDEF \ + {"current_task", (PyCFunction)_asyncio_Task_current_task, METH_FASTCALL|METH_CLASS, _asyncio_Task_current_task__doc__}, + +static PyObject * +_asyncio_Task_current_task_impl(PyTypeObject *type, PyObject *loop); + +static PyObject * +_asyncio_Task_current_task(PyTypeObject *type, PyObject **args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + static const char * const _keywords[] = {"loop", NULL}; + static _PyArg_Parser _parser = {"|O:current_task", _keywords, 0}; + PyObject *loop = NULL; + + if (!_PyArg_ParseStack(args, nargs, kwnames, &_parser, + &loop)) { + goto exit; + } + return_value = _asyncio_Task_current_task_impl(type, loop); + +exit: + return return_value; +} + +PyDoc_STRVAR(_asyncio_Task_all_tasks__doc__, +"all_tasks($type, /, loop=None)\n" +"--\n" +"\n" +"Return a set of all tasks for an event loop.\n" +"\n" +"By default all tasks for the current event loop are returned."); + +#define _ASYNCIO_TASK_ALL_TASKS_METHODDEF \ + {"all_tasks", (PyCFunction)_asyncio_Task_all_tasks, METH_FASTCALL|METH_CLASS, _asyncio_Task_all_tasks__doc__}, + +static PyObject * +_asyncio_Task_all_tasks_impl(PyTypeObject *type, PyObject *loop); + +static PyObject * +_asyncio_Task_all_tasks(PyTypeObject *type, PyObject **args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + static const char * const _keywords[] = {"loop", NULL}; + static _PyArg_Parser _parser = {"|O:all_tasks", _keywords, 0}; + PyObject *loop = NULL; + + if (!_PyArg_ParseStack(args, nargs, kwnames, &_parser, + &loop)) { + goto exit; + } + return_value = _asyncio_Task_all_tasks_impl(type, loop); + +exit: + return return_value; +} + +PyDoc_STRVAR(_asyncio_Task_cancel__doc__, +"cancel($self, /)\n" +"--\n" +"\n" +"Request that this task cancel itself.\n" +"\n" +"This arranges for a CancelledError to be thrown into the\n" +"wrapped coroutine on the next cycle through the event loop.\n" +"The coroutine then has a chance to clean up or even deny\n" +"the request using try/except/finally.\n" +"\n" +"Unlike Future.cancel, this does not guarantee that the\n" +"task will be cancelled: the exception might be caught and\n" +"acted upon, delaying cancellation of the task or preventing\n" +"cancellation completely. The task may also return a value or\n" +"raise a different exception.\n" +"\n" +"Immediately after this method is called, Task.cancelled() will\n" +"not return True (unless the task was already cancelled). A\n" +"task will be marked as cancelled when the wrapped coroutine\n" +"terminates with a CancelledError exception (even if cancel()\n" +"was not called)."); + +#define _ASYNCIO_TASK_CANCEL_METHODDEF \ + {"cancel", (PyCFunction)_asyncio_Task_cancel, METH_NOARGS, _asyncio_Task_cancel__doc__}, + +static PyObject * +_asyncio_Task_cancel_impl(TaskObj *self); + +static PyObject * +_asyncio_Task_cancel(TaskObj *self, PyObject *Py_UNUSED(ignored)) +{ + return _asyncio_Task_cancel_impl(self); +} + +PyDoc_STRVAR(_asyncio_Task_get_stack__doc__, +"get_stack($self, /, *, limit=None)\n" +"--\n" +"\n" +"Return the list of stack frames for this task\'s coroutine.\n" +"\n" +"If the coroutine is not done, this returns the stack where it is\n" +"suspended. If the coroutine has completed successfully or was\n" +"cancelled, this returns an empty list. If the coroutine was\n" +"terminated by an exception, this returns the list of traceback\n" +"frames.\n" +"\n" +"The frames are always ordered from oldest to newest.\n" +"\n" +"The optional limit gives the maximum number of frames to\n" +"return; by default all available frames are returned. Its\n" +"meaning differs depending on whether a stack or a traceback is\n" +"returned: the newest frames of a stack are returned, but the\n" +"oldest frames of a traceback are returned. (This matches the\n" +"behavior of the traceback module.)\n" +"\n" +"For reasons beyond our control, only one stack frame is\n" +"returned for a suspended coroutine."); + +#define _ASYNCIO_TASK_GET_STACK_METHODDEF \ + {"get_stack", (PyCFunction)_asyncio_Task_get_stack, METH_FASTCALL, _asyncio_Task_get_stack__doc__}, + +static PyObject * +_asyncio_Task_get_stack_impl(TaskObj *self, PyObject *limit); + +static PyObject * +_asyncio_Task_get_stack(TaskObj *self, PyObject **args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + static const char * const _keywords[] = {"limit", NULL}; + static _PyArg_Parser _parser = {"|$O:get_stack", _keywords, 0}; + PyObject *limit = Py_None; + + if (!_PyArg_ParseStack(args, nargs, kwnames, &_parser, + &limit)) { + goto exit; + } + return_value = _asyncio_Task_get_stack_impl(self, limit); + +exit: + return return_value; +} + +PyDoc_STRVAR(_asyncio_Task_print_stack__doc__, +"print_stack($self, /, *, limit=None, file=None)\n" +"--\n" +"\n" +"Print the stack or traceback for this task\'s coroutine.\n" +"\n" +"This produces output similar to that of the traceback module,\n" +"for the frames retrieved by get_stack(). The limit argument\n" +"is passed to get_stack(). The file argument is an I/O stream\n" +"to which the output is written; by default output is written\n" +"to sys.stderr."); + +#define _ASYNCIO_TASK_PRINT_STACK_METHODDEF \ + {"print_stack", (PyCFunction)_asyncio_Task_print_stack, METH_FASTCALL, _asyncio_Task_print_stack__doc__}, + +static PyObject * +_asyncio_Task_print_stack_impl(TaskObj *self, PyObject *limit, + PyObject *file); + +static PyObject * +_asyncio_Task_print_stack(TaskObj *self, PyObject **args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + static const char * const _keywords[] = {"limit", "file", NULL}; + static _PyArg_Parser _parser = {"|$OO:print_stack", _keywords, 0}; + PyObject *limit = Py_None; + PyObject *file = Py_None; + + if (!_PyArg_ParseStack(args, nargs, kwnames, &_parser, + &limit, &file)) { + goto exit; + } + return_value = _asyncio_Task_print_stack_impl(self, limit, file); + +exit: + return return_value; +} +/*[clinic end generated code: output=bdc0b4f86cd6e509 input=a9049054013a1b77]*/