diff --git a/Doc/library/ssl.rst b/Doc/library/ssl.rst index 76003ea..ec392d0 100644 --- a/Doc/library/ssl.rst +++ b/Doc/library/ssl.rst @@ -1095,6 +1095,21 @@ SSL sockets also have the following additional methods and attributes: The returned dictionary includes additional X509v3 extension items such as ``crlDistributionPoints``, ``caIssuers`` and ``OCSP`` URIs. +.. method:: SSLSocket.getpeercertchain(binary_form=False, validate=True) + + Returns certificate chain for the peer. If no chain is provided, returns + None. Otherwise returns a tuple of dicts containing information about the + certificates. The chain starts with the leaf certificate and ends with the + root certificate. If called on the client side, the leaf certificate is the + peer's certificate. + + If the optional argument *binary_form* is True, return a list of *binary_form*-encoded copies + of the certificates. + If the optional argument *validate* is False, return the peer's cert chain + without any validation and without the root CA cert."); + + .. versionadded:: 3.7 + .. method:: SSLSocket.cipher() Returns a three-value tuple containing the name of the cipher being used, the diff --git a/Lib/ssl.py b/Lib/ssl.py index 8ad4a33..502beb9 100644 --- a/Lib/ssl.py +++ b/Lib/ssl.py @@ -644,6 +644,13 @@ class SSLObject: """ return self._sslobj.peer_certificate(binary_form) + def peercertchain(self, binary_form=False, validate=True): + """"Returns the certificate chain of the SSL connection + as tuple of dicts. + + Return None if no chain is provieded.""" + return self._sslobj.peer_cert_chain(binary_form, validate) + def selected_npn_protocol(self): """Return the currently selected NPN protocol as a string, or ``None`` if a next protocol was not negotiated or if NPN is not supported by one @@ -891,6 +898,13 @@ class SSLSocket(socket): self._check_connected() return self._sslobj.getpeercert(binary_form) + def getpeercertchain(self, binary_form=False, validate=True): + """Returns the certificate chain of the SSL connection + as tuple of dicts. Return None if no chain is provieded.""" + self._checkClosed() + self._check_connected() + return self._sslobj.peercertchain(binary_form, validate) + def selected_npn_protocol(self): self._checkClosed() if not self._sslobj or not _ssl.HAS_NPN: diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py index ad30105..bb5daab 100644 --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -1675,6 +1675,37 @@ class SimpleBackgroundTests(unittest.TestCase): self.assertTrue(cert) self.assertEqual(len(ctx.get_ca_certs()), 1) + + def test_getpeercertchain(self): + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_REQUIRED + ctx.load_verify_locations(capath=CAPATH) + with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s: + s.connect(self.server_addr) + try: + peer_cert = s.getpeercert() + peer_cert_bin = s.getpeercert(True) + chain = s.getpeercertchain() + chain_bin = s.getpeercertchain(True) + chain_no_validate = s.getpeercertchain(validate=False) + chain_bin_no_validate = s.getpeercertchain(True, False) + finally: + self.assertTrue(peer_cert) + self.assertEqual(len(chain), 2) + self.assertTrue(peer_cert_bin) + self.assertEqual(len(chain_bin), 2) + + # ca cert + ca_certs = ctx.get_ca_certs() + self.assertEqual(len(ca_certs), 1) + test_get_ca_certsert = ca_certs[0] + ca_cert_bin = ctx.get_ca_certs(True)[0] + + self.assertEqual(chain, (peer_cert, test_get_ca_certsert)) + self.assertEqual(chain_bin, (peer_cert_bin, ca_cert_bin)) + self.assertEqual(chain_no_validate, (peer_cert,)) + self.assertEqual(chain_bin_no_validate, (peer_cert_bin,)) + @needs_sni def test_context_setget(self): # Check that the context of a connected socket can be replaced. diff --git a/Modules/_ssl.c b/Modules/_ssl.c index b198857..d9726fe 100644 --- a/Modules/_ssl.c +++ b/Modules/_ssl.c @@ -1682,6 +1682,112 @@ _ssl__SSLSocket_cipher_impl(PySSLSocket *self) return cipher_to_tuple(current); } + +static PyObject * +_setX509StoreContextError(X509_STORE_CTX *store_ctx, char *filename, int lineno) { + int errcode; + const char *errstr; + + errcode = X509_STORE_CTX_get_error(store_ctx); + errstr = X509_verify_cert_error_string(errcode); + fill_and_set_sslerror(PySSLErrorObject, errcode, errstr, lineno, errcode); + return NULL; +} + + +/*[clinic input] +_ssl._SSLSocket.peer_cert_chain + der as binary_mode: bool = False + validate: bool = True +[clinic start generated code]*/ + +static PyObject * +_ssl__SSLSocket_peer_cert_chain_impl(PySSLSocket *self, int binary_mode, + int validate) +/*[clinic end generated code: output=4a181dc80f2b94ce input=fcdaa1e176d678ba]*/ +{ + int len, i; + PyObject *retval = NULL, *ci=NULL; + STACK_OF(X509) *peer_chain; /* reference */ + STACK_OF(X509) *chain = NULL; /* either copy or reference */ + + assert((self->ctx != NULL) && (self->ctx->ctx != NULL)); + if (self->ssl == NULL || self->peer_cert == NULL) { + Py_RETURN_NONE; + } + + /* The peer just transmits the intermediate cert chain EXCLUDING the root + * CA certificate as this side is suppose to have a copy of the root + * certificate for verification. */ + peer_chain = SSL_get_peer_cert_chain(self->ssl); + + if (peer_chain == NULL) { + Py_RETURN_NONE; + } + + if (validate) { + X509_STORE_CTX *store_ctx; + + /* Initialize a store context with store (for root CA certs), the + * peer's cert and the peer's chain with intermediate CA certs. */ + + if ((store_ctx = X509_STORE_CTX_new()) == NULL) { + _setSSLError(NULL, 0, __FILE__, __LINE__); + return NULL; + } + + if (!X509_STORE_CTX_init(store_ctx, + SSL_CTX_get_cert_store(self->ctx->ctx), + self->peer_cert, peer_chain)) { + _setX509StoreContextError(store_ctx, __FILE__, __LINE__); + X509_STORE_CTX_free(store_ctx); + goto end; + } + + /* Validate peer cert using its intermediate CA certs and the + * context's root CA certs. */ + if (X509_verify_cert(store_ctx) <= 0) { + _setX509StoreContextError(store_ctx, __FILE__, __LINE__); + X509_STORE_CTX_free(store_ctx); + goto end; + } + + /* Get chain from store context */ + chain = X509_STORE_CTX_get1_chain(store_ctx); + X509_STORE_CTX_free(store_ctx); + } else { + /* otherwise just use the peer_chain value */ + chain = peer_chain; + } + + len = sk_X509_num(chain); + + if ((retval = PyTuple_New(len)) == NULL) { + return NULL; + } + + for (i = 0; i < len; i++){ + X509 *cert = sk_X509_value(chain, i); + if (binary_mode) { + ci = _certificate_to_der(cert); + } else { + ci = _decode_certificate(cert); + } + + if (ci == NULL) { + Py_CLEAR(retval); + goto end; + } + PyTuple_SET_ITEM(retval, i, ci); + } + + end: + if (validate && (chain != NULL)) { + sk_X509_pop_free(chain, X509_free); + } + return retval; +} + /*[clinic input] _ssl._SSLSocket.version [clinic start generated code]*/ @@ -2513,6 +2619,7 @@ static PyMethodDef PySSLMethods[] = { _SSL__SSLSOCKET_PEER_CERTIFICATE_METHODDEF _SSL__SSLSOCKET_CIPHER_METHODDEF _SSL__SSLSOCKET_SHARED_CIPHERS_METHODDEF + _SSL__SSLSOCKET_PEER_CERT_CHAIN_METHODDEF _SSL__SSLSOCKET_VERSION_METHODDEF _SSL__SSLSOCKET_SELECTED_NPN_PROTOCOL_METHODDEF _SSL__SSLSOCKET_SELECTED_ALPN_PROTOCOL_METHODDEF diff --git a/Modules/clinic/_ssl.c.h b/Modules/clinic/_ssl.c.h index 29f5838..6b1de3a 100644 --- a/Modules/clinic/_ssl.c.h +++ b/Modules/clinic/_ssl.c.h @@ -115,6 +115,37 @@ _ssl__SSLSocket_cipher(PySSLSocket *self, PyObject *Py_UNUSED(ignored)) return _ssl__SSLSocket_cipher_impl(self); } +PyDoc_STRVAR(_ssl__SSLSocket_peer_cert_chain__doc__, +"peer_cert_chain($self, /, der=False, validate=True)\n" +"--\n" +"\n"); + +#define _SSL__SSLSOCKET_PEER_CERT_CHAIN_METHODDEF \ + {"peer_cert_chain", (PyCFunction)_ssl__SSLSocket_peer_cert_chain, METH_FASTCALL, _ssl__SSLSocket_peer_cert_chain__doc__}, + +static PyObject * +_ssl__SSLSocket_peer_cert_chain_impl(PySSLSocket *self, int binary_mode, + int validate); + +static PyObject * +_ssl__SSLSocket_peer_cert_chain(PySSLSocket *self, PyObject **args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + static const char * const _keywords[] = {"der", "validate", NULL}; + static _PyArg_Parser _parser = {"|pp:peer_cert_chain", _keywords, 0}; + int binary_mode = 0; + int validate = 1; + + if (!_PyArg_ParseStack(args, nargs, kwnames, &_parser, + &binary_mode, &validate)) { + goto exit; + } + return_value = _ssl__SSLSocket_peer_cert_chain_impl(self, binary_mode, validate); + +exit: + return return_value; +} + PyDoc_STRVAR(_ssl__SSLSocket_version__doc__, "version($self, /)\n" "--\n" @@ -1168,4 +1199,4 @@ exit: #ifndef _SSL_ENUM_CRLS_METHODDEF #define _SSL_ENUM_CRLS_METHODDEF #endif /* !defined(_SSL_ENUM_CRLS_METHODDEF) */ -/*[clinic end generated code: output=a859b21fe68a6115 input=a9049054013a1b77]*/ +/*[clinic end generated code: output=83b14d8724aca679 input=a9049054013a1b77]*/