diff --git a/Doc/library/ssl.rst b/Doc/library/ssl.rst --- a/Doc/library/ssl.rst +++ b/Doc/library/ssl.rst @@ -716,6 +716,15 @@ The returned dictionary includes additional items such as ``issuer`` and ``notBefore``. +.. method:: SSLSocket.getpeercertchain(binary_form=False, validate=True) + + If there is no certificate for the peer on the other end of the connection, + returns ``None``. + + XXX more documentation + + .. versionadded:: 3.4 + .. method:: SSLSocket.cipher() Returns a three-value tuple containing the name of the cipher being used, the diff --git a/Lib/ssl.py b/Lib/ssl.py --- a/Lib/ssl.py +++ b/Lib/ssl.py @@ -434,6 +434,13 @@ self._check_connected() return self._sslobj.peer_certificate(binary_form) + def getpeercertchain(self, binary_form=False, validate=True): + """Returns the certificate chain of the SSL connection + as tuple of dicts. Return None if no chain is provieded.""" + self._checkClosed() + self._check_connected() + return self._sslobj.peer_cert_chain(binary_form, validate) + def selected_npn_protocol(self): self._checkClosed() if not self._sslobj or not _ssl.HAS_NPN: diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -1052,6 +1052,70 @@ s.close() self.assertEqual(len(ctx.get_ca_certs()), 1) + def test_getpeercertchain(self): + with support.transient_internet("svn.python.org"): + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_REQUIRED + ctx.load_verify_locations(capath=CAPATH) + s = ctx.wrap_socket(socket.socket(socket.AF_INET)) + s.connect(("svn.python.org", 443)) + try: + peer_cert = s.getpeercert() + peer_cert_bin = s.getpeercert(True) + chain = s.getpeercertchain() + chain_bin = s.getpeercertchain(True) + finally: + s.close() + + self.assertTrue(peer_cert) + self.assertEqual(len(chain), 2) + self.assertTrue(peer_cert_bin) + self.assertEqual(len(chain_bin), 2) + + # ca cert + ca_certs = ctx.get_ca_certs() + self.assertEqual(len(ca_certs), 1) + ca_cert = ca_certs[0] + ca_cert_bin = ctx.get_ca_certs(True)[0] + + self.assertEqual(chain, (peer_cert, ca_cert)) + self.assertEqual(chain_bin, (peer_cert_bin, ca_cert_bin)) + self.assertEqual(chain, ( + {'issuer': ((('organizationName', 'Root CA'),), + (('organizationalUnitName', 'http://www.cacert.org'),), + (('commonName', 'CA Cert Signing Authority'),), + (('emailAddress', 'support@cacert.org'),)), + 'notAfter': 'Dec 24 21:32:15 2013 GMT', + 'notBefore': 'Dec 25 21:32:15 2011 GMT', + 'serialNumber': '0B23A4', + 'subject': ((('commonName', 'svn.python.org'),),), + 'version': 3}, + {'issuer': ((('organizationName', 'Root CA'),), + (('organizationalUnitName', 'http://www.cacert.org'),), + (('commonName', 'CA Cert Signing Authority'),), + (('emailAddress', 'support@cacert.org'),)), + 'notAfter': 'Mar 29 12:29:49 2033 GMT', + 'notBefore': 'Mar 30 12:29:49 2003 GMT', + 'serialNumber': '00', + 'subject': ((('organizationName', 'Root CA'),), + (('organizationalUnitName', 'http://www.cacert.org'),), + (('commonName', 'CA Cert Signing Authority'),), + (('emailAddress', 'support@cacert.org'),)), + 'version': 3})) + + # now without access to the root CA cert + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + s = ctx.wrap_socket(socket.socket(socket.AF_INET)) + s.connect(("svn.python.org", 443)) + try: + self.assertRaisesRegex(ssl.SSLError, + "unable to get local issuer certificate", + s.getpeercertchain) + chain = s.getpeercertchain(validate=False) + finally: + s.close() + self.assertEqual(chain, (peer_cert,)) + try: import threading diff --git a/Modules/_ssl.c b/Modules/_ssl.c --- a/Modules/_ssl.c +++ b/Modules/_ssl.c @@ -459,6 +459,17 @@ return NULL; } +static PyObject * +_setX509StoreContextError(X509_STORE_CTX *store_ctx, char *filename, int lineno) { + int errcode; + const char *errstr; + + errcode = X509_STORE_CTX_get_error(store_ctx); + errstr = X509_verify_cert_error_string(errcode); + fill_and_set_sslerror(PySSLErrorObject, errcode, errstr, lineno, errcode); + return NULL; +} + /* * SSL objects */ @@ -1119,6 +1130,109 @@ peer certificate, or None if no certificate was provided. This will\n\ return the certificate even if it wasn't validated."); + +PyDoc_STRVAR(PySSL_peercertchain_doc, +"peer_cert_chain(der=False, validate=True) -> tuple of certificates\n\ +\n\ +Returns certificate chain for the peer. If no chain is provided, returns\n\ +None. Otherwise returns a tuple of dicts containing information about the\n\ +certificates. The chain starts with the leaf certificate and ends with the\n\ +root certificate. If called on the client side, the leaf certificate is the\n\ +peer's certificate.\n\ +\n\ +If the optional argument *der* is True, return a list of DER-encoded copies\n\ +of the certificates.\n\ +If the optional argument *validate* is False, return the peer's cert chain\n\ +without any validation and without the root CA cert."); + +static PyObject * +PySSL_peercertchain(PySSLSocket *self, PyObject *args, PyObject *kwds) +{ + char *kwlist[] = {"der", "validate", NULL}; + + int len, i; + int binary_mode = 0, validate = 1; + PyObject *retval = NULL, *ci=NULL; + STACK_OF(X509) *peer_chain; /* reference */ + STACK_OF(X509) *chain = NULL; /* either copy or reference */ + + if (!PyArg_ParseTupleAndKeywords(args, kwds, "|pp:peer_cert_chain", kwlist, + &binary_mode, &validate)) { + return NULL; + } + + assert((self->ctx != NULL) && (self->ctx->ctx != NULL)); + if (self->ssl == NULL || self->peer_cert == NULL) { + Py_RETURN_NONE; + } + + /* The peer just transmits the intermediate cert chain EXCLUDING the root + * CA certificate as this side is suppose to have a copy of the root + * certificate for verification. */ + peer_chain = SSL_get_peer_cert_chain(self->ssl); + if (peer_chain == NULL) { + Py_RETURN_NONE; + } + + if (validate) { + X509_STORE_CTX *store_ctx; + + /* Initialize a store context with store (for root CA certs), the + * peer's cert and the peer's chain with intermediate CA certs. */ + if ((store_ctx = X509_STORE_CTX_new()) == NULL) { + _setSSLError(NULL, 0, __FILE__, __LINE__); + return NULL; + } + if (!X509_STORE_CTX_init(store_ctx, + SSL_CTX_get_cert_store(self->ctx->ctx), + self->peer_cert, peer_chain)) { + _setX509StoreContextError(store_ctx, __FILE__, __LINE__); + X509_STORE_CTX_free(store_ctx); + goto end; + } + + /* Validate peer cert using its intermediate CA certs and the + * context's root CA certs. */ + if (X509_verify_cert(store_ctx) <= 0) { + _setX509StoreContextError(store_ctx, __FILE__, __LINE__); + X509_STORE_CTX_free(store_ctx); + goto end; + } + + /* Get chain from store context */ + chain = X509_STORE_CTX_get1_chain(store_ctx); + X509_STORE_CTX_free(store_ctx); + } else { + /* otherwise just use the peer_chain value */ + chain = peer_chain; + } + + len = sk_X509_num(chain); + if ((retval = PyTuple_New(len)) == NULL) { + return NULL; + } + for (i = 0; i < len; i++) { + X509 *cert = sk_X509_value(chain, i); + if (binary_mode) { + ci = _certificate_to_der(cert); + } else { + ci = _decode_certificate(cert); + } + if (ci == NULL) { + Py_CLEAR(retval); + goto end; + } + PyTuple_SET_ITEM(retval, i, ci); + } + + end: + if (validate && (chain != NULL)) { + sk_X509_pop_free(chain, X509_free); + } + return retval; +} + + static PyObject *PySSL_cipher (PySSLSocket *self) { PyObject *retval, *v; @@ -1696,6 +1810,8 @@ PySSL_SSLpending_doc}, {"peer_certificate", (PyCFunction)PySSL_peercert, METH_VARARGS, PySSL_peercert_doc}, + {"peer_cert_chain", (PyCFunction)PySSL_peercertchain, + METH_VARARGS | METH_KEYWORDS, PySSL_peercertchain_doc}, {"cipher", (PyCFunction)PySSL_cipher, METH_NOARGS}, #ifdef OPENSSL_NPN_NEGOTIATED {"selected_npn_protocol", (PyCFunction)PySSL_selected_npn_protocol, METH_NOARGS},