import asyncio import sys import time loop = asyncio.get_event_loop() task_count = 0 async def my_generator(sleep): for i in range(40): try: await asyncio.sleep(sleep) print('my_generator: [%s] PRODUCE VALUE' % sleep, i) yield i #'%s %s' % (i, sleep) except asyncio.CancelledError: print( 'my_generator: ---------------- generator was cancelled, but will not exit, instead it will be suspended', i, sleep) yield None print('my_generator: ---------------- generator was RESUMED!', i, sleep) consumer_lock = asyncio.Lock() try_with_more_sleep = False close_generators = False async def a_task(async_generator,consumer, finalizer): """ :param async_generator: :param consumer: :param finalizer: :return: task reads from async generator and sends to consumer protected by lock so only one task can push to the same consumer at a time. * many 'tasks' read different "sources" or read one source and all push to the same consumer. tasks may be cancelled and then restarted fresh to continue reading from active async generator they may read until canceled or until consumer does not accept anymore, if consumer does not accept - task should quit immediately """ global task_count task_count +=1 tc = task_count print('FRESH TASK HAVE STARTED!',tc) finally_enter = time.time() try: async for value in async_generator: if try_with_more_sleep: await asyncio.sleep(.1) if value is None: finally_enter = time.time() print('a_task: raise asyncio.CancelledError!', tc) raise asyncio.CancelledError('I WAS cancelled, my generator is suspended and alive, task id= %s '%tc) else: async with consumer_lock: print('will send :',value,'task id=',tc) if try_with_more_sleep: await asyncio.sleep(.1) await consumer.asend(value) if try_with_more_sleep: await asyncio.sleep(.1) print('send done:', value,'task id=',tc) # return 'I WAS cancelled, my generator is suspended and alive' except StopAsyncIteration: print('consumer does not accept events anymore! cant push',value,'task id=',tc) finally: finally_exit = time.time() # finalizer.set() print('IMPORTANT ! task finally!... @',value,sys.exc_info(), 'finally_enter=',finally_enter,'finally_exit=',finally_exit, 'FINALLY DELTA:',finally_exit-finally_enter,'task id=',tc) return 'OK, last value %s task id=%s' % (value,tc) # async def consumer(): # # while True: # consume = yield # print('consume',repr(consume)) # if not consume % 4: # await asyncio.sleep(2) class task_context: """ helper class which starts task when control enters the context and issues ".cancel()" whe context is exiting WHAT IS NOT WORKING: i cant make it to wait until task will be cancelled it just ignores it until after loop has nothing to do at all. WHAT I WANT: i want to wait here in the __aexit__ until wrapped task is really done, when "finally" is executed when all resources are free so execution can advance to next stage. """ def __init__(self, task, finalizer): self.finalizer = finalizer self.task = task async def __aenter__(self): print('__aenter__ loop.create_task(self.task)') print(self.task) self.running_task = loop.create_task(self.task) self.running_task.add_done_callback(lambda t: print('task done callback [%s]' % t)) return self.running_task async def __aexit__(self, exc_type, exc_val, exc_tb): try: # print('__aexit__ await self.finalizer.wait()') # await self.finalizer.wait() # print('self.task.close()') # self.task.close() - CANT CLOSE IT! print('__aexit__ self.running_task.cancel()') self.running_task.cancel() print('!!!!!!!!!!!!!!! ->>> WAITING ', self.running_task) task_rsult = await self.running_task except asyncio.CancelledError: print('task was cancelled, or not?') else: print('done!', task_rsult) finally: print('__aexit__ FINALLY',sys.exc_info()) async def test(ready): consumer = yield generator = my_generator(0.3) # 4 generator2 = my_generator(0.4) # 5 # generator = my_generator(0.3) # 4 # generator2 = my_generator(1.4) # 5 # con = consumer() # await con.asend(None) f1 = asyncio.Event() f2 = asyncio.Event() f3 = asyncio.Event() f4 = asyncio.Event() t1_0 = a_task(generator,consumer,f1) t1_1 = a_task(generator,consumer,f2) t1_2 = a_task(generator,consumer,f3) t2 = a_task(generator2 ,consumer,f4) async with task_context(t2,f4) as R2: async with task_context(t1_0,f1) as R1_0: print('SLEEP') while True: consume = yield print('consume 1', repr(consume)) if not consume % 4: await asyncio.sleep(0.2) if consume > 10 :break print('WAKEUP!',R1_0) # await R1_0 # uncomment to see another bug. await asyncio.sleep(3) async with task_context(t1_1,f2) as R1_1: print('SLEEP') while True: consume = yield print('consume 2', repr(consume)) if not consume % 4: await asyncio.sleep(0.2) if consume > 20 :break print('WAKEUP!') # await R1_1 await asyncio.sleep(3) async with task_context(t1_2,f3) as R1_2: print('SLEEP') while True: consume = yield print('consume 3 ', repr(consume)) if not consume % 4: await asyncio.sleep(0.2) if consume > 30 : print('_________________END____________________') break print('WAKEUP!',R1_2) # await R1_2 print('done stage 3!') if close_generators: await generator.aclose() await generator2.aclose() print('done! nested -------------------- there should be nothing else!.......') await asyncio.sleep(3) ready.set() async def main_test(): ready = asyncio.Event() test_coro = test(ready) await test_coro.asend(None) await test_coro.asend(test_coro) await ready.wait() print('MAIN DONE!') i_am_task = loop.create_task(main_test()) loop.run_until_complete(i_am_task)