diff --git a/Doc/library/ssl.rst b/Doc/library/ssl.rst --- a/Doc/library/ssl.rst +++ b/Doc/library/ssl.rst @@ -804,6 +804,14 @@ .. versionadded:: 3.4 +.. method:: SSLContext.add_ca_cert(certdata) + + Add a single CA certificate into the SSLContext's internal cert store. + *certdata* can either be a PEM certificate as ASCII string or a DER + certificate as bytes-like object. + + .. versionadded:: 3.4 + .. method:: SSLContext.load_cert_chain(certfile, keyfile=None, password=None) Load a private key and the corresponding certificate. The *certfile* diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -25,7 +25,8 @@ PROTOCOLS = sorted(ssl._PROTOCOL_NAMES) HOST = support.HOST -data_file = lambda name: os.path.join(os.path.dirname(__file__), name) +def data_file(*name): + return os.path.join(os.path.dirname(__file__), *name) # The custom key and certificate files used in test_ssl are generated # using Lib/test/make_ssl_certs.py. @@ -43,6 +44,9 @@ KEY_PASSWORD = "somepass" CAPATH = data_file("capath") BYTES_CAPATH = os.fsencode(CAPATH) +CAFILE_NEURONIO = data_file("capath", "4e1295a3.0") +CAFILE_CACERT = data_file("capath", "5ed36f99.0") + # Two keys and certs signed by the same CA (for SNI tests) SIGNED_CERTFILE = data_file("keycert3.pem") @@ -721,6 +725,59 @@ der = ssl.PEM_cert_to_DER_cert(pem) self.assertEqual(ctx.get_ca_certs(True), [der]) + def test_add_ca_cert(self): + with open(CAFILE_CACERT) as f: + cacert_pem = f.read() + cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem) + with open(CAFILE_NEURONIO) as f: + neuronio_pem = f.read() + neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem) + with open(CERTFILE) as f: + cert = f.read() + + # test PEM + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0) + ctx.add_ca_cert(cacert_pem) + self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1) + ctx.add_ca_cert(neuronio_pem) + self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2) + + with self.assertRaisesRegex(ssl.SSLError, + "cert already in hash table"): + ctx.add_ca_cert(cacert_pem) + self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2) + + # test DER + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + ctx.add_ca_cert(cacert_der) + ctx.add_ca_cert(neuronio_der) + self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2) + + with self.assertRaisesRegex(ssl.SSLError, + "cert already in hash table"): + ctx.add_ca_cert(cacert_pem) + + # error cases + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + self.assertRaises(TypeError, ctx.add_ca_cert, None) + self.assertRaises(TypeError, ctx.add_ca_cert, object) + + with self.assertRaisesRegex(ssl.SSLError, "no start line"): + ctx.add_ca_cert("broken") + with self.assertRaisesRegex(ssl.SSLError, "not enough data"): + ctx.add_ca_cert(b"broken") + with self.assertRaisesRegex(ssl.SSLError, "Certificate is not a CA cert."): + ctx.add_ca_cert(cert) + + combined = "\n".join((cacert_pem, neuronio_pem)) + with self.assertRaisesRegex(ssl.SSLError, "Extra data"): + ctx.add_ca_cert(combined) + combined = b"\n".join((cacert_der, neuronio_der)) + with self.assertRaisesRegex(ssl.SSLError, "Extra data"): + ctx.add_ca_cert(combined) + self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0) + class SSLErrorTests(unittest.TestCase): @@ -924,6 +981,33 @@ finally: s.close() + def test_connect_add_ca_cert(self): + with open(CAFILE_CACERT) as f: + pem = f.read() + der = ssl.PEM_cert_to_DER_cert(pem) + with support.transient_internet("svn.python.org"): + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_REQUIRED + ctx.add_ca_cert(pem) + s = ctx.wrap_socket(socket.socket(socket.AF_INET)) + s.connect(("svn.python.org", 443)) + try: + cert = s.getpeercert() + self.assertTrue(cert) + finally: + s.close() + # same with DER + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_REQUIRED + ctx.add_ca_cert(der) + s = ctx.wrap_socket(socket.socket(socket.AF_INET)) + s.connect(("svn.python.org", 443)) + try: + cert = s.getpeercert() + self.assertTrue(cert) + finally: + s.close() + @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows") def test_makefile_close(self): # Issue #5238: creating a file-like object with makefile() shouldn't diff --git a/Modules/_ssl.c b/Modules/_ssl.c --- a/Modules/_ssl.c +++ b/Modules/_ssl.c @@ -2272,6 +2272,142 @@ Py_RETURN_NONE; } +/* internal helper function, returns 0 on success, -1 on error */ +static int +_add_ca_cert(PySSLContext *self, const void *data, Py_ssize_t len, + int filetype, int check_trailing_data) +{ + BIO *biobuf = NULL; + X509 *cert = NULL; + X509_STORE *store; + char buf[2]; + + int retval = -1, result; + + if (len <= 0) { + PyErr_SetString(PyExc_ValueError, + "Empty certificate data"); + goto end; + } else if (len > INT_MAX) { + PyErr_SetString(PyExc_OverflowError, + "Certificate data is too long."); + goto end; + } + + if ((biobuf = BIO_new(BIO_s_mem())) == NULL) { + _setSSLError("Can't malloc memory", 0, __FILE__, __LINE__); + goto end; + } + if (BIO_write(biobuf, data, (int)len) <= 0) { + _setSSLError("Can't write buffer to bio", 0, __FILE__, __LINE__); + goto end; + } + + if (filetype == SSL_FILETYPE_ASN1) { + cert = d2i_X509_bio(biobuf, NULL); + } else if (filetype == SSL_FILETYPE_PEM) { + cert = PEM_read_bio_X509(biobuf, NULL, + self->ctx->default_passwd_callback, + self->ctx->default_passwd_callback_userdata); + } else { + assert(0); /* should never be reached */ + } + + if (cert == NULL) { + _setSSLError(NULL, 0, __FILE__, __LINE__); + goto end; + } + + /* Check if buffer still contains some data */ + if (check_trailing_data) { + result = BIO_read(biobuf, buf, sizeof(buf)-1); + if (result > 0) { + PyErr_SetString(PySSLErrorObject, + "Extra data after certificate"); + goto end; + } else if (result <= -2) { + /* Returns 0 or -1 if no data is left or on non-fatal error */ + _setSSLError(NULL, 0, __FILE__, __LINE__); + goto end; + } + } + + if(!X509_check_ca(cert)) { + PyErr_SetString(PySSLErrorObject, + "Certificate is not a CA cert."); + goto end; + } + + store = SSL_CTX_get_cert_store(self->ctx); + assert(store != NULL); + + if (!X509_STORE_add_cert(store, cert)) { + _setSSLError(NULL, 0, __FILE__, __LINE__); + goto end; + } + /* success */ + retval = 0; + + end: + if (biobuf != NULL){ + BIO_free(biobuf); + } + if (cert != NULL) { + X509_free(cert); + } + return retval; +} + +PyDoc_STRVAR(PySSL_add_ca_cert_doc, +"add_ca_cert(certdata, check_trailing_data=True)\n\ +\n\ +Add an extra CA certificate. certdata must either be a single ASCII-encoded\n\ +PEM certificate or a single DER certificate as a bytes-like object."); + +static PyObject* +add_ca_cert(PySSLContext *self, PyObject *args, PyObject *kwds) +{ + char *kwlist[] = {"cacert", "check_trailing_data", NULL}; + Py_buffer buf; + char *certbuf = NULL; + int len = -1; + int check_trailing_data = 1; + int result = -1; + + if (!PyArg_ParseTupleAndKeywords(args, kwds, "y*|p:add_ca_cert", kwlist, + &buf, &check_trailing_data)) { + PyErr_Clear(); + if (!PyArg_ParseTupleAndKeywords(args, kwds, "es#|p:add_ca_cert", + kwlist, "ascii", &certbuf, &len, + &check_trailing_data)) { + return NULL; + } + } + + if (certbuf != NULL) { + /* unicode ASCII data: PEM format */ + result = _add_ca_cert(self, certbuf, len, SSL_FILETYPE_PEM, + check_trailing_data); + PyMem_Free(certbuf); + } else { + /* Got a bytes-like object, the cert is in DER format */ + if (buf.ndim > 1) { + PyErr_SetString(PyExc_BufferError, + "Buffer must be single dimension"); + PyBuffer_Release(&buf); + return NULL; + } + result = _add_ca_cert(self, buf.buf, buf.len, SSL_FILETYPE_ASN1, + check_trailing_data); + PyBuffer_Release(&buf); + } + if (result == -1) { + return NULL; + } else { + Py_RETURN_NONE; + } +} + static PyObject * load_dh_params(PySSLContext *self, PyObject *filepath) { @@ -2696,6 +2832,8 @@ METH_NOARGS, PySSL_get_stats_doc}, {"get_ca_certs", (PyCFunction) get_ca_certs, METH_VARARGS, PySSL_get_ca_certs_doc}, + {"add_ca_cert", (PyCFunction) add_ca_cert, + METH_VARARGS | METH_KEYWORDS, PySSL_add_ca_cert_doc}, {NULL, NULL} /* sentinel */ };