import asyncio from pympler import muppy from pympler import summary class AutoclearingQueue(asyncio.Queue): @asyncio.coroutine def get(self): """ Code from asyncio.Queue with added `add_done_callback` """ while self.empty(): getter = asyncio.futures.Future(loop=self._loop) getter.add_done_callback(self._clean_getters) self._getters.append(getter) try: yield from getter except: getter.cancel() # Just in case getter is not done yet. if not self.empty() and not getter.cancelled(): # We were woken up by put_nowait(), but can't take # the call. Wake up the next in line. self._wakeup_next(self._getters) raise return self.get_nowait() def _clean_getters(self, fut): try: self._getters.remove(fut) except ValueError: pass q = AutoclearingQueue() loop = asyncio.get_event_loop() closing = False async def get_with_timeout(): while not closing: try: task = asyncio.ensure_future(q.get()) await asyncio.wait_for(task, 0.2) except asyncio.TimeoutError: pass def mem_profiling(): if not closing: types_ = muppy.filter(muppy.get_objects(), Type=dict) summary.print_(summary.summarize(types_)) loop.call_later(15, mem_profiling) def put(): q.put_nowait(None) loop.call_later(60, put) put() tasks = [asyncio.ensure_future(get_with_timeout()) for _ in range(10000)] mem_profiling() try: loop.run_forever() except KeyboardInterrupt: closing = True loop.run_until_complete( asyncio.ensure_future(asyncio.wait(tasks))) finally: loop.close()