This issue tracker has been migrated to GitHub, and is currently read-only.
For more information, see the GitHub FAQs in the Python's Developer Guide.

classification
Title: Add asyncio.BufferedProtocol
Type: enhancement Stage: resolved
Components: asyncio Versions: Python 3.7
process
Status: closed Resolution: fixed
Dependencies: Superseder:
Assigned To: yselivanov Nosy List: asvetlov, lukasz.langa, methane, pitrou, yselivanov
Priority: normal Keywords: patch

Created on 2017-12-08 02:04 by yselivanov, last changed 2022-04-11 14:58 by admin. This issue is now closed.

Pull Requests
URL Status Linked Edit
PR 4755 merged yselivanov, 2017-12-08 02:25
PR 5408 merged yselivanov, 2018-01-29 04:35
Messages (16)
msg307830 - (view) Author: Yury Selivanov (yselivanov) * (Python committer) Date: 2017-12-08 02:10
A couple emails from async-sig for the context:

1. https://mail.python.org/pipermail/async-sig/2017-October/000392.html
2. https://mail.python.org/pipermail/async-sig/2017-December/000423.html

I propose to add another Protocol base class to asyncio: BufferedProtocol.  It will have 'get_buffer()' and 'buffer_updated(nbytes)' methods instead of 'data_received()':

    class asyncio.BufferedProtocol:

        def get_buffer(self) -> memoryview:
            pass

        def buffer_updated(self, nbytes: int):
            pass

When the protocol's transport is ready to receive data, it will call `protocol.get_buffer()`.  The latter must return an object that implements the buffer protocol.  The transport will request a writable buffer over the returned object and receive data *into* that buffer.

When the `sock.recv_into(buffer)` call is done, `protocol.buffer_updated(nbytes)` method will be called.  The number of bytes received into the buffer will be passed as a first argument.

I've implemented the proposed design in uvloop (branch 'get_buffer', [1]) and adjusted your benchmark [2] to use it.  Here are benchmark results from my machine (macOS):

vanilla asyncio: 120-135 Mb/s
uvloop: 320-330 Mb/s
uvloop/get_buffer: 600-650 Mb/s.


[1] https://github.com/MagicStack/uvloop/tree/get_buffer
[2] https://gist.github.com/1st1/1c606e5b83ef0e9c41faf21564d75ad7
msg307832 - (view) Author: Yury Selivanov (yselivanov) * (Python committer) Date: 2017-12-08 02:48
I've made a PR that implements the change for selector_events.py.

With the change:

vanilla asyncio:            120-135 Mb/s
vanilla asyncio/get_buffer: 220-230 Mb/s
uvloop:                     320-330 Mb/s
uvloop/get_buffer:          600-650 Mb/s

If we decide to go forward with the proposed design, I'll update the PR with support for proactor_events
msg307841 - (view) Author: Andrew Svetlov (asvetlov) * (Python committer) Date: 2017-12-08 09:34
Numbers are great!

Couple questions.

1. What happens if size of read data is greater than pre-allocated buffer?
2. Is flow control logic changed or not? If I understand correctly pause_reading() / resume_reading() continue to work as earlier.
msg307846 - (view) Author: Antoine Pitrou (pitrou) * (Python committer) Date: 2017-12-08 10:51
I have another question: what happens if there is a partial read?

For example, let's says I return a 1024-bytes buffer in get_buffer(), but recv_into() receives data in 512 chunks.  Is it:

1. getbuffer() is called, returns 1024 bytes buffer
2. recv_into() receives 512 bytes, writes them in buf[0:512]
3. recv_into() receives another 512 bytes, writes them in buf[512:1024]

or is it:

1. getbuffer() is called, returns 1024 bytes buffer
2. recv_into() receives 512 bytes, writes them in buf[0:512]
3. getbuffer() is called, returns another 1024 bytes buffer
4. recv_into() receives 512 bytes, writes them in newbuf[0:512]
msg307856 - (view) Author: Yury Selivanov (yselivanov) * (Python committer) Date: 2017-12-08 17:26
> 1. What happens if size of read data is greater than pre-allocated buffer?

Let's say we have 2Kb of data in the socket's network buffer, and we only preallocated 0.5Kb in our buffer.  We will the receive 0.5Kb in our buffer, 'Protocol.buffer_updated()' will be called with nbytes=512, and 1.5Kb of data will be left in the network buffer.  So the loop will call get_buffer()/buffer_updated() again, and the cycle will continue until there's no data left.


> 2. Is flow control logic changed or not? If I understand correctly pause_reading() / resume_reading() continue to work as earlier.

Flow control will continue working absolutely the same for BufferedProtocols.


> I have another question: what happens if there is a partial read?
> For example, let's says I return a 1024-bytes buffer in get_buffer(), but recv_into() receives data in 512 chunks.  Is it:

It will be as follows:

1. Protocol.get_buffer() is called, returns 1024 bytes buffer
2. recv_into() receives 512 bytes, writes them in buf[0:512]
3. Protocol.buffer_updated() is called with nbytes=512

Now it's the responsibility of the Protocol to return a correct view over buffer the next time `get_buffer()` is called.

The general idea is to:

1. allocate a big buffer

2. keep track of how much data we have in that buffer, let's say we have a 'length' integer for that.

3. when get_buffer() is called, return 'memoryview(big_buffer)[length:]'

4. when buffer_updated(nbytes) is called, do 'length += nbytes; parse_buffer_if_possible()'

I've implemented precisely this approach here: https://gist.github.com/1st1/1c606e5b83ef0e9c41faf21564d75ad7#file-get_buffer_bench-py-L27-L43
msg307857 - (view) Author: Antoine Pitrou (pitrou) * (Python committer) Date: 2017-12-08 17:31
Do you think it's possible to fold BufferedProtocol into Protocol?
i.e., either `get_buffer()` returns None (the default) and `data_received()` is called with a bytes object, or `get_buffer()` returns a writable buffer and `buffer_updated()` is called with the number of bytes received into the buffer.

This would allow StreamReader to implement readinto().
msg307859 - (view) Author: Yury Selivanov (yselivanov) * (Python committer) Date: 2017-12-08 17:34
> Do you think it's possible to fold BufferedProtocol into Protocol?

It would be a backwards incompatible change :(

Coincidentally, there might be protocols that already implement 'get_buffer()' and 'buffer_updated()' methods that do something completely different.


> This would allow StreamReader to implement readinto().

We can easily refactor StreamReader to use 'BufferedProtocol'.  Methods like 'readexactly()' would benefit from that.
msg307861 - (view) Author: Andrew Svetlov (asvetlov) * (Python committer) Date: 2017-12-08 17:57
New protocol may speed up not only `reader.readexactly()`.
Every `reader.feed_data()` appends a new chunk to existing buffer.
We can try to read into unused tail of the buffer instead.
msg307862 - (view) Author: Łukasz Langa (lukasz.langa) * (Python committer) Date: 2017-12-08 18:23
+1 on the idea, I would use this.
msg308019 - (view) Author: Inada Naoki (methane) * (Python committer) Date: 2017-12-11 08:19
Looks nice.  Can it speed up aiohttp too?
msg308020 - (view) Author: Andrew Svetlov (asvetlov) * (Python committer) Date: 2017-12-11 08:46
Yes.
aiohttp uses own streams but public API and internal implementation are pretty close to asyncio streams.
Moreover C accelerated HTTP parser should work with proposed BufferedProtocol seamlessly.
msg308027 - (view) Author: Antoine Pitrou (pitrou) * (Python committer) Date: 2017-12-11 10:47
See https://eklitzke.org/goroutines-nonblocking-io-and-memory-usage for an interesting discussion of the drawbacks of some buffer handling idioms.
msg308059 - (view) Author: Yury Selivanov (yselivanov) * (Python committer) Date: 2017-12-11 16:46
> See https://eklitzke.org/goroutines-nonblocking-io-and-memory-usage for an interesting discussion of the drawbacks of some buffer handling idioms.

Thanks for the link!

It does make sense to use a pool of buffers for the proposed BufferedProtocol when you need to keep thousands of long-open connections.  The current design makes that easy: when BufferedProtocol.get_buffer() is called you either take a buffer from the pool or allocate a temporary new one.

For use-cases like DB connections (asyncpg) a permanently allocated buffer per protocol instance is a good solution too, as usually there's a fairly limited number of open DB connections.
msg308060 - (view) Author: Yury Selivanov (yselivanov) * (Python committer) Date: 2017-12-11 16:50
>> Looks nice.  Can it speed up aiohttp too?

> Yes.
> aiohttp uses own streams but public API and internal implementation are pretty close to asyncio streams.
> Moreover C accelerated HTTP parser should work with proposed BufferedProtocol seamlessly.

I did some benchmarks, and it looks like BufferedProtocol can make httptools up to 5% faster for relatively small requests < 10Kb.  Don't expect big speedups there.

For asyncpg, benchmarks that fetch a lot of data (50-100Kb) get faster up to 15%.  So we'll definitely use the BufferedProtocol in asyncpg.

For applications that need to handle megabytes of data per request (like Antoine's benchmark) the speedup will be up to 2x.
msg311002 - (view) Author: Yury Selivanov (yselivanov) * (Python committer) Date: 2018-01-28 21:30
New changeset 631fd38dbf04dbf0127881f3977982e401a849e4 by Yury Selivanov in branch 'master':
bpo-32251: Implement asyncio.BufferedProtocol. (#4755)
https://github.com/python/cpython/commit/631fd38dbf04dbf0127881f3977982e401a849e4
msg311055 - (view) Author: Yury Selivanov (yselivanov) * (Python committer) Date: 2018-01-29 04:51
New changeset 07627e9a6a5f418354ff3dc99a0f36bc5b79dcd8 by Yury Selivanov in branch 'master':
bpo-32251: Fix docs (#5408)
https://github.com/python/cpython/commit/07627e9a6a5f418354ff3dc99a0f36bc5b79dcd8
History
Date User Action Args
2022-04-11 14:58:55adminsetgithub: 76432
2018-10-12 21:53:18cheryl.sabellalinkissue32052 dependencies
2018-01-29 04:51:11yselivanovsetmessages: + msg311055
2018-01-29 04:35:34yselivanovsetpull_requests: + pull_request5241
2018-01-28 21:43:53yselivanovsetstatus: open -> closed
type: enhancement
resolution: fixed
stage: patch review -> resolved
2018-01-28 21:30:29yselivanovsetmessages: + msg311002
2017-12-11 16:50:18yselivanovsetmessages: + msg308060
2017-12-11 16:46:57yselivanovsetmessages: + msg308059
2017-12-11 10:47:14pitrousetmessages: + msg308027
2017-12-11 08:46:34asvetlovsetmessages: + msg308020
2017-12-11 08:19:01methanesetnosy: + methane
messages: + msg308019
2017-12-08 18:23:45lukasz.langasetnosy: + lukasz.langa
messages: + msg307862
2017-12-08 18:05:20gvanrossumsetnosy: - gvanrossum
2017-12-08 17:57:44asvetlovsetmessages: + msg307861
2017-12-08 17:34:45yselivanovsetmessages: + msg307859
2017-12-08 17:31:28pitrousetmessages: + msg307857
2017-12-08 17:26:04yselivanovsetmessages: + msg307856
2017-12-08 10:51:32pitrousetmessages: + msg307846
2017-12-08 09:34:26asvetlovsetmessages: + msg307841
2017-12-08 02:48:22yselivanovsetmessages: + msg307832
2017-12-08 02:25:23yselivanovsetkeywords: + patch
stage: patch review
pull_requests: + pull_request4658
2017-12-08 02:10:39yselivanovsetmessages: + msg307830
2017-12-08 02:04:18yselivanovcreate