diff --git a/Doc/library/ssl.rst b/Doc/library/ssl.rst --- a/Doc/library/ssl.rst +++ b/Doc/library/ssl.rst @@ -791,6 +791,13 @@ :class:`SSLContext` objects have the following methods and attributes: +.. method:: SSLContext.cert_store_stats() + + Get statistics about loaded X.509 CA certificates, other X.509 certificates + and certificate revocation lists. + + .. versionadded:: 3.3 + .. method:: SSLContext.load_cert_chain(certfile, keyfile=None, password=None) Load a private key and the corresponding certificate. The *certfile* @@ -837,6 +844,15 @@ following an `OpenSSL specific layout `_. +.. method:: SSLContext.get_ca_list(binary_form=False) + + Get a list of loaded "certification authority" (CA) certificates. If the + ``binary_form`` parameter is :const:`False` each list + entry is a dict like the output of :meth:`SSLSocket.getpeercert`. Otherwise + the method returns a list of DER-encoded certificates. The returned list + does not contain certificates from *capath* unless a certificate was + requested and loaded by a SSL connection. + .. method:: SSLContext.set_default_verify_paths() Load a set of default "certification authority" (CA) certificates from diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -666,6 +666,47 @@ gc.collect() self.assertIs(wr(), None) + def test_cert_store_stats(self): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + self.assertEqual(ctx.cert_store_stats(), + {'x509_ca': 0, 'crl': 0, 'x509': 0}) + ctx.load_cert_chain(CERTFILE) + self.assertEqual(ctx.cert_store_stats(), + {'x509_ca': 0, 'crl': 0, 'x509': 0}) + ctx.load_verify_locations(CERTFILE) + self.assertEqual(ctx.cert_store_stats(), + {'x509_ca': 0, 'crl': 0, 'x509': 1}) + ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT) + self.assertEqual(ctx.cert_store_stats(), + {'x509_ca': 1, 'crl': 0, 'x509': 1}) + + def test_get_ca_list(self): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + self.assertEqual(ctx.get_ca_list(), []) + # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE + ctx.load_verify_locations(CERTFILE) + self.assertEqual(ctx.get_ca_list(), []) + # but SVN_PYTHON_ORG_ROOT_CERT is a CA cert + ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT) + self.assertEqual(ctx.get_ca_list(), + [{'issuer': ((('organizationName', 'Root CA'),), + (('organizationalUnitName', 'http://www.cacert.org'),), + (('commonName', 'CA Cert Signing Authority'),), + (('emailAddress', 'support@cacert.org'),)), + 'notAfter': 'Mar 29 12:29:49 2033 GMT', + 'notBefore': 'Mar 30 12:29:49 2003 GMT', + 'serialNumber': '00', + 'subject': ((('organizationName', 'Root CA'),), + (('organizationalUnitName', 'http://www.cacert.org'),), + (('commonName', 'CA Cert Signing Authority'),), + (('emailAddress', 'support@cacert.org'),)), + 'version': 3}]) + + with open(SVN_PYTHON_ORG_ROOT_CERT) as f: + pem = f.read() + der = ssl.PEM_cert_to_DER_cert(pem) + self.assertEqual(ctx.get_ca_list(True), [der]) + class SSLErrorTests(unittest.TestCase): @@ -981,6 +1022,22 @@ finally: s.close() + def test_get_ca_list_capath(self): + # capath certs are loaded on request + with support.transient_internet("svn.python.org"): + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_REQUIRED + ctx.load_verify_locations(capath=CAPATH) + self.assertEqual(ctx.get_ca_list(), []) + s = ctx.wrap_socket(socket.socket(socket.AF_INET)) + s.connect(("svn.python.org", 443)) + try: + cert = s.getpeercert() + self.assertTrue(cert) + finally: + s.close() + self.assertEqual(len(ctx.get_ca_list()), 1) + try: import threading diff --git a/Modules/_ssl.c b/Modules/_ssl.c --- a/Modules/_ssl.c +++ b/Modules/_ssl.c @@ -1023,6 +1023,24 @@ return NULL; } +static PyObject * +_certificate_to_der(X509 *certificate) +{ + unsigned char *bytes_buf = NULL; + int len; + PyObject *retval; + + bytes_buf = NULL; + len = i2d_X509(certificate, &bytes_buf); + if (len < 0) { + _setSSLError(NULL, 0, __FILE__, __LINE__); + return NULL; + } + /* this is actually an immutable bytes sequence */ + retval = PyBytes_FromStringAndSize((const char *) bytes_buf, len); + OPENSSL_free(bytes_buf); + return retval; +} static PyObject * PySSL_test_decode_certificate (PyObject *mod, PyObject *args) { @@ -1068,8 +1086,6 @@ static PyObject * PySSL_peercert(PySSLSocket *self, PyObject *args) { - PyObject *retval = NULL; - int len; int verification; int binary_mode = 0; @@ -1081,21 +1097,7 @@ if (binary_mode) { /* return cert in DER-encoded format */ - - unsigned char *bytes_buf = NULL; - - bytes_buf = NULL; - len = i2d_X509(self->peer_cert, &bytes_buf); - if (len < 0) { - PySSL_SetError(self, len, __FILE__, __LINE__); - return NULL; - } - /* this is actually an immutable bytes sequence */ - retval = PyBytes_FromStringAndSize - ((const char *) bytes_buf, len); - OPENSSL_free(bytes_buf); - return retval; - + return _certificate_to_der(self->peer_cert); } else { verification = SSL_CTX_get_verify_mode(SSL_get_SSL_CTX(self->ssl)); if ((verification & SSL_VERIFY_PEER) == 0) @@ -2555,6 +2557,109 @@ #endif } +PyDoc_STRVAR(PySSL_get_stats_doc, +"cert_store_stats() -> {'crl': int, 'x509_ca': int, 'x509': int}\n\ +\n\ +Returns quantities of loaded X.509 certificates with a CA extension, other\n\ +X.509 certificates and certificate revocation lists inside the context's\n\ +cert store.\n\ +NOTE: Certificates in a capath directory aren't loaded unless they have\n\ +been used at least once."); + +static PyObject * +cert_store_stats(PySSLContext *self) +{ + X509_STORE *store; + X509_OBJECT *obj; + int x509 = 0, crl = 0, pkey = 0, ca = 0, i; + + store = SSL_CTX_get_cert_store(self->ctx); + for (i = 0; i < sk_X509_OBJECT_num(store->objs); i++) { + obj = sk_X509_OBJECT_value(store->objs, i); + switch (obj->type) { + case X509_LU_X509: + if (X509_check_ca(obj->data.x509)) { + ca++; + } else { + x509++; + } + break; + case X509_LU_CRL: + crl++; + break; + case X509_LU_PKEY: + pkey++; + break; + default: + /* FAIL, RETRY, PKEY */ + break; + } + } + return Py_BuildValue("{sisisi}", "x509", x509, "crl", crl, + "x509_ca", ca); +} + +PyDoc_STRVAR(PySSL_get_ca_list_doc, +"get_ca_list([der=False]) -> list of loaded certificate\n\ +\n\ +Returns a list of dicts with information of loaded CA certs. If the\n\ +optional argument is True, returns a DER-encoded copy of the CA certificate.\n\ +NOTE: Certificates in a capath directory aren't loaded unless they have\n\ +been used at least once."); + +static PyObject * +get_ca_list(PySSLContext *self, PyObject *args) +{ + X509_STORE *store; + PyObject *ci = NULL, *rlist = NULL; + int i; + int binary_mode = 0; + + if (!PyArg_ParseTuple(args, "|p:get_ca_list", &binary_mode)) { + return NULL; + } + + if ((rlist = PyList_New(0)) == NULL) { + return NULL; + } + + store = SSL_CTX_get_cert_store(self->ctx); + for (i = 0; i < sk_X509_OBJECT_num(store->objs); i++) { + X509_OBJECT *obj; + X509 *cert; + + obj = sk_X509_OBJECT_value(store->objs, i); + if (obj->type != X509_LU_X509) { + /* not a x509 cert */ + continue; + } + /* CA for any purpose */ + cert = obj->data.x509; + if (!X509_check_ca(cert)) { + continue; + } + if (binary_mode) { + ci = _certificate_to_der(cert); + } else { + ci = _decode_certificate(cert); + } + if (ci == NULL) { + goto error; + } + if (PyList_Append(rlist, ci) == -1) { + goto error; + } + Py_CLEAR(ci); + } + return rlist; + + error: + Py_XDECREF(ci); + Py_XDECREF(rlist); + return NULL; +} + + static PyGetSetDef context_getsetlist[] = { {"options", (getter) get_options, (setter) set_options, NULL}, @@ -2586,6 +2691,10 @@ #endif {"set_servername_callback", (PyCFunction) set_servername_callback, METH_VARARGS, PySSL_set_servername_callback_doc}, + {"cert_store_stats", (PyCFunction) cert_store_stats, + METH_NOARGS, PySSL_get_stats_doc}, + {"get_ca_list", (PyCFunction) get_ca_list, + METH_VARARGS, PySSL_get_ca_list_doc}, {NULL, NULL} /* sentinel */ };