Index: Lib/test/test_asyncore.py =================================================================== --- Lib/test/test_asyncore.py (revisione 80372) +++ Lib/test/test_asyncore.py (copia locale) @@ -396,11 +396,277 @@ self.assertEqual(file(TESTFN).read(), self.d + d1 + d2) +class BaseTestHandler(asyncore.dispatcher): + + def __init__(self, sock=None): + asyncore.dispatcher.__init__(self, sock) + self.flag = False + + def handle_accept(self): + raise Exception("handle_accept not supposed to be called") + + def handle_connect(self): + raise Exception("handle_connect not supposed to be called") + + def handle_expt(self): + raise Exception("handle_expt not supposed to be called") + + def handle_close(self): + raise Exception("handle_close not supposed to be called") + + def handle_error(self): + raise + + +class TCPServer(asyncore.dispatcher): + """A server which listens on an address and dispatches the + connection to a handler. + """ + + def __init__(self, handler=BaseTestHandler, host=HOST, port=0): + asyncore.dispatcher.__init__(self) + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.set_reuse_addr() + self.bind((host, port)) + self.listen(5) + self.handler = handler + + @property + def address(self): + return self.socket.getsockname()[:2] + + def handle_accept(self): + sock, addr = self.accept() + self.handler(sock) + + def handle_error(self): + raise + + +class BaseClient(BaseTestHandler): + + def __init__(self, address): + BaseTestHandler.__init__(self) + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.connect(address) + + def handle_connect(self): + pass + + +class BaseTestAPI(unittest.TestCase): + use_poll = False + + def tearDown(self): + asyncore.close_all() + + def loop_waiting_for_flag(self, instance, timeout=0.01, count=100): + while asyncore.socket_map and count > 0: + asyncore.loop(timeout=timeout, count=1, use_poll=self.use_poll) + if instance.flag: + return + count -= 1 + time.sleep(timeout) + self.fail("flag not set") + + def test_handle_connect(self): + # make sure handle_connect is called on connect() + + class TestClient(BaseClient): + def handle_connect(self): + self.flag = True + + server = TCPServer() + client = TestClient(server.address) + self.loop_waiting_for_flag(client) + + def test_handle_accept(self): + # make sure handle_accept() is called when a client connects + + class TestListener(BaseTestHandler): + + def __init__(self): + BaseTestHandler.__init__(self) + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.bind((HOST, 0)) + self.listen(5) + self.address = self.socket.getsockname()[:2] + + def handle_accept(self): + self.flag = True + + server = TestListener() + client = BaseClient(server.address) + self.loop_waiting_for_flag(server) + + def test_handle_read(self): + # make sure handle_read is called on data received + + class TestClient(BaseClient): + def handle_read(self): + self.flag = True + + class TestHandler(BaseTestHandler): + def __init__(self, conn): + BaseTestHandler.__init__(self, conn) + self.send('x' * 1024) + + server = TCPServer(TestHandler) + client = TestClient(server.address) + self.loop_waiting_for_flag(client) + + def test_handle_write(self): + # make sure handle_write is called + + class TestClient(BaseClient): + def handle_write(self): + self.flag = True + + server = TCPServer() + client = TestClient(server.address) + self.loop_waiting_for_flag(client) + + def test_handle_close(self): + # make sure handle_close is called when the other end closes + # the connection + + class TestClient(BaseClient): + + def handle_read(self): + # in order to make handle_close be called we are supposed + # to make at least one recv() call + self.recv(1024) + + def handle_close(self): + self.flag = True + self.close() + + class TestHandler(BaseTestHandler): + def __init__(self, conn): + BaseTestHandler.__init__(self, conn) + self.close() + + server = TCPServer(TestHandler) + client = TestClient(server.address) + self.loop_waiting_for_flag(client) + + def test_handle_expt(self): + # Make sure handle_expt is called on OOB data received. + # Note: this might fail on some platforms as OOB data is + # tenuously supported and rarely used. + + class TestClient(BaseClient): + def handle_expt(self): + self.flag = True + + class TestHandler(BaseTestHandler): + def __init__(self, conn): + BaseTestHandler.__init__(self, conn) + self.socket.send(chr(244), socket.MSG_OOB) + + server = TCPServer(TestHandler) + client = TestClient(server.address) + self.loop_waiting_for_flag(client) + + def test_handle_error(self): + + class TestClient(BaseClient): + def handle_connect(self): + 1 / 0 + def handle_error(self): + self.flag = True + try: + raise + except ZeroDivisionError: + pass + else: + raise Exception("exception not raised") + + server = TCPServer() + client = TestClient(server.address) + self.loop_waiting_for_flag(client) + + def test_connection_attributes(self): + server = TCPServer() + client = BaseClient(server.address) + + # we start disconnected + self.assertFalse(server.connected) + self.assertTrue(server.accepting) + self.assertFalse(client.connected) + self.assertFalse(client.accepting) + + # execute some loops so that client connects to server + asyncore.loop(timeout=0.01, use_poll=self.use_poll, count=100) + self.assertFalse(server.connected) + self.assertTrue(server.accepting) + self.assertTrue(client.connected) + self.assertFalse(client.accepting) + + # disconnect the client + client.close() + self.assertFalse(server.connected) + self.assertTrue(server.accepting) + self.assertFalse(client.connected) + self.assertFalse(client.accepting) + + # stop serving + server.close() + self.assertFalse(server.connected) + self.assertFalse(server.accepting) + + def test_create_socket(self): + s = asyncore.dispatcher() + s.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.assertEqual(s.socket.family, socket.AF_INET) + self.assertEqual(s.socket.type, socket.SOCK_STREAM) + + def test_bind(self): + s1 = asyncore.dispatcher() + s1.create_socket(socket.AF_INET, socket.SOCK_STREAM) + s1.bind((HOST, 0)) + s1.listen(5) + port = s1.socket.getsockname()[1] + + s2 = asyncore.dispatcher() + s2.create_socket(socket.AF_INET, socket.SOCK_STREAM) + # EADDRINUSE indicates the socket was correctly bound + self.assertRaises(socket.error, s2.bind, (HOST, port)) + + def test_set_reuse_addr(self): + sock = socket.socket() + try: + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + except socket.error: + unittest.skip("SO_REUSEADDR not supported") + else: + # if SO_REUSEADDR succeeded for sock we expect asyncore + # to do the same + s = asyncore.dispatcher(socket.socket()) + self.assertFalse(s.socket.getsockopt(socket.SOL_SOCKET, + socket.SO_REUSEADDR)) + s.create_socket(socket.AF_INET, socket.SOCK_STREAM) + s.set_reuse_addr() + self.assertTrue(s.socket.getsockopt(socket.SOL_SOCKET, + socket.SO_REUSEADDR)) + finally: + sock.close() + + +class BaseTestAPI_UseSelectMixin(BaseTestAPI): + use_poll = True + +class BaseTestAPI_UsePollMixin(BaseTestAPI): + use_poll = True + + def test_main(): tests = [HelperFunctionTests, DispatcherTests, DispatcherWithSendTests, - DispatcherWithSendTests_UsePoll] + DispatcherWithSendTests_UsePoll, BaseTestAPI_UseSelectMixin] if hasattr(asyncore, 'file_wrapper'): tests.append(FileWrapperTest) + if hasattr(select, 'poll'): + tests.append(BaseTestAPI_UsePollMixin) run_unittest(*tests)