diff --git a/Doc/library/ssl.rst b/Doc/library/ssl.rst --- a/Doc/library/ssl.rst +++ b/Doc/library/ssl.rst @@ -428,6 +428,15 @@ Constants .. versionadded:: 3.3 +.. data:: OP_NO_COMPRESSION + + Disable compression on the SSL channel. This is useful if the application + protocol supports its own compression scheme. + + This option is only available with OpenSSL 1.0.0 and later. + + .. versionadded:: 3.3 + .. data:: HAS_SNI Whether the OpenSSL library has built-in support for the *Server Name @@ -553,6 +562,16 @@ SSL sockets also have the following addi version of the SSL protocol that defines its use, and the number of secret bits being used. If no connection has been established, returns ``None``. +.. method:: SSLSocket.compression() + + Return the compression algorithm being used as a string, or ``None`` + if the connection isn't compressed. + + If the higher-level protocol supports its own compression mechanism, + you can use :data:`OP_NO_COMPRESSION` to disable SSL-level compression. + + .. versionadded:: 3.3 + .. method:: SSLSocket.get_channel_binding(cb_type="tls-unique") Get channel binding data for current connection, as a bytes object. Returns diff --git a/Lib/ssl.py b/Lib/ssl.py --- a/Lib/ssl.py +++ b/Lib/ssl.py @@ -70,6 +70,10 @@ from _ssl import ( OP_ALL, OP_NO_SSLv2, OP_NO_SSLv3, OP_NO_TLSv1, OP_CIPHER_SERVER_PREFERENCE, ) +try: + from _ssl import OP_NO_COMPRESSION +except ImportError: + pass from _ssl import RAND_status, RAND_egd, RAND_add, RAND_bytes, RAND_pseudo_bytes from _ssl import ( SSL_ERROR_ZERO_RETURN, @@ -330,6 +334,13 @@ class SSLSocket(socket): else: return self._sslobj.cipher() + def compression(self): + self._checkClosed() + if not self._sslobj: + return None + else: + return self._sslobj.compression() + def send(self, data, flags=0): self._checkClosed() if self._sslobj: diff --git a/Lib/test/ssl_servers.py b/Lib/test/ssl_servers.py --- a/Lib/test/ssl_servers.py +++ b/Lib/test/ssl_servers.py @@ -97,6 +97,7 @@ class StatsRequestHandler(BaseHTTPReques stats = { 'session_cache': context.session_stats(), 'cipher': sock.cipher(), + 'compression': sock.compression(), } body = pprint.pformat(stats) body = body.encode('utf-8') diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -99,6 +99,8 @@ class BasicSocketTests(unittest.TestCase ssl.CERT_OPTIONAL ssl.CERT_REQUIRED ssl.OP_CIPHER_SERVER_PREFERENCE + if ssl.OPENSSL_VERSION_INFO >= (1, 0): + ssl.OP_NO_COMPRESSION self.assertIn(ssl.HAS_SNI, {True, False}) def test_random(self): @@ -1175,7 +1177,12 @@ else: if connectionchatty: if support.verbose: sys.stdout.write(" client: closing connection.\n") + stats = { + 'compression': s.compression(), + 'cipher': s.cipher(), + } s.close() + return stats finally: server.stop() server.join() @@ -1804,6 +1811,25 @@ else: server.stop() server.join() + def test_compression(self): + context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + context.load_cert_chain(CERTFILE) + stats = server_params_test(context, context, + chatty=True, connectionchatty=True) + if support.verbose: + sys.stdout.write(" got compression: {!r}\n".format(stats['compression'])) + self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' }) + + @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'), + "ssl.OP_NO_COMPRESSION needed for this test") + def test_compression_disabled(self): + context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + context.load_cert_chain(CERTFILE) + stats = server_params_test(context, context, + chatty=True, connectionchatty=True) + self.assertIs(stats['compression'], None) + + def test_main(verbose=False): if support.verbose: plats = { diff --git a/Modules/_ssl.c b/Modules/_ssl.c --- a/Modules/_ssl.c +++ b/Modules/_ssl.c @@ -999,6 +999,25 @@ static PyObject *PySSL_cipher (PySSLSock return NULL; } +static PyObject *PySSL_compression(PySSLSocket *self) { +#ifdef OPENSSL_NO_COMP + Py_RETURN_NONE; +#else + const COMP_METHOD *comp_method; + const char *short_name; + + if (self->ssl == NULL) + Py_RETURN_NONE; + comp_method = SSL_get_current_compression(self->ssl); + if (comp_method == NULL || comp_method->type == NID_undef) + Py_RETURN_NONE; + short_name = OBJ_nid2sn(comp_method->type); + if (short_name == NULL) + Py_RETURN_NONE; + return PyUnicode_DecodeFSDefault(short_name); +#endif +} + static void PySSL_dealloc(PySSLSocket *self) { if (self->peer_cert) /* Possible not to have one? */ @@ -1452,6 +1471,7 @@ static PyMethodDef PySSLMethods[] = { {"peer_certificate", (PyCFunction)PySSL_peercert, METH_VARARGS, PySSL_peercert_doc}, {"cipher", (PyCFunction)PySSL_cipher, METH_NOARGS}, + {"compression", (PyCFunction)PySSL_compression, METH_NOARGS}, {"shutdown", (PyCFunction)PySSL_SSLshutdown, METH_NOARGS, PySSL_SSLshutdown_doc}, #if HAVE_OPENSSL_FINISHED @@ -2452,6 +2472,10 @@ PyInit__ssl(void) PyModule_AddIntConstant(m, "OP_NO_TLSv1", SSL_OP_NO_TLSv1); PyModule_AddIntConstant(m, "OP_CIPHER_SERVER_PREFERENCE", SSL_OP_CIPHER_SERVER_PREFERENCE); +#ifdef SSL_OP_NO_COMPRESSION + PyModule_AddIntConstant(m, "OP_NO_COMPRESSION", + SSL_OP_NO_COMPRESSION); +#endif #ifdef SSL_CTRL_SET_TLSEXT_HOSTNAME r = Py_True;