import asyncio
import collections
import collections.abc
import logging
import gc
from typing import AsyncIterable, AsyncIterator, Awaitable, TypeVar

T = TypeVar('T')


def aiter(aiterable: AsyncIterable[T]) -> AsyncIterator[T]:
    """Return the asynchronous iterator of *aiterable*.

    Asynchronous counterpart of ``iter()``: validates the argument and
    delegates to its ``__aiter__()`` method.

    NOTE: on Python 3.10+ this definition shadows the built-in ``aiter``
    within this module.

    Raises:
        TypeError: if *aiterable* is not an instance of
            ``collections.abc.AsyncIterable``.
    """
    if not isinstance(aiterable, collections.abc.AsyncIterable):
        raise TypeError(
            f'{type(aiterable).__name__!r} object '
            f'is not asynchronously iterable')
    return aiterable.__aiter__()


def anext(aiterator: AsyncIterator[T]) -> Awaitable[T]:
    """Return an awaitable producing the next item of *aiterator*.

    Asynchronous counterpart of ``next()``: validates the argument and
    delegates to its ``__anext__()`` method. The returned awaitable raises
    ``StopAsyncIteration`` when the iterator is exhausted.

    NOTE: on Python 3.10+ this definition shadows the built-in ``anext``
    within this module (and, unlike the built-in, takes no default).

    Raises:
        TypeError: if *aiterator* is not an instance of
            ``collections.abc.AsyncIterator``.
    """
    if not isinstance(aiterator, collections.abc.AsyncIterator):
        raise TypeError(f'{type(aiterator).__name__!r} object '
                        f'is not an asynchronous iterator')
    return aiterator.__anext__()


async def agen() -> AsyncIterator[str]:
    """Yield 'ONE', 'TWO', 'THREE', 'FOUR', logging each step.

    A debug message is emitted on the 'agen' logger before every yield and
    once more from the ``finally`` clause, so the log shows when the
    generator is finalized (exhausted, closed, or collected).
    """
    logger = logging.getLogger('agen')
    try:
        for item in ['ONE', 'TWO', 'THREE', 'FOUR']:
            # Lazy %-style arguments: the message is only formatted when
            # a handler actually emits the record.
            logger.debug('agen yielding %s', item)
            yield item
    finally:
        logger.debug('agen finalizing')


async def run_gc_in_executor():
    """Run ``gc.collect`` in the loop's default executor once per second.

    Passing ``None`` as the executor selects the default executor, so the
    collection runs off the event-loop thread. Never returns on its own.
    """
    loop = asyncio.get_running_loop()
    while True:
        await loop.run_in_executor(None, gc.collect)
        await asyncio.sleep(1)


# This one doesn't have problems.
async def use_agen_asyncfor():
    """Consume ``agen()`` with ``async for``, abandoning it at 'THREE'."""
    ag = agen()
    async for item in ag:
        if item == 'THREE':
            break


# This one doesn't have problems.
async def use_agen_anext_same_task():
    """Consume ``agen()`` via explicit ``anext()`` calls in the current task.

    Stops at 'THREE' (or on exhaustion), leaving the generator unfinished.
    """
    ag = agen()
    ai = aiter(ag)
    while True:
        try:
            item = await anext(ai)
        except StopAsyncIteration:
            break
        if item == 'THREE':
            break


# This one doesn't have problems.
async def use_agen_anext_two_tasks():
    """Create the iterator here, then consume it in one spawned task.

    The spawned task performs every ``anext()`` call itself and stops at
    'THREE' (or on exhaustion). The task is not awaited and no reference
    to it is kept.
    """
    ag = agen()
    ai = aiter(ag)

    async def use_this():
        while True:
            try:
                item = await anext(ai)
            except StopAsyncIteration:
                break
            if item == 'THREE':
                break

    asyncio.create_task(use_this())


# This one will cause problems.
async def use_agen_anext_separate_tasks():
    """Advance one shared async iterator from two separately spawned tasks.

    Each spawned task awaits a single ``anext()`` on the shared iterator and
    then returns, so the generator is left unfinished. The tasks are not
    awaited and no reference to them is kept.
    """
    ag = agen()
    ai = aiter(ag)

    async def use_one():
        try:
            item = await anext(ai)
        except StopAsyncIteration:
            return
        if item == 'THREE':
            return

    asyncio.create_task(use_one())
    asyncio.create_task(use_one())


async def use_agen_loop(scenario=use_agen_anext_separate_tasks, interval=0.5):
    """Spawn *scenario* as a new task every *interval* seconds, forever.

    Args:
        scenario: zero-argument callable returning a coroutine. Defaults to
            ``use_agen_anext_separate_tasks``; the other scenarios in this
            file (``use_agen_asyncfor``, ``use_agen_anext_same_task``,
            ``use_agen_anext_two_tasks``) can be passed instead of editing
            the loop body.
        interval: seconds to sleep between spawns.
    """
    while True:
        # NOTE(review): the task reference is not retained (fire-and-forget),
        # exactly as in the original loop; kept that way so object lifetimes
        # in the experiment are unchanged.
        asyncio.create_task(scenario())
        await asyncio.sleep(interval)


def _cancel_pending_tasks(loop):
    """Cancel every unfinished task on *loop* and wait for them to unwind."""
    pending = asyncio.all_tasks(loop)
    if not pending:
        return
    for task in pending:
        task.cancel()
    # return_exceptions=True: collect the CancelledErrors instead of raising.
    loop.run_until_complete(
        asyncio.gather(*pending, return_exceptions=True))


def main():
    """Run the GC task and the scenario loop until interrupted.

    Configures debug logging, runs the event loop in debug mode, and on
    KeyboardInterrupt/SystemExit cancels the outstanding tasks, finalizes
    any remaining async generators and closes the loop.
    """
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(levelname)s %(name)s %(message)s',
    )
    # Create the loop explicitly: asyncio.get_event_loop() is deprecated
    # when no loop is running and no longer creates one on newer Pythons.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.set_debug(True)
    loop.create_task(run_gc_in_executor())
    loop.create_task(use_agen_loop())
    try:
        loop.run_forever()
    except (KeyboardInterrupt, SystemExit):
        pass
    finally:
        try:
            # Both top-level tasks loop forever; cancel them so the loop is
            # not closed with tasks still pending.
            _cancel_pending_tasks(loop)
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            loop.close()


if __name__ == '__main__':
    main()