diff --git a/Lib/asynchat.py b/Lib/asynchat.py --- a/Lib/asynchat.py +++ b/Lib/asynchat.py @@ -194,6 +194,11 @@ self.initiate_send() def handle_close (self): + if not self._closing: + self._closing = True + # try to drain the output buffer + while self.writable() and self.initiate_send() > 0: + pass self.close() def push (self, data): @@ -234,7 +239,7 @@ if first is None: ## print("first is None") self.handle_close() - return + return 0 ## print("first is not None") # handle classic producer behavior @@ -257,7 +262,7 @@ num_sent = self.send(data) except socket.error: self.handle_error() - return + return 0 if num_sent: if num_sent < len(data) or obs < len(first): @@ -265,7 +270,7 @@ else: del self.producer_fifo[0] # we tried to send some actual data - return + return num_sent def discard_buffers (self): # Emergencies only! diff --git a/Lib/asyncore.py b/Lib/asyncore.py --- a/Lib/asyncore.py +++ b/Lib/asyncore.py @@ -221,6 +221,7 @@ connected = False accepting = False closing = False + _closing = False addr = None ignore_log_types = frozenset(['warning']) @@ -527,14 +528,17 @@ class dispatcher_with_send(dispatcher): + out_buffer_size = 512 + def __init__(self, sock=None, map=None): dispatcher.__init__(self, sock, map) self.out_buffer = b'' def initiate_send(self): - num_sent = 0 - num_sent = dispatcher.send(self, self.out_buffer[:512]) + num_sent = dispatcher.send( + self, self.out_buffer[:self.out_buffer_size]) self.out_buffer = self.out_buffer[num_sent:] + return num_sent def handle_write(self): self.initiate_send() @@ -548,6 +552,14 @@ self.out_buffer = self.out_buffer + data self.initiate_send() + def handle_close(self): + if not self._closing: + self._closing = True + # try to drain the output buffer + while self.writable() and self.initiate_send() > 0: + pass + self.close() + # --------------------------------------------------------------------------- # used for debugging. # --------------------------------------------------------------------------- diff --git a/Lib/test/test_asynchat.py b/Lib/test/test_asynchat.py --- a/Lib/test/test_asynchat.py +++ b/Lib/test/test_asynchat.py @@ -15,6 +15,7 @@ HOST = support.HOST SERVER_QUIT = b'QUIT\n' +SERVER_SHUTDOWN = b'SHUTDOWN\n' if threading: class echo_server(threading.Thread): @@ -36,12 +37,18 @@ self.event.set() conn, client = self.sock.accept() self.buffer = b"" + shutdown = False # collect data until quit message is seen while SERVER_QUIT not in self.buffer: data = conn.recv(1) if not data: break self.buffer = self.buffer + data + # perform a half-duplex close + if not shutdown and SERVER_SHUTDOWN in self.buffer: + shutdown = True + self.buffer = self.buffer.replace(SERVER_SHUTDOWN, b'') + conn.shutdown(socket.SHUT_WR) # remove the SERVER_QUIT message self.buffer = self.buffer.replace(SERVER_QUIT, b'') @@ -53,7 +60,7 @@ try: # this may fail on some tests, such as test_close_when_done, since # the client closes the channel when it's done sending - while self.buffer: + while not shutdown and self.buffer: n = conn.send(self.buffer[:self.chunk_size]) time.sleep(0.001) self.buffer = self.buffer[n:] @@ -234,6 +241,25 @@ # (which could still result in the client not having received anything) self.assertGreater(len(s.buffer), 0) + def test_drain_output_after_close(self): + # Check that the output buffer is drained after a half-duplex close + # (issue #12498). + + s, event = start_echo_server() + c = echo_client(b'\n', s.port) + + c.push(SERVER_SHUTDOWN) + count = 4 + chunk_size = asynchat.async_chat.ac_in_buffer_size + data = b' ' * chunk_size + b'\n' + for i in range(count): + c.push(data) + c.push(SERVER_QUIT) + asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) + s.join() + + self.assertEqual(len(s.buffer), count * len(data)) + class TestAsynchat_WithPoll(TestAsynchat): usepoll = True diff --git a/Lib/test/test_asyncore.py b/Lib/test/test_asyncore.py --- a/Lib/test/test_asyncore.py +++ b/Lib/test/test_asyncore.py @@ -664,6 +664,49 @@ client = TestClient(self.family, server.address) self.loop_waiting_for_flag(client) + def test_drain_output_after_close(self): + # Check that the output buffer is drained after a half-duplex close + # (issue #12498). + + class TestClient(asyncore.dispatcher_with_send): + + def __init__(self, family, address): + asyncore.dispatcher_with_send.__init__(self) + self.flag = False + self.create_socket(family) + self.connect(address) + + def handle_connect(self): + data = b'\0' * self.out_buffer_size * 4 + self.send(data) + + def initiate_send(self): + num_sent = asyncore.dispatcher_with_send.initiate_send(self) + self.flag = not len(self.out_buffer) + return num_sent + + def handle_read(self): + self.recv(64) + + class TestHandler(BaseTestHandler): + + def __init__(self, sock): + BaseTestHandler.__init__(self, sock) + self.shutdown = False + + def writable(self): + return False + + def handle_read(self): + self.recv(64) + if not self.shutdown: + self.shutdown = True + self.socket.shutdown(socket.SHUT_WR) + + server = BaseServer(self.family, self.addr, TestHandler) + client = TestClient(self.family, server.address) + self.loop_waiting_for_flag(client) + @unittest.skipIf(sys.platform.startswith("sunos"), "OOB support is broken on Solaris") def test_handle_expt(self):