This issue tracker has been migrated to GitHub, and is currently read-only.
For more information, see the GitHub FAQs in the Python Developer's Guide.

classification
Title: Non-awaited coroutines in an IsolatedAsyncioTestCase result in a RuntimeWarning
Type: behavior Stage: resolved
Components: asyncio, Tests Versions: Python 3.10
process
Status: closed Resolution: not a bug
Dependencies: Superseder:
Assigned To: Nosy List: asvetlov, bluecarrot, yselivanov
Priority: normal Keywords:

Created on 2022-01-29 08:45 by bluecarrot, last changed 2022-04-11 14:59 by admin. This issue is now closed.

Messages (7)
msg412060 - Author: (bluecarrot) Date: 2022-01-29 08:45
I am unit-testing a TCP proxy module that uses coroutines. This is the coroutine I use for the forwarding; it lets the writer stream drain while the other coroutines proceed:

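import asyncio
import logging
from asyncio import StreamReader, StreamWriter

logger = logging.getLogger(__name__)


async def forward_stream(reader: StreamReader, writer: StreamWriter, event: asyncio.Event, source: str):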
    writer_drain = writer.drain()
    while not event.is_set():
        try:
            data = await asyncio.wait_for(reader.read(1024), 1)
        except asyncio.TimeoutError:
            continue

        if not data:
            event.set()
            break

        # parse the data
        if reading := parse(data):
            # wait for the previous write to finish, and forward the data to the other end, process the data in between
            await writer_drain
            writer.write(data)
        writer_drain = writer.drain()

    # wait for any outstanding write buffer to be flushed
    await writer_drain
    logger.info("{} reader forwarder finished.".format(source))

In my unit tests I have the following (EnergyAgentProxy is the wrapper that calls the coroutine in the module that creates the proxy):

class TestConnections(IsolatedAsyncioTestCase):
    async def asyncSetUp(self) -> None:
        self.proxy = asyncio.create_task(EnergyAgentProxy(self.proxy_port, self.server_port, self.upstream_port))

The problem is that when I run these tests, I get the following warning:
     /usr/lib/python3.10/unittest/async_case.py:159: RuntimeWarning: coroutine 'StreamWriter.drain' was never awaited
     Coroutine created at (most recent call last)
       File "/usr/lib/python3.10/unittest/case.py", line 650, in __call__
         return self.run(*args, **kwds)
       [...]
       File "/home/frubio/Documents/powermonitor_raspberrypi/EnergyAgent.py", line 48, in forward_stream
         writer_drain = writer.drain()
       self._tearDownAsyncioLoop()

So it looks to me like, when the tasks are cancelled, I get this warning because the final "await writer_drain" in forward_stream is never executed, but I cannot guarantee that it will be. Am I doing something wrong? Is there a way to simply prevent this warning from showing up in my tests?
msg412061 - Author: (bluecarrot) Date: 2022-01-29 08:53
It seems that adding an "await asyncio.sleep(1)" in asyncTearDown, giving

class TestConnections(IsolatedAsyncioTestCase):
    async def asyncSetUp(self) -> None:
        self.proxy = asyncio.create_task(EnergyAgentProxy(self.proxy_port, self.server_port, self.upstream_port))

    async def asyncTearDown(self) -> None:
        await asyncio.sleep(1)

is enough to "hide the problem under the carpet", but that feels weird.
msg412062 - Author: Andrew Svetlov (asvetlov) * (Python committer) Date: 2022-01-29 09:01
Your code has at least one concurrency problem. Let's look back at the forward_stream() function:

async def forward_stream(reader: StreamReader, writer: StreamWriter, event: asyncio.Event, source: str):
    writer_drain = writer.drain()  # <--- awaitable is created here
    while not event.is_set():
        try:
            data = await asyncio.wait_for(reader.read(1024), 1)  # <-- CancelledError can be raised here: the stack unwinds and writer_drain is never awaited, sure.
        except asyncio.TimeoutError:
            continue
        ...  # the rest is not important for this case

To solve the problem, create writer_drain *right before awaiting it*, not before another 'await' call.
msg412064 - Author: (bluecarrot) Date: 2022-01-29 09:19
Hi Andrew, thank you for your answer. I am experimenting with coroutines, as I am pretty new to them. My idea was to let the writer drain while other packets were being read, which is why I await writer_drain right before calling writer.write again. Isn't that the correct way to overlap reading and writing?
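Andrew's point can be checked with a small self-contained sketch. All names here (`drain_stub`, `early`, `late`, `cancel_while_waiting`, `count_unawaited`) are hypothetical, and `drain_stub` merely stands in for `writer.drain()`. Cancelling a task while it is suspended on an unrelated `await` leaks a coroutine object that was created earlier, whereas creating the coroutine right before awaiting it leaves nothing behind:

```python
import asyncio
import gc
import warnings


async def drain_stub() -> None:
    """Hypothetical stand-in for StreamWriter.drain()."""
    await asyncio.sleep(0)


async def early(event: asyncio.Event) -> None:
    pending = drain_stub()   # awaitable created too early...
    await event.wait()       # ...cancellation arrives here, so `pending` is never awaited
    await pending


async def late(event: asyncio.Event) -> None:
    await event.wait()       # cancellation arrives here; nothing has been created yet
    await drain_stub()       # awaitable created right before it is awaited


async def cancel_while_waiting(worker) -> None:
    task = asyncio.create_task(worker(asyncio.Event()))
    await asyncio.sleep(0)   # let the task run until it blocks on event.wait()
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass


def count_unawaited(worker) -> int:
    """Run the scenario and count 'was never awaited' RuntimeWarnings."""
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        asyncio.run(cancel_while_waiting(worker))
        gc.collect()         # finalize anything still held by reference cycles
    return sum(
        issubclass(w.category, RuntimeWarning) and "was never awaited" in str(w.message)
        for w in caught
    )


print(count_unawaited(early), count_unawaited(late))  # prints: 1 0
```

The only difference between the two workers is where the coroutine object is created relative to the `await` that receives the cancellation.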

If I modify my initial code to look like this:

async def forward_stream(reader: StreamReader, writer: StreamWriter, event: asyncio.Event, source: str):
    writer_drain = writer.drain()  # <--- awaitable is created here
    while not event.is_set():
        try:
            data = await asyncio.wait_for(reader.read(1024), 1)  # <-- a CancelledError raised here is now caught below
        except asyncio.TimeoutError:
            continue
        except asyncio.CancelledError:
            event.set()
            break
        ...  # the rest is not important for this case

    await writer_drain

so that, if the task is cancelled, writer_drain is still awaited after the loop. This works, but at the cost of adding code purely for testing purposes (which feels wrong). In "production", this code is expected to lose the connection, break out of the loop, and wait for the writer stream to finish; I am not adding any way to cancel the streams while the script is running.

Leaked tasks are "swallowed" (I have tested this and it works), so shouldn't these cases also be handled by the tearDownClass method of IsolatedAsyncioTestCase?
msg412066 - Author: Andrew Svetlov (asvetlov) * (Python committer) Date: 2022-01-29 09:38
Your version works, but it can be simplified.

Just use

    await writer.drain()
    writer.write(data)

without grabbing the drain coroutine early.
The purpose of the .drain() method is to pause writing if the write buffer size is greater than the high watermark.
'await writer.drain()' waits until the buffer size drops below the low watermark. This prevents uncontrolled write-buffer growth when the peer cannot accept TCP messages as fast as `writer.write()` sends them.
The .drain() call has no hidden process running under the hood, so there is no need to get a writer_drain reference as early as possible; it is just 'waiting for a flag'.
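To make the "waiting for a flag" idea concrete, here is a toy model. It is not asyncio's implementation: `ToyWriter` and `consume()` are hypothetical. In the real library the transport calls the protocol's `pause_writing()`/`resume_writing()` when its buffer crosses the limits set with `set_write_buffer_limits()`, and `drain()` waits until writing is resumed; the model replaces that machinery with an `asyncio.Event`:

```python
import asyncio


class ToyWriter:
    """Toy model of watermark flow control (NOT asyncio's real StreamWriter).

    write() only appends to a buffer; drain() waits on a flag that is cleared
    when the buffer exceeds `high` and set again once it drops to `low` or below.
    """

    def __init__(self, high: int = 8, low: int = 2) -> None:
        self.buffer = bytearray()
        self.high, self.low = high, low
        self._can_write = asyncio.Event()
        self._can_write.set()

    def write(self, data: bytes) -> None:
        self.buffer += data
        if len(self.buffer) > self.high:
            self._can_write.clear()      # analogous to pause_writing()

    def consume(self, n: int) -> None:
        """Simulate the peer accepting n bytes."""
        del self.buffer[:n]
        if len(self.buffer) <= self.low:
            self._can_write.set()        # analogous to resume_writing()

    async def drain(self) -> None:
        await self._can_write.wait()     # no background work, just a flag


async def demo() -> list:
    w = ToyWriter(high=8, low=2)
    log = []
    w.write(b"0123456789")               # 10 bytes > high watermark: writing paused

    async def peer() -> None:
        for _ in range(4):
            await asyncio.sleep(0)
            w.consume(3)
            log.append(f"peer consumed, buffered={len(w.buffer)}")

    peer_task = asyncio.create_task(peer())
    await w.drain()                      # returns only once buffered <= low
    log.append(f"drain returned, buffered={len(w.buffer)}")
    await peer_task
    return log
```

Because a coroutine's body does not run until it is awaited, creating the `drain()` coroutine early gains nothing: the flag is only checked at the moment of the `await`.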
Also, there is no need for `await writer_drain` at the end: the asyncio transport sends all data from its internal write buffer before closing (but does not do so on `transport.abort()`).
msg412068 - Author: (bluecarrot) Date: 2022-01-29 10:14
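Putting these suggestions together, a simplified forward_stream might look like the sketch below. `parse()` is the proxy module's own helper, replaced here by a hypothetical placeholder, and `RecordingWriter` is a hypothetical test double that provides only the `write()`/`drain()` methods the function uses; an in-memory `asyncio.StreamReader` supplies the input:

```python
import asyncio
import logging
from asyncio import StreamReader, StreamWriter

logger = logging.getLogger(__name__)


def parse(data: bytes) -> bool:
    """Hypothetical placeholder for the proxy's own parse() helper."""
    return bool(data)


async def forward_stream(reader: StreamReader, writer: StreamWriter,
                         event: asyncio.Event, source: str) -> None:
    while not event.is_set():
        try:
            data = await asyncio.wait_for(reader.read(1024), 1)
        except asyncio.TimeoutError:
            continue

        if not data:  # EOF from the peer
            event.set()
            break

        if parse(data):
            # Create and await the drain coroutine in one step, right before
            # writing, so no un-awaited coroutine can leak on cancellation.
            await writer.drain()
            writer.write(data)

    # No final drain: closing the transport flushes its write buffer.
    logger.info("%s reader forwarder finished.", source)


class RecordingWriter:
    """Hypothetical test double exposing the StreamWriter methods used above."""

    def __init__(self) -> None:
        self.chunks = []

    def write(self, data: bytes) -> None:
        self.chunks.append(data)

    async def drain(self) -> None:
        await asyncio.sleep(0)


async def demo() -> bytes:
    reader = asyncio.StreamReader()
    reader.feed_data(b"hello ")
    reader.feed_data(b"world")
    reader.feed_eof()
    writer = RecordingWriter()
    await forward_stream(reader, writer, asyncio.Event(), "client")
    return b"".join(writer.chunks)
```

Running `asyncio.run(demo())` forwards everything that was fed into the reader and then stops at EOF.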
You are absolutely correct. Thank you very much!
msg412081 - Author: Andrew Svetlov (asvetlov) * (Python committer) Date: 2022-01-29 16:08
You are welcome!
History
Date                 User        Action  Args
2022-04-11 14:59:55  admin       set     github: 90726
2022-01-29 16:08:45  asvetlov    set     messages: + msg412081
2022-01-29 15:43:20  asvetlov    set     status: open -> closed
                                         resolution: not a bug
                                         stage: resolved
2022-01-29 10:14:06  bluecarrot  set     messages: + msg412068
2022-01-29 09:38:33  asvetlov    set     messages: + msg412066
2022-01-29 09:19:55  bluecarrot  set     messages: + msg412064
2022-01-29 09:01:39  asvetlov    set     messages: + msg412062
2022-01-29 08:53:25  bluecarrot  set     messages: + msg412061
2022-01-29 08:45:57  bluecarrot  create