diff -r 4d00d0109147 Doc/library/ssl.rst --- a/Doc/library/ssl.rst Wed Jan 07 00:37:01 2015 +1000 +++ b/Doc/library/ssl.rst Wed Jan 07 11:00:55 2015 -0600 @@ -920,16 +920,27 @@ SSL sockets also have the following addi such as ``crlDistributionPoints``, ``caIssuers`` and ``OCSP`` URIs. .. method:: SSLSocket.cipher() Returns a three-value tuple containing the name of the cipher being used, the version of the SSL protocol that defines its use, and the number of secret bits being used. If no connection has been established, returns ``None``. +.. method:: SSLSocket.shared_ciphers() + + Return the list of ciphers shared by the client during the handshake. Each + entry of the returned list is a three-value tuple containing the name of the + cipher, the version of the SSL protocol that defines its use, and the number + of secret bits the cipher uses. :meth:`~SSLSocket.shared_ciphers` returns + ``None`` if no connection has been established or the socket is a client + socket. + + .. versionadded:: 3.5 + .. method:: SSLSocket.compression() Return the compression algorithm being used as a string, or ``None`` if the connection isn't compressed. If the higher-level protocol supports its own compression mechanism, you can use :data:`OP_NO_COMPRESSION` to disable SSL-level compression. @@ -1779,16 +1790,17 @@ provided. - :attr:`~SSLSocket.context` - :attr:`~SSLSocket.server_side` - :attr:`~SSLSocket.server_hostname` - :meth:`~SSLSocket.read` - :meth:`~SSLSocket.write` - :meth:`~SSLSocket.getpeercert` - :meth:`~SSLSocket.selected_npn_protocol` - :meth:`~SSLSocket.cipher` + - :meth:`~SSLSocket.shared_ciphers` - :meth:`~SSLSocket.compression` - :meth:`~SSLSocket.pending` - :meth:`~SSLSocket.do_handshake` - :meth:`~SSLSocket.unwrap` - :meth:`~SSLSocket.get_channel_binding` When compared to :class:`SSLSocket`, this object lacks the following features: diff -r 4d00d0109147 Lib/ssl.py --- a/Lib/ssl.py Wed Jan 07 00:37:01 2015 +1000 +++ b/Lib/ssl.py Wed Jan 07 11:00:55 2015 -0600 @@ -567,16 +567,20 @@ class SSLObject: if _ssl.HAS_NPN: return self._sslobj.selected_npn_protocol() def cipher(self): """Return the currently selected cipher as a 3-tuple ``(name, ssl_version, secret_bits)``.""" return self._sslobj.cipher() + def shared_ciphers(self): + """Return the ciphers shared by the client during the handshake.""" + return self._sslobj.shared_ciphers() + def compression(self): """Return the current compression algorithm in use, or ``None`` if compression was not negotiated or not supported by one of the peers.""" return self._sslobj.compression() def pending(self): """Return the number of bytes that can be read immediately.""" return self._sslobj.pending() @@ -779,16 +783,22 @@ class SSLSocket(socket): def cipher(self): self._checkClosed() if not self._sslobj: return None else: return self._sslobj.cipher() + def shared_ciphers(self): + self._checkClosed() + if not self._sslobj: + return None + return self._sslobj.shared_ciphers() + def compression(self): self._checkClosed() if not self._sslobj: return None else: return self._sslobj.compression() def send(self, data, flags=0): diff -r 4d00d0109147 Lib/test/test_ssl.py --- a/Lib/test/test_ssl.py Wed Jan 07 00:37:01 2015 +1000 +++ b/Lib/test/test_ssl.py Wed Jan 07 11:00:55 2015 -0600 @@ -1693,21 +1693,23 @@ class NetworkedBIOTests(unittest.TestCas outgoing = ssl.MemoryBIO() ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) ctx.verify_mode = ssl.CERT_REQUIRED ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT) ctx.check_hostname = True sslobj = ctx.wrap_bio(incoming, outgoing, False, 'svn.python.org') self.assertIs(sslobj._sslobj.owner, sslobj) self.assertIsNone(sslobj.cipher()) + self.assertIsNone(sslobj.shared_ciphers()) self.assertRaises(ValueError, sslobj.getpeercert) if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES: self.assertIsNone(sslobj.get_channel_binding('tls-unique')) self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake) self.assertTrue(sslobj.cipher()) + self.assertIsNone(sslobj.shared_ciphers()) self.assertTrue(sslobj.getpeercert()) if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES: self.assertTrue(sslobj.get_channel_binding('tls-unique')) self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap) self.assertRaises(ssl.SSLError, sslobj.write, b'foo') sock.close() def test_read_write_data(self): @@ -1771,16 +1773,17 @@ else: self.server.conn_errors.append(e) if self.server.chatty: handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n") self.running = False self.server.stop() self.close() return False else: + self.server.shared_ciphers.append(self.sslconn.shared_ciphers()) if self.server.context.verify_mode == ssl.CERT_REQUIRED: cert = self.sslconn.getpeercert() if support.verbose and self.server.chatty: sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n") cert_binary = self.sslconn.getpeercert(True) if support.verbose and self.server.chatty: sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n") cipher = self.sslconn.cipher() @@ -1886,16 +1889,17 @@ else: self.chatty = chatty self.connectionchatty = connectionchatty self.starttls_server = starttls_server self.sock = socket.socket() self.port = support.bind_port(self.sock) self.flag = None self.active = False self.selected_protocols = [] + self.shared_ciphers = [] self.conn_errors = [] threading.Thread.__init__(self) self.daemon = True def __enter__(self): self.start(threading.Event()) self.flag.wait() return self @@ -2116,16 +2120,17 @@ else: 'compression': s.compression(), 'cipher': s.cipher(), 'peercert': s.getpeercert(), 'client_npn_protocol': s.selected_npn_protocol(), 'version': s.version(), }) s.close() stats['server_npn_protocols'] = server.selected_protocols + stats['server_shared_ciphers'] = server.shared_ciphers return stats def try_protocol_combo(server_protocol, client_protocol, expect_success, certsreqs=None, server_options=0, client_options=0): """ Try to SSL-connect using *client_protocol* to *server_protocol*. If *expect_success* is true, assert that the connection succeeds, if it's false, assert that the connection fails. @@ -3152,16 +3157,28 @@ else: with self.assertRaises(ssl.SSLError) as cm, \ support.captured_stderr() as stderr: stats = server_params_test(client_context, server_context, chatty=False, sni_name='supermessage') self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR') self.assertIn("TypeError", stderr.getvalue()) + def test_shared_ciphers(self): + server_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + client_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + client_context.set_ciphers("3DES") + server_context.set_ciphers("3DES:AES") + stats = server_params_test(client_context, server_context) + ciphers = stats['server_shared_ciphers'][0] + self.assertGreater(len(ciphers), 0) + for name, tls_version, bits in ciphers: + self.assertIn("DES-CBC3-", name) + self.assertEqual(bits, 112) + def test_read_write_after_close_raises_valuerror(self): context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) context.verify_mode = ssl.CERT_REQUIRED context.load_verify_locations(CERTFILE) context.load_cert_chain(CERTFILE) server = ThreadedEchoServer(context=context, chatty=False) with server: diff -r 4d00d0109147 Modules/_ssl.c --- a/Modules/_ssl.c Wed Jan 07 00:37:01 2015 +1000 +++ b/Modules/_ssl.c Wed Jan 07 11:00:55 2015 -0600 @@ -1355,64 +1355,94 @@ Returns the certificate for the peer. I returns None. If a certificate was provided, but not validated, returns\n\ an empty dictionary. Otherwise returns a dict containing information\n\ about the peer certificate.\n\ \n\ If the optional argument is True, returns a DER-encoded copy of the\n\ peer certificate, or None if no certificate was provided. This will\n\ return the certificate even if it wasn't validated."); -static PyObject *PySSL_cipher (PySSLSocket *self) { - - PyObject *retval, *v; - const SSL_CIPHER *current; - char *cipher_name; - char *cipher_protocol; - - if (self->ssl == NULL) - Py_RETURN_NONE; - current = SSL_get_current_cipher(self->ssl); - if (current == NULL) - Py_RETURN_NONE; - - retval = PyTuple_New(3); +static PyObject * +cipher_to_tuple(const SSL_CIPHER *cipher) +{ + const char *cipher_name, *cipher_protocol; + PyObject *v, *retval = PyTuple_New(3); if (retval == NULL) return NULL; - cipher_name = (char *) SSL_CIPHER_get_name(current); + cipher_name = SSL_CIPHER_get_name(cipher); if (cipher_name == NULL) { Py_INCREF(Py_None); PyTuple_SET_ITEM(retval, 0, Py_None); } else { v = PyUnicode_FromString(cipher_name); if (v == NULL) - goto fail0; + goto fail; PyTuple_SET_ITEM(retval, 0, v); } - cipher_protocol = (char *) SSL_CIPHER_get_version(current); + + cipher_protocol = SSL_CIPHER_get_version(cipher); if (cipher_protocol == NULL) { Py_INCREF(Py_None); PyTuple_SET_ITEM(retval, 1, Py_None); } else { v = PyUnicode_FromString(cipher_protocol); if (v == NULL) - goto fail0; + goto fail; PyTuple_SET_ITEM(retval, 1, v); } - v = PyLong_FromLong(SSL_CIPHER_get_bits(current, NULL)); + + v = PyLong_FromLong(SSL_CIPHER_get_bits(cipher, NULL)); if (v == NULL) - goto fail0; + goto fail; PyTuple_SET_ITEM(retval, 2, v); + return retval; - fail0: + fail: Py_DECREF(retval); return NULL; } + +static PyObject *PySSL_shared_ciphers(PySSLSocket *self) +{ + STACK_OF(SSL_CIPHER) *ciphers; + int i; + PyObject *res; + + if (!self->ssl->session || !self->ssl->session->ciphers) + Py_RETURN_NONE; + ciphers = self->ssl->session->ciphers; + res = PyList_New(sk_SSL_CIPHER_num(ciphers)); + if (!res) + return NULL; + for (i = 0; i < sk_SSL_CIPHER_num(ciphers); i++) { + PyObject *tup = cipher_to_tuple(sk_SSL_CIPHER_value(ciphers, i)); + if (!tup) { + Py_DECREF(res); + return NULL; + } + PyList_SET_ITEM(res, i, tup); + } + return res; +} + +static PyObject *PySSL_cipher (PySSLSocket *self) +{ + const SSL_CIPHER *current; + + if (self->ssl == NULL) + Py_RETURN_NONE; + current = SSL_get_current_cipher(self->ssl); + if (current == NULL) + Py_RETURN_NONE; + return cipher_to_tuple(current); +} + static PyObject *PySSL_version(PySSLSocket *self) { const char *version; if (self->ssl == NULL) Py_RETURN_NONE; version = SSL_get_version(self->ssl); if (!strcmp(version, "unknown")) @@ -2014,16 +2044,17 @@ static PyMethodDef PySSLMethods[] = { PySSL_SSLwrite_doc}, {"read", (PyCFunction)PySSL_SSLread, METH_VARARGS, PySSL_SSLread_doc}, {"pending", (PyCFunction)PySSL_SSLpending, METH_NOARGS, PySSL_SSLpending_doc}, {"peer_certificate", (PyCFunction)PySSL_peercert, METH_VARARGS, PySSL_peercert_doc}, {"cipher", (PyCFunction)PySSL_cipher, METH_NOARGS}, + {"shared_ciphers", (PyCFunction)PySSL_shared_ciphers, METH_NOARGS}, {"version", (PyCFunction)PySSL_version, METH_NOARGS}, #ifdef OPENSSL_NPN_NEGOTIATED {"selected_npn_protocol", (PyCFunction)PySSL_selected_npn_protocol, METH_NOARGS}, #endif {"compression", (PyCFunction)PySSL_compression, METH_NOARGS}, {"shutdown", (PyCFunction)PySSL_SSLshutdown, METH_NOARGS, PySSL_SSLshutdown_doc}, {"tls_unique_cb", (PyCFunction)PySSL_tls_unique_cb, METH_NOARGS,