# HG changeset patch # User Steve Dower # Date 1479591390 28800 # Sat Nov 19 13:36:30 2016 -0800 # Branch 3.6 # Node ID 90545af7b5a60d06d95a15994c36cdc30bc0cc58 # Parent 2e1fb851dfb45e6d4c27d596302946e879694e1d Issue #28747: Expose SSL_CTX_set_cert_verify_callback diff --git a/Doc/library/ssl.rst b/Doc/library/ssl.rst --- a/Doc/library/ssl.rst +++ b/Doc/library/ssl.rst @@ -1597,6 +1597,21 @@ >>> stats['hits'], stats['misses'] (0, 0) +.. method:: SSLContext.set_cert_verify_callback(callback=None) + + Sets the function that determines whether a certificate is trusted. + + The function takes a single parameter, which is the ASN1-encoded copy + of the certificate. If the certificate is not trusted, the callback + should raise any error. When failure to validate the certificate + causes the overall validation process to fail, this error will be + chained with a *SSLError*. Otherwise, it will be silenced. + + Passing :const:`None` for *callback* will revert to the default + behavior. + + .. versionadded:: 3.6 + .. attribute:: SSLContext.check_hostname Whether to match the peer cert's hostname with :func:`match_hostname` in diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -1770,6 +1770,68 @@ self.assertEqual(buf, b'foo\n') self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap) + def test_set_cert_verify_callback(self): + call_count = 0 + def callback(cert): + nonlocal call_count + call_count += 1 + + ctx = ssl.create_default_context() + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + ctx._set_cert_verify_callback(callback) + + s = socket.socket(socket.AF_INET) + with ctx.wrap_socket(s) as ss: + ss.connect(self.server_addr) + + # Expect one callback + self.assertEqual(1, call_count) + + ctx._set_cert_verify_callback(None) + s = socket.socket(socket.AF_INET) + with ctx.wrap_socket(s) as ss: + ss.connect(self.server_addr) + + # Expect no callbacks, so this should still be one + self.assertEqual(1, call_count) + + def test_set_cert_verify_callback_error(self): + def raise_error(cert): + raise ValueError + + ctx = ssl.create_default_context() + ctx.check_hostname = False + ctx._set_cert_verify_callback(raise_error) + + s = socket.socket(socket.AF_INET) + with ctx.wrap_socket(s) as ss: + try: + ss.connect(self.server_addr) + self.fail('Expected ssl.SSLError') + except ssl.SSLError as ex: + self.assertIsInstance(ex.__context__, ValueError) + + def test_set_cert_verify_callback_suppress_error(self): + call_count = 0 + def raise_error(cert): + nonlocal call_count + call_count += 1 + raise ValueError + + ctx = ssl.create_default_context() + ctx.check_hostname = False + + # Setting CERT_NONE should suppress the error. + ctx.verify_mode = ssl.CERT_NONE + ctx._set_cert_verify_callback(raise_error) + + s = socket.socket(socket.AF_INET) + with ctx.wrap_socket(s) as ss: + ss.connect(self.server_addr) + + # No exception, but should still have been called + self.assertEqual(1, call_count) class NetworkedTests(unittest.TestCase): diff --git a/Misc/NEWS b/Misc/NEWS --- a/Misc/NEWS +++ b/Misc/NEWS @@ -40,6 +40,8 @@ Library ------- +- Issue #28747: Expose SSL_CTX_set_cert_verify_callback + - Issue #19717: Makes Path.resolve() succeed on paths that do not exist. Patch by Vajrasky Kok diff --git a/Modules/_ssl.c b/Modules/_ssl.c --- a/Modules/_ssl.c +++ b/Modules/_ssl.c @@ -291,6 +291,7 @@ PyObject *set_hostname; #endif int check_hostname; + PyObject *verify_cert; } PySSLContext; typedef struct { @@ -419,9 +420,13 @@ { PyObject *err_value = NULL, *reason_obj = NULL, *lib_obj = NULL; PyObject *init_value, *msg, *key; + PyObject *ch_ext, *ch_exv, *ch_tb; _Py_IDENTIFIER(reason); _Py_IDENTIFIER(library); + /* Preserve context on entry so we can chain */ + PyErr_Fetch(&ch_ext, &ch_exv, &ch_tb); + if (errcode != 0) { int lib, reason; @@ -478,7 +483,10 @@ lib_obj = Py_None; if (_PyObject_SetAttrId(err_value, &PyId_library, lib_obj)) goto fail; + PyErr_SetObject(type, err_value); + if (ch_ext) + _PyErr_ChainExceptions(ch_ext, ch_exv, ch_tb); fail: Py_XDECREF(err_value); } @@ -748,6 +756,11 @@ if (ret < 1) return PySSL_SetError(self, ret, __FILE__, __LINE__); + /* validation succeeded, so clear any exception that may have + been set by the callback. This can occur if verify_mode is + ssl.CERT_NONE. */ + PyErr_Clear(); + if (self->peer_cert) X509_free (self->peer_cert); PySSL_BEGIN_ALLOW_THREADS @@ -2661,6 +2674,7 @@ #ifndef OPENSSL_NO_TLSEXT self->set_hostname = NULL; #endif + self->verify_cert = NULL; /* Don't check host name by default */ if (proto_version == PY_SSL_VERSION_TLS_CLIENT) { self->check_hostname = 1; @@ -2768,6 +2782,7 @@ #ifndef OPENSSL_NO_TLSEXT Py_VISIT(self->set_hostname); #endif + Py_VISIT(self->verify_cert); return 0; } @@ -2777,6 +2792,7 @@ #ifndef OPENSSL_NO_TLSEXT Py_CLEAR(self->set_hostname); #endif + Py_CLEAR(self->verify_cert); return 0; } @@ -3927,6 +3943,85 @@ "x509_ca", ca); } +static int +_ssl_verify_cert(X509_STORE_CTX* ctx, void *arg) +{ + PyGILState_STATE gstate = PyGILState_Ensure(); + PySSLContext *ssl = (PySSLContext*)arg; + PyObject *enc_cert = NULL, *cb_result; + unsigned char *p; + int len; + + int r = 0; + + if (!ssl || !ssl->verify_cert) { + PyErr_SetString(PyExc_SystemError, + "SSL verify callback set without passing context"); + goto error; + } + + len = i2d_X509_AUX(ctx->cert, NULL); + enc_cert = PyBytes_FromStringAndSize(NULL, len); + if (!enc_cert) + goto error; + + p = PyBytes_AS_STRING(enc_cert); + if (len != i2d_X509_AUX(ctx->cert, &p)) { + /* certificate did not serialize to expected length */ + Py_CLEAR(enc_cert); + goto error; + } + + cb_result = PyObject_CallFunctionObjArgs( + ssl->verify_cert, + enc_cert, + NULL + ); + + Py_DECREF(enc_cert); + if (cb_result) { + r = 1; + Py_DECREF(cb_result); + } + +error: + PyGILState_Release(gstate); + return r; +} + +/*[clinic input] +_ssl._SSLContext.set_cert_verify_callback + callback: object = None + +Sets the function that determines whether a certificate is trusted. + +The function takes a single parameter, which is the ASN1-encoded copy +of the certificate. If the certificate is not trusted, the callback +should raise any error. When failure to validate the certificate +causes the overall validation process to fail, this error will be +chained with a *SSLError*. Otherwise, it will be silenced. + +Passing None for *callback* will revert to the default behavior. +[clinic start generated code]*/ + +static PyObject * +_ssl__SSLContext_set_cert_verify_callback_impl(PySSLContext *self, + PyObject *callback) +/*[clinic end generated code: output=2ff54bfb80dd82b1 input=d8899f18ab7bc325]*/ +{ + if (callback == Py_None) { + if (self->verify_cert) { + Py_CLEAR(self->verify_cert); + SSL_CTX_set_cert_verify_callback(self->ctx, NULL, NULL); + } + } else { + Py_XSETREF(self->verify_cert, callback); + Py_INCREF(self->verify_cert); + SSL_CTX_set_cert_verify_callback(self->ctx, _ssl_verify_cert, self); + } + Py_RETURN_NONE; +} + /*[clinic input] _ssl._SSLContext.get_ca_certs binary_form: bool = False @@ -4019,6 +4114,7 @@ _SSL__SSLCONTEXT_CERT_STORE_STATS_METHODDEF _SSL__SSLCONTEXT_GET_CA_CERTS_METHODDEF _SSL__SSLCONTEXT_GET_CIPHERS_METHODDEF + _SSL__SSLCONTEXT_SET_CERT_VERIFY_CALLBACK_METHODDEF {NULL, NULL} /* sentinel */ }; diff --git a/Modules/clinic/_ssl.c.h b/Modules/clinic/_ssl.c.h --- a/Modules/clinic/_ssl.c.h +++ b/Modules/clinic/_ssl.c.h @@ -687,6 +687,43 @@ return _ssl__SSLContext_cert_store_stats_impl(self); } +PyDoc_STRVAR(_ssl__SSLContext_set_cert_verify_callback__doc__, +"set_cert_verify_callback($self, /, callback=None)\n" +"--\n" +"\n" +"Sets the function that determines whether a certificate is trusted.\n" +"\n" +"The function takes a single parameter, which is the ASN1-encoded copy\n" +"of the certificate. If the certificate is not trusted, the callback\n" +"should raise any error. When failure to validate the certificate\n" +"causes the overall validation process to fail, this error will be\n" +"chained with a *SSLError*. Otherwise, it will be silenced."); + +#define _SSL__SSLCONTEXT_SET_CERT_VERIFY_CALLBACK_METHODDEF \ + {"set_cert_verify_callback", (PyCFunction)_ssl__SSLContext_set_cert_verify_callback, METH_FASTCALL, _ssl__SSLContext_set_cert_verify_callback__doc__}, + +static PyObject * +_ssl__SSLContext_set_cert_verify_callback_impl(PySSLContext *self, + PyObject *callback); + +static PyObject * +_ssl__SSLContext_set_cert_verify_callback(PySSLContext *self, PyObject **args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + static const char * const _keywords[] = {"callback", NULL}; + static _PyArg_Parser _parser = {"|O:set_cert_verify_callback", _keywords, 0}; + PyObject *callback = Py_None; + + if (!_PyArg_ParseStack(args, nargs, kwnames, &_parser, + &callback)) { + goto exit; + } + return_value = _ssl__SSLContext_set_cert_verify_callback_impl(self, callback); + +exit: + return return_value; +} + PyDoc_STRVAR(_ssl__SSLContext_get_ca_certs__doc__, "get_ca_certs($self, /, binary_form=False)\n" "--\n" @@ -1168,4 +1205,4 @@ #ifndef _SSL_ENUM_CRLS_METHODDEF #define _SSL_ENUM_CRLS_METHODDEF #endif /* !defined(_SSL_ENUM_CRLS_METHODDEF) */ -/*[clinic end generated code: output=a859b21fe68a6115 input=a9049054013a1b77]*/ +/*[clinic end generated code: output=d00543b8d4031a84 input=a9049054013a1b77]*/