diff --git a/Lib/socket.py b/Lib/socket.py --- a/Lib/socket.py +++ b/Lib/socket.py @@ -435,3 +435,75 @@ raise err else: raise error("getaddrinfo returns an empty list") + +def has_dual_stack(): + """Return True if kernel allows creating a socket which is able to + listen for both IPv4 and IPv6 connections. + """ + if not has_ipv6 \ + or not hasattr(_socket, 'AF_INET6') \ + or not hasattr(_socket, 'IPV6_V6ONLY'): + return False + try: + with socket(AF_INET6, SOCK_STREAM) as sock: + if not sock.getsockopt(IPPROTO_IPV6, IPV6_V6ONLY): + return True + else: + sock.setsockopt(IPPROTO_IPV6, IPV6_V6ONLY, False) + return True + except error: + return False + +def create_server_sock(address, + reuse_addr=os.name == 'posix' and sys.platform != 'cygwin', + queue_size=5, + dual_stack=has_dual_stack()): + """Convenience function which creates a TCP server bound to + *address* and return the socket object. + + Internally it takes care of choosing the right address family + (IPv4 or IPv6) depending on the host specified in *address*. + If *host* is an empty string or None all interfaces are assumed. + If dual stack is supported by kernel the socket will be able to + listen for both IPv4 and IPv6 connections. + + The returned socket can be used to accept() new connections as in: + + >>> server = create_server_sock((None, 8000)) + >>> while True: + ... sock, addr = server.accept() + ... # handle new sock connection + """ + AF_INET6_ = getattr(_socket, "AF_INET6", 0) + host, port = address + if host == "": + # http://mail.python.org/pipermail/python-ideas/2013-March/019937.html + host = None + # if dual stack is supported and no host was specified assume '::' + # (IPv6 / all interfaces) + if host is None and dual_stack: + host = "::" + err = None + info = getaddrinfo(host, port, AF_UNSPEC, SOCK_STREAM, 0, AI_PASSIVE) + for res in info: + af, socktype, proto, canonname, sa = res + sock = None + try: + sock = socket(af, socktype, proto) + if reuse_addr: + sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) + if af == AF_INET6_ and dual_stack: + if sock.getsockopt(IPPROTO_IPV6, IPV6_V6ONLY): + sock.setsockopt(IPPROTO_IPV6, IPV6_V6ONLY, False) + sock.bind(sa) + sock.listen(queue_size) + return sock + except error as _: + err = _ + if sock is not None: + sock.close() + + if err is not None: + raise err + else: + raise error("getaddrinfo returns an empty list") diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py --- a/Lib/test/test_socket.py +++ b/Lib/test/test_socket.py @@ -4250,6 +4250,63 @@ socket.create_connection((HOST, 1234)) +class CreateServerSockTest(unittest.TestCase): + + def echo_server(self, sock, port=None): + def run(): + with sock: + conn, _ = sock.accept() + with conn: + msg = conn.recv(1024) + if not msg: + return + conn.sendall(msg) + + sock.settimeout(2) + t = threading.Thread(target=run) + t.start() + time.sleep(.1) + + def test_base(self): + port = support.find_unused_port() + with socket.create_server_sock((None, port)) as sock: + self.assertEqual(sock.getsockname()[1], port) + self.assertEqual(sock.type, socket.SOCK_STREAM) + if socket.has_dual_stack(): + self.assertEqual(sock.family, socket.AF_INET6) + else: + self.assertEqual(sock.family, socket.AF_INET) + self.echo_server(sock) + with socket.create_connection(('localhost', port), timeout=2) as cl: + cl.sendall(b'foo') + self.assertEqual(cl.recv(1024), b'foo') + + def test_dual_stack(self): + with socket.create_server_sock((None, 0)) as sock: + self.echo_server(sock) + port = sock.getsockname()[1] + with socket.create_connection(("127.0.0.1", port), timeout=2) as cl: + cl.sendall(b'foo') + self.assertEqual(cl.recv(1024), b'foo') + + with socket.create_server_sock((None, 0)) as sock: + self.echo_server(sock) + port = sock.getsockname()[1] + if socket.has_dual_stack(): + with socket.create_connection(("::1", port), timeout=2) as cl: + cl.sendall(b'foo') + self.assertEqual(cl.recv(1024), b'foo') + else: + self.assertRaises(ConnectionRefusedError, + socket.create_connection, ("::1", port)) + # just stop server + with socket.create_connection(("127.0.0.1", port), timeout=2) \ + as cl: + cl.sendall(b'foo') + cl.recv(1024) + unittest.skip('dual stack cannot be tested as not supported') + + @unittest.skipUnless(thread, 'Threading required for this test.') class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):