Issue46568
This issue tracker has been migrated to GitHub,
and is currently read-only.
For more information,
see the GitHub FAQs in the Python's Developer Guide.
Created on 2022-01-29 08:45 by bluecarrot, last changed 2022-04-11 14:59 by admin. This issue is now closed.
Messages (7) | |||
---|---|---|---|
msg412060 - (view) | Author: (bluecarrot) | Date: 2022-01-29 08:45 | |
I am unittesting a tcp proxy module using coroutines. This is the coroutine I am using to do the forwarding, allowing the writer stream to drain while the rest of the coroutines are proceeding: async def forward_stream(reader: StreamReader, writer: StreamWriter, event: asyncio.Event, source: str): writer_drain = writer.drain() while not event.is_set(): try: data = await asyncio.wait_for(reader.read(1024), 1) except asyncio.TimeoutError: continue if not data: event.set() break # parse the data if reading := parse(data): # wait for the previous write to finish, and forward the data to the other end, process the data in between await writer_drain writer.write(data) writer_drain = writer.drain() # wait for any outstanding write buffer to be flushed await writer_drain logger.info("{} reader forwarder finished.".format(source)) In my unit tests, I have the following (EnergyAgentProxy is the wrapper calling the coroutine in the module that creates the proxy) class TestConnections(IsolatedAsyncioTestCase): async def asyncSetUp(self) -> None: self.proxy = asyncio.create_task(EnergyAgentProxy(self.proxy_port, self.server_port, self.upstream_port)) The problem is: When running these tests, I am getting the following error: /usr/lib/python3.10/unittest/async_case.py:159: RuntimeWarning: coroutine 'StreamWriter.drain' was never awaited Coroutine created at (most recent call last) File "/usr/lib/python3.10/unittest/case.py", line 650, in __call__ return self.run(*args, **kwds) [...] File "/home/frubio/Documents/powermonitor_raspberrypi/EnergyAgent.py", line 48, in forward_stream writer_drain = writer.drain() self._tearDownAsyncioLoop() So... to me, it looks like when the tasks are being cancelled I am getting this warning because the last "await writer_drain" in forward stream is not executed, but I cannot ensure that. Am I doing something wrong? Is there any way I can just prevent this warning from showing up in my tests? |
|||
msg412061 - (view) | Author: (bluecarrot) | Date: 2022-01-29 08:53 | |
Seems that, should I add an "await asyncio.sleep(1)" in asyncTearDown, so getting class TestConnections(IsolatedAsyncioTestCase): async def asyncSetUp(self) -> None: self.proxy = asyncio.create_task(EnergyAgentProxy(self.proxy_port, self.server_port, self.upstream_port)) async def asyncTearDown(self) -> None: await asyncio.sleep(1) is enough to "hide the problem under the carpet"... but sounds weird... |
|||
msg412062 - (view) | Author: Andrew Svetlov (asvetlov) * ![]() |
Date: 2022-01-29 09:01 | |
Your code has at least one concurrency problem. Let's look back at forward_stream() function: async def forward_stream(reader: StreamReader, writer: StreamWriter, event: asyncio.Event, source: str): writer_drain = writer.drain() # <--- awaitable is created here while not event.is_set(): try: data = await asyncio.wait_for(reader.read(1024), 1) # <-- CancelledError can be caught here, stack unwinds and writer_drain is never awaited, sure. except asyncio.TimeoutError: continue ... # the rest is not important for this case To solve the problem, you should create writer_drain *before its awaiting*, not before another 'await' call. |
|||
msg412064 - (view) | Author: (bluecarrot) | Date: 2022-01-29 09:19 | |
Hi Andrew, thank you for your answer. I am experimenting with coroutines, as I am pretty new to them. My idea was to let the writer drain while other packets where read, and thus I am waiting for the writer_drain right before starting writer.write again. Isn't that the correct wait to overlap the readings and the writings? If I modify my initial code to look like: async def forward_stream(reader: StreamReader, writer: StreamWriter, event: asyncio.Event, source: str): writer_drain = writer.drain() # <--- awaitable is created here while not event.is_set(): try: data = await asyncio.wait_for(reader.read(1024), 1) # <-- CancelledError can be caught here, stack unwinds and writer_drain is never awaited, sure. except asyncio.TimeoutError: continue except asyncio.CancelledError: event.set() break ... # the rest is not important for this case await writer_drain so that in case the task is cancelled, writer_drain will be awaited outside of the loop. This works, at the cost of having to introduce code specific for testing purposes (which feels wrong). In "production", the workflow of this code will be to loose the connection, break out of the loop, and wait for the writer stream to finish... but I am not introducing any method allowing me to cancel the streams once the script is running. In the same way leaked tasks are "swallowed", which I have tested and works, shouldn't be these cases also handled by the tearDownClass method of IsolatedAsyncioTestCase? |
|||
msg412066 - (view) | Author: Andrew Svetlov (asvetlov) * ![]() |
Date: 2022-01-29 09:38 | |
Your version works but can be simplified. Just use await writer.drain() writer.write(data) without grabbing the drainer early. The purpose of the .drain() method is to write pausing if the write buffer side is greater than the high watermark. The 'await writer.drain()' waits until the buffer size became less than low watermark. It prevents uncontrollable write buffer growth if a peer cannot accept TCP message as fast as `writer.write()` sends them. The .drain() call has no hidden process under the hood, there is no need to get write_drain reference as early as possible. It is just 'waiting for a flag'. Also, there is no need for `await write_drain` at the end: asyncio transport sends all data from the internal write buffer before closing (and doesn't do it on 'transport.abort()'). |
|||
msg412068 - (view) | Author: (bluecarrot) | Date: 2022-01-29 10:14 | |
You are absolutely correct. Thank you very much! |
|||
msg412081 - (view) | Author: Andrew Svetlov (asvetlov) * ![]() |
Date: 2022-01-29 16:08 | |
You are welcome! |
History | |||
---|---|---|---|
Date | User | Action | Args |
2022-04-11 14:59:55 | admin | set | github: 90726 |
2022-01-29 16:08:45 | asvetlov | set | messages: + msg412081 |
2022-01-29 15:43:20 | asvetlov | set | status: open -> closed resolution: not a bug stage: resolved |
2022-01-29 10:14:06 | bluecarrot | set | messages: + msg412068 |
2022-01-29 09:38:33 | asvetlov | set | messages: + msg412066 |
2022-01-29 09:19:55 | bluecarrot | set | messages: + msg412064 |
2022-01-29 09:01:39 | asvetlov | set | messages: + msg412062 |
2022-01-29 08:53:25 | bluecarrot | set | messages: + msg412061 |
2022-01-29 08:45:57 | bluecarrot | create |