classification
Title: Waiting for asyncio.StreamWriter.drain() twice in parallel raises an AssertionError when the transport stopped writing
Type: behavior Stage:
Components: asyncio Versions: Python 3.6
process
Status: open Resolution:
Dependencies: Superseder:
Assigned To: Nosy List: RemiCardona, aymeric.augustin, callumquick, chris.jerdonek, jnwatson, metathink, socketpair, vstinner, yselivanov
Priority: normal Keywords: patch

Created on 2017-03-28 09:45 by metathink, last changed 2019-11-04 15:20 by callumquick.

Files
File name Uploaded Description Edit
server.py vstinner, 2017-03-28 12:35
client.py vstinner, 2017-03-28 12:35
drain_multiple_waiters.patch vstinner, 2017-03-28 12:57 review
Messages (11)
msg290690 - (view) Author: Metathink (metathink) Date: 2017-03-28 09:45
While trying to break some code in a project using asyncio, I found that under certain circumstances, asyncio.StreamWriter.drain raises an AssertionError.

1. There must be many concurrent calls to "await writer.drain()"
2. The server we send data to must be public; no AssertionError occurs when connected to 127.0.0.1

Task exception was never retrieved
future: <Task finished coro=<flooding() done, defined at client.py:10> exception=AssertionError()>
Traceback (most recent call last):
  File "client.py", line 12, in flooding
    await writer.drain()
  File "/usr/local/lib/python3.6/asyncio/streams.py", line 333, in drain
    yield from self._protocol._drain_helper()
  File "/usr/local/lib/python3.6/asyncio/streams.py", line 208, in _drain_helper
    assert waiter is None or waiter.cancelled()
AssertionError

I don't know much about how the drain function works or how networking is handled by the OS, but I'm assuming that I've reached some OS limitation which triggers this AssertionError.

I'm not sure how I'm supposed to handle that. Am I supposed to add some throttling because I should not send too much data concurrently? Is this considered a bug? Any explanations are welcome.

Here are some minimal client and server examples if you want to try to reproduce it:
- Server: https://pastebin.com/SED89pwB
- Client: https://pastebin.com/ikJKHxi9

Also, I don't think this is limited to Python 3.6. I've found this old issue on aaugustin's websockets repo which looks the same: https://github.com/aaugustin/websockets/issues/16
msg290702 - (view) Author: STINNER Victor (vstinner) * (Python committer) Date: 2017-03-28 12:35
Modified client and server to be able to reproduce the issue on a LAN.
msg290703 - (view) Author: STINNER Victor (vstinner) * (Python committer) Date: 2017-03-28 12:36
The bug occurs when the transport pauses writing and the client code calls drain() multiple times in parallel.
msg290704 - (view) Author: STINNER Victor (vstinner) * (Python committer) Date: 2017-03-28 12:47
I understood that:

* code fills the write buffer of the transport until writing is paused because of the high water mark
* a function calls drain(), which waits until the server reads enough packets to reduce the size of the write buffer
* a second function calls drain(), but the first function is already waiting on drain(): bug occurs since the code doesn't support having two coroutines waiting on drain() in parallel
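
The sequence above can be reproduced without any network using a simplified model of the single-waiter logic. This is only a sketch: `SingleWaiterFlowControl` and `demo_single_waiter` are hypothetical names, and the class imitates the check shown in the traceback rather than reusing the real asyncio code.

```python
import asyncio


class SingleWaiterFlowControl:
    """Simplified model of the flow control logic (not the real asyncio class)."""

    def __init__(self):
        self._paused = False
        self._drain_waiter = None

    def pause_writing(self):
        # Called when the write buffer goes over the high water mark.
        self._paused = True

    def resume_writing(self):
        # Called when the write buffer drops to the low water mark.
        self._paused = False
        waiter, self._drain_waiter = self._drain_waiter, None
        if waiter is not None and not waiter.done():
            waiter.set_result(None)

    async def drain(self):
        if not self._paused:
            return
        waiter = self._drain_waiter
        # Same check as in the traceback: only one pending waiter is allowed.
        assert waiter is None or waiter.cancelled()
        waiter = asyncio.get_running_loop().create_future()
        self._drain_waiter = waiter
        await waiter


async def demo_single_waiter():
    flow = SingleWaiterFlowControl()
    flow.pause_writing()
    first = asyncio.ensure_future(flow.drain())
    second = asyncio.ensure_future(flow.drain())
    await asyncio.sleep(0)  # let both tasks run up to their first await
    flow.resume_writing()
    return await asyncio.gather(first, second, return_exceptions=True)


if __name__ == "__main__":
    print(asyncio.run(demo_single_waiter()))
```

In this model the first drain() completes normally once writing resumes, while the second one fails with AssertionError as soon as it finds the first waiter still pending.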

Notes:

* the minimum is to document that drain() must not be called twice in parallel. Right now, nothing is said about that:
https://docs.python.org/dev/library/asyncio-stream.html#asyncio.StreamWriter.drain

* we can probably design something that allows multiple coroutines to wait on the same event
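
One possible shape for such a design is to keep a list of waiters instead of a single slot and to release all of them when writing resumes. The sketch below is an illustration under that assumption, not the attached patch; `MultiWaiterFlowControl` and `demo_multi_waiter` are hypothetical names and the class is a standalone model, not asyncio code.

```python
import asyncio


class MultiWaiterFlowControl:
    """Standalone model: any number of coroutines may wait in drain()."""

    def __init__(self):
        self._paused = False
        self._drain_waiters = []

    def pause_writing(self):
        self._paused = True

    def resume_writing(self):
        self._paused = False
        # Wake up every coroutine that is currently blocked in drain().
        waiters, self._drain_waiters = self._drain_waiters, []
        for waiter in waiters:
            if not waiter.done():
                waiter.set_result(None)

    async def drain(self):
        if not self._paused:
            return
        waiter = asyncio.get_running_loop().create_future()
        self._drain_waiters.append(waiter)
        try:
            await waiter
        finally:
            # If this caller was cancelled before writing resumed,
            # forget its waiter so the list does not grow forever.
            if waiter in self._drain_waiters:
                self._drain_waiters.remove(waiter)


async def demo_multi_waiter():
    flow = MultiWaiterFlowControl()
    flow.pause_writing()
    tasks = [asyncio.ensure_future(flow.drain()) for _ in range(3)]
    await asyncio.sleep(0)  # let every task block in drain()
    flow.resume_writing()
    return await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == "__main__":
    print(asyncio.run(demo_multi_waiter()))
```

With this model all three concurrent drain() calls return normally after writing resumes.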

--

Metathink told me that he got the bug on a much more complex code using websockets. Thank you Metathink for isolating the bug to a few lines of Python code with simpler asyncio functions!
msg290706 - (view) Author: STINNER Victor (vstinner) * (Python committer) Date: 2017-03-28 12:57
Proof-of-concept of patched drain() to support multiple waiters.

I don't see any strong reason to not allow two coroutines to wait on drain() in parallel?

I'm too lazy to write a full patch with unit tests, doc changes, etc. I started with a PoC to discuss the solution.
msg290732 - (view) Author: Aymeric Augustin (aymeric.augustin) * Date: 2017-03-28 15:31
For context, websockets calls `yield from self.writer.drain()` after each write in order to provide backpressure.

If the output buffer fills up, calling API coroutines that write to the websocket connection becomes slow and hopefully the backpressure will propagate (in a correctly implemented application).

This is a straightforward application of the only use case described in the documentation.

----

I would find it annoying to have to serialize calls to drain() myself. It doesn't feel like something the "application" should care about. (websockets is the application from asyncio's perspective.)

I'm wondering if it could be a problem if a bunch of coroutines were waiting on drain() and got released simultaneously. I don't think it would be a problem for websockets. Since my use case seems typical, there's a good chance this also applies to other apps.

So I'm in favor of simply allowing an arbitrary number of coroutines to wait on drain() in parallel, if that's feasible.
msg290760 - (view) Author: Rémi Cardona (RemiCardona) * Date: 2017-03-28 22:58
Got hit by this too, maaaaany times as I'm working with 3G devices (slow write speeds).

As for "drain()", I'd say it should work like a fence/barrier: to let you know that the bytes in the buffer up to when the call was issued have been successfully written on the socket.

I'll see what I can cook up.

Cheers
msg290773 - (view) Author: Aymeric Augustin (aymeric.augustin) * Date: 2017-03-29 07:41
drain() returns when the write buffer reaches the low water mark, not when it's empty, so you don't have a guarantee that your bytes were written to the socket.

https://github.com/python/cpython/blob/6f0eb93183519024cb360162bdd81b9faec97ba6/Lib/asyncio/protocols.py#L36-L40

The high water mark defaults to 64 KiB and the low water mark to a quarter of the high water mark (16 KiB).

https://github.com/python/cpython/blob/6f0eb93183519024cb360162bdd81b9faec97ba6/Lib/asyncio/transports.py#L290

With websockets, the recommended way to ensure your message was received is:

yield from ws.send(...)
yield from ws.ping()

Given that TCP guarantees ordering, the ping response can only be received after the previous message was fully sent and received.

Of course, the ping can fail even though the message was received; that's the classic at-most-once vs. at-least-once question.

The technique you suggest requires setting the low and high water marks to 0. I'm not sure this is the best way to achieve your goals, since you still don't control the OS buffers.
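
For reference, the water marks can be inspected and changed on the transport with `get_write_buffer_limits()` and `set_write_buffer_limits()`. The following is a minimal sketch, assuming a connected `socket.socketpair()` as a stand-in for a real network connection; `show_write_buffer_limits` is a hypothetical helper name.

```python
import asyncio
import socket


async def show_write_buffer_limits():
    # A connected socket pair stands in for a real network connection.
    local_sock, peer_sock = socket.socketpair()
    reader, writer = await asyncio.open_connection(sock=local_sock)
    transport = writer.transport
    try:
        before = transport.get_write_buffer_limits()  # (low, high)
        # With only high=0 given, the low water mark becomes 0 as well.
        transport.set_write_buffer_limits(high=0)
        after = transport.get_write_buffer_limits()
        return before, after
    finally:
        writer.close()
        await writer.wait_closed()
        peer_sock.close()


if __name__ == "__main__":
    before, after = asyncio.run(show_write_buffer_limits())
    print("default (low, high):", before)
    print("after high=0 (low, high):", after)
```

Even with both marks at 0, drain() only reflects asyncio's own write buffer; data handed to the OS may still sit in kernel buffers.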
msg298981 - (view) Author: Марк Коренберг (socketpair) * Date: 2017-07-24 15:11
Triggered almost the same error. The documentation does not say that .drain() can't be called concurrently.

Minimal proof:

===================================
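import asyncio

# Server side: never reads, so the client's write buffer fills up.
async def do_nothing(client_reader, client_writer):
    await asyncio.sleep(10000)

mb = b'*' * (4096*4)  # 16 KiB payload per write

async def write_cmd(writer):
    writer.write(mb)
    await writer.drain()

async def amain():
    # b'\x00qwe' is an abstract Unix socket address (Linux only).
    srv = await asyncio.start_unix_server(do_nothing, b'\x00qwe')
    (reader, writer) = await asyncio.open_unix_connection(b'\x00qwe')
    # 200 coroutines share one writer and call drain() concurrently.
    await asyncio.gather(*(write_cmd(writer) for i in range(200)))

loop = asyncio.get_event_loop()
loop.run_until_complete(amain())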
===================================
msg301767 - (view) Author: Aymeric Augustin (aymeric.augustin) * Date: 2017-09-09 12:19
I worked around this bug in websockets by serializing access to `drain()` with a lock:

https://github.com/aaugustin/websockets/commit/198b71537917adb44002573b14cbe23dbd4c21a2

I suspect this is inefficient but it beats crashing.
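
The idea of that workaround can be sketched as follows. This is not the code from the websockets commit: `SerializedDrain` and `FakeWriter` are hypothetical names, and `FakeWriter` is a stand-in for a StreamWriter whose drain() does not tolerate concurrent callers.

```python
import asyncio


class SerializedDrain:
    """Hypothetical helper: only one coroutine at a time calls writer.drain()."""

    def __init__(self, writer):
        self._writer = writer
        self._lock = asyncio.Lock()

    async def drain(self):
        # Other callers queue on the lock instead of entering drain() together.
        async with self._lock:
            await self._writer.drain()


class FakeWriter:
    """Stand-in for a StreamWriter; fails if drain() is entered concurrently."""

    def __init__(self):
        self._busy = False
        self.calls = 0

    async def drain(self):
        assert not self._busy, "drain() entered concurrently"
        self._busy = True
        try:
            await asyncio.sleep(0)  # simulate waiting for the buffer to empty
            self.calls += 1
        finally:
            self._busy = False


async def demo_serialized_drain():
    fake = FakeWriter()
    safe = SerializedDrain(fake)
    await asyncio.gather(*(safe.drain() for _ in range(5)))
    return fake.calls


if __name__ == "__main__":
    print(asyncio.run(demo_serialized_drain()))
```

The cost is that waiters are released one after another rather than all at once, which is presumably the inefficiency mentioned above.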
msg355951 - (view) Author: Callum Ward (callumquick) * Date: 2019-11-04 15:20
Hi, I'm a new contributor: is there any consensus on whether something needs to be done, and if so, what? If there is, I can try to take the patch forward.
History
Date | User | Action | Args
2019-11-04 15:20:53 | callumquick | set | nosy: + callumquick; messages: + msg355951
2019-04-16 01:31:07 | jnwatson | set | nosy: + jnwatson
2017-09-09 12:19:16 | aymeric.augustin | set | messages: + msg301767
2017-07-24 15:11:01 | socketpair | set | nosy: + socketpair; messages: + msg298981
2017-07-10 09:14:47 | chris.jerdonek | set | nosy: + chris.jerdonek
2017-03-29 07:41:15 | aymeric.augustin | set | messages: + msg290773
2017-03-28 22:58:11 | RemiCardona | set | nosy: + RemiCardona; messages: + msg290760
2017-03-28 15:31:11 | aymeric.augustin | set | nosy: + aymeric.augustin; messages: + msg290732
2017-03-28 13:12:14 | vstinner | set | title: asyncio.StreamWriter.drain raises an AssertionError under heavy use -> Waiting for asyncio.StreamWriter.drain() twice in parallel raises an AssertionError when the transport stopped writing
2017-03-28 12:57:03 | vstinner | set | files: + drain_multiple_waiters.patch; keywords: + patch; messages: + msg290706
2017-03-28 12:47:06 | vstinner | set | messages: + msg290704
2017-03-28 12:36:03 | vstinner | set | messages: + msg290703
2017-03-28 12:35:35 | vstinner | set | files: + client.py; nosy: + vstinner; messages: + msg290702
2017-03-28 12:35:08 | vstinner | set | files: + server.py
2017-03-28 09:45:54 | metathink | create |