From 77a59d20fb5f561e5ff9bf8c099a0219963e6a08 Mon Sep 17 00:00:00 2001 From: Kevin Conway Date: Wed, 31 Aug 2016 03:58:06 +0000 Subject: [PATCH] Accept multiple connections on listener EVENT_READ When a large number of connections are made at once the event loop struggles to accept them because it only accepts and begins handling one new connection on each pass of the loop in which the EVENT_READ is emitted for the listening socket. This causes incoming connections to time out and sever. This patch uses the backlog setting of the listening socket to allow for accepting a bounded number of new incoming connections rather than a single one. --- Lib/asyncio/base_events.py | 2 +- Lib/asyncio/selector_events.py | 73 +++++++++++++++------------ Lib/test/test_asyncio/test_base_events.py | 2 +- Lib/test/test_asyncio/test_selector_events.py | 14 +++++ 4 files changed, 56 insertions(+), 35 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 0916da8..0bcbfcb 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -976,7 +976,7 @@ class BaseEventLoop(events.AbstractEventLoop): for sock in sockets: sock.listen(backlog) sock.setblocking(False) - self._start_serving(protocol_factory, sock, ssl, server) + self._start_serving(protocol_factory, sock, ssl, server, backlog) if self._debug: logger.info("%r is serving", server) return server diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 34cce6b..073dc5a 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -151,43 +151,50 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): exc_info=True) def _start_serving(self, protocol_factory, sock, - sslcontext=None, server=None): + sslcontext=None, server=None, backlog=100): self.add_reader(sock.fileno(), self._accept_connection, - protocol_factory, sock, sslcontext, server) + protocol_factory, sock, sslcontext, server, backlog) def _accept_connection(self, protocol_factory, sock, - sslcontext=None, server=None): - try: - conn, addr = sock.accept() - if self._debug: - logger.debug("%r got a new connection from %r: %r", - server, addr, conn) - conn.setblocking(False) - except (BlockingIOError, InterruptedError, ConnectionAbortedError): - pass # False alarm. - except OSError as exc: - # There's nowhere to send the error, so just log it. - if exc.errno in (errno.EMFILE, errno.ENFILE, - errno.ENOBUFS, errno.ENOMEM): - # Some platforms (e.g. Linux keep reporting the FD as - # ready, so we remove the read handler temporarily. - # We'll try again in a while. - self.call_exception_handler({ - 'message': 'socket.accept() out of system resource', - 'exception': exc, - 'socket': sock, - }) - self.remove_reader(sock.fileno()) - self.call_later(constants.ACCEPT_RETRY_DELAY, - self._start_serving, - protocol_factory, sock, sslcontext, server) + sslcontext=None, server=None, backlog=100): + # This method is only called once for each event loop tick where the + # listening socket has triggered an EVENT_READ. There may be multiple + # connections waiting for an .accept() so it is called in a loop. + # See https://bugs.python.org/issue27906 for more details. + for _ in range(backlog): + try: + conn, addr = sock.accept() + if self._debug: + logger.debug("%r got a new connection from %r: %r", + server, addr, conn) + conn.setblocking(False) + except (BlockingIOError, InterruptedError, ConnectionAbortedError): + # Early exit because the socket accept buffer is empty. + return None + except OSError as exc: + # There's nowhere to send the error, so just log it. + if exc.errno in (errno.EMFILE, errno.ENFILE, + errno.ENOBUFS, errno.ENOMEM): + # Some platforms (e.g. Linux keep reporting the FD as + # ready, so we remove the read handler temporarily. + # We'll try again in a while. + self.call_exception_handler({ + 'message': 'socket.accept() out of system resource', + 'exception': exc, + 'socket': sock, + }) + self.remove_reader(sock.fileno()) + self.call_later(constants.ACCEPT_RETRY_DELAY, + self._start_serving, + protocol_factory, sock, sslcontext, server, + backlog) + else: + raise # The event loop will catch, log and ignore it. else: - raise # The event loop will catch, log and ignore it. - else: - extra = {'peername': addr} - accept = self._accept_connection2(protocol_factory, conn, extra, - sslcontext, server) - self.create_task(accept) + extra = {'peername': addr} + accept = self._accept_connection2(protocol_factory, conn, extra, + sslcontext, server) + self.create_task(accept) @coroutine def _accept_connection2(self, protocol_factory, conn, extra, diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py index 206ebc6..0efdc20 100644 --- a/Lib/test/test_asyncio/test_base_events.py +++ b/Lib/test/test_asyncio/test_base_events.py @@ -1634,7 +1634,7 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase): self.loop.call_later.assert_called_with(constants.ACCEPT_RETRY_DELAY, # self.loop._start_serving mock.ANY, - MyProto, sock, None, None) + MyProto, sock, None, None, mock.ANY) def test_call_coroutine(self): @asyncio.coroutine diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py index ff71c21..73bc3f3 100644 --- a/Lib/test/test_asyncio/test_selector_events.py +++ b/Lib/test/test_asyncio/test_selector_events.py @@ -687,6 +687,20 @@ class BaseSelectorEventLoopTests(test_utils.TestCase): selectors.EVENT_WRITE)]) self.loop.remove_writer.assert_called_with(1) + def test_accept_connection_multiple(self): + sock = mock.Mock() + sock.accept.return_value = (mock.Mock(), mock.Mock()) + backlog = 100 + # Mock the coroutine generation for a connection to prevent + # warnings related to un-awaited coroutines. + mock_obj = mock.patch.object + with mock_obj(self.loop, '_accept_connection2') as accept2_mock: + accept2_mock.return_value = None + with mock_obj(self.loop, 'create_task') as task_mock: + task_mock.return_value = None + self.loop._accept_connection(mock.Mock(), sock, backlog=backlog) + self.assertEqual(sock.accept.call_count, backlog) + class SelectorTransportTests(test_utils.TestCase): -- 2.1.4