From 4f833ea8ca1d4b574668de45d22364e674f2a173 Mon Sep 17 00:00:00 2001 From: Kevin Conway Date: Wed, 31 Aug 2016 03:58:06 +0000 Subject: [PATCH] Accept multiple connections on listener EVENT_READ When a large number of connections are made at once the event loop struggles to accept them because it only accepts and begins handling one new connection on each pass of the loop in which the EVENT_READ is emitted for the listening socket. This causes incoming connections to time out and sever. This patch uses the backlog setting of the listening socket to allow for accepting a bounded number of new incoming connections rather than a single one. --- Lib/asyncio/base_events.py | 2 +- Lib/asyncio/selector_events.py | 69 ++++++++++++++------------- Lib/test/test_asyncio/test_base_events.py | 2 +- Lib/test/test_asyncio/test_selector_events.py | 14 ++++++ 4 files changed, 52 insertions(+), 35 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 0916da8..0bcbfcb 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -976,7 +976,7 @@ class BaseEventLoop(events.AbstractEventLoop): for sock in sockets: sock.listen(backlog) sock.setblocking(False) - self._start_serving(protocol_factory, sock, ssl, server) + self._start_serving(protocol_factory, sock, ssl, server, backlog) if self._debug: logger.info("%r is serving", server) return server diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 34cce6b..bcc3aa1 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -151,43 +151,46 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): exc_info=True) def _start_serving(self, protocol_factory, sock, - sslcontext=None, server=None): + sslcontext=None, server=None, backlog=100): self.add_reader(sock.fileno(), self._accept_connection, - protocol_factory, sock, sslcontext, server) + protocol_factory, sock, sslcontext, server, backlog) def _accept_connection(self, protocol_factory, sock, - sslcontext=None, server=None): - try: - conn, addr = sock.accept() - if self._debug: - logger.debug("%r got a new connection from %r: %r", - server, addr, conn) - conn.setblocking(False) - except (BlockingIOError, InterruptedError, ConnectionAbortedError): - pass # False alarm. - except OSError as exc: - # There's nowhere to send the error, so just log it. - if exc.errno in (errno.EMFILE, errno.ENFILE, - errno.ENOBUFS, errno.ENOMEM): - # Some platforms (e.g. Linux keep reporting the FD as - # ready, so we remove the read handler temporarily. - # We'll try again in a while. - self.call_exception_handler({ - 'message': 'socket.accept() out of system resource', - 'exception': exc, - 'socket': sock, - }) - self.remove_reader(sock.fileno()) - self.call_later(constants.ACCEPT_RETRY_DELAY, - self._start_serving, - protocol_factory, sock, sslcontext, server) + sslcontext=None, server=None, backlog=100): + for _ in range(backlog): + try: + conn, addr = sock.accept() + if self._debug: + logger.debug("%r got a new connection from %r: %r", + server, addr, conn) + conn.setblocking(False) + except (BlockingIOError, InterruptedError, ConnectionAbortedError): + # Early exit because the socket accept buffer is empty. + return None + except OSError as exc: + # There's nowhere to send the error, so just log it. + if exc.errno in (errno.EMFILE, errno.ENFILE, + errno.ENOBUFS, errno.ENOMEM): + # Some platforms (e.g. Linux keep reporting the FD as + # ready, so we remove the read handler temporarily. + # We'll try again in a while. + self.call_exception_handler({ + 'message': 'socket.accept() out of system resource', + 'exception': exc, + 'socket': sock, + }) + self.remove_reader(sock.fileno()) + self.call_later(constants.ACCEPT_RETRY_DELAY, + self._start_serving, + protocol_factory, sock, sslcontext, server, + backlog) + else: + raise # The event loop will catch, log and ignore it. else: - raise # The event loop will catch, log and ignore it. - else: - extra = {'peername': addr} - accept = self._accept_connection2(protocol_factory, conn, extra, - sslcontext, server) - self.create_task(accept) + extra = {'peername': addr} + accept = self._accept_connection2(protocol_factory, conn, extra, + sslcontext, server) + self.create_task(accept) @coroutine def _accept_connection2(self, protocol_factory, conn, extra, diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py index 206ebc6..0efdc20 100644 --- a/Lib/test/test_asyncio/test_base_events.py +++ b/Lib/test/test_asyncio/test_base_events.py @@ -1634,7 +1634,7 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase): self.loop.call_later.assert_called_with(constants.ACCEPT_RETRY_DELAY, # self.loop._start_serving mock.ANY, - MyProto, sock, None, None) + MyProto, sock, None, None, mock.ANY) def test_call_coroutine(self): @asyncio.coroutine diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py index ff71c21..73bc3f3 100644 --- a/Lib/test/test_asyncio/test_selector_events.py +++ b/Lib/test/test_asyncio/test_selector_events.py @@ -687,6 +687,20 @@ class BaseSelectorEventLoopTests(test_utils.TestCase): selectors.EVENT_WRITE)]) self.loop.remove_writer.assert_called_with(1) + def test_accept_connection_multiple(self): + sock = mock.Mock() + sock.accept.return_value = (mock.Mock(), mock.Mock()) + backlog = 100 + # Mock the coroutine generation for a connection to prevent + # warnings related to un-awaited coroutines. + mock_obj = mock.patch.object + with mock_obj(self.loop, '_accept_connection2') as accept2_mock: + accept2_mock.return_value = None + with mock_obj(self.loop, 'create_task') as task_mock: + task_mock.return_value = None + self.loop._accept_connection(mock.Mock(), sock, backlog=backlog) + self.assertEqual(sock.accept.call_count, backlog) + class SelectorTransportTests(test_utils.TestCase): -- 2.1.4