diff --git a/Doc/library/ssl.rst b/Doc/library/ssl.rst --- a/Doc/library/ssl.rst +++ b/Doc/library/ssl.rst @@ -791,6 +791,14 @@ :class:`SSLContext` objects have the following methods and attributes: +.. method:: SSLContext.add_ca_cert(certdata) + + Add a single CA certificate into the SSLContext's internal cert store. + *certdata* can either be a PEM certificate as ASCII string or a DER + certificate as bytes-like object. + + .. versionadded:: 3.4 + .. method:: SSLContext.load_cert_chain(certfile, keyfile=None, password=None) Load a private key and the corresponding certificate. The *certfile* diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -24,7 +24,8 @@ PROTOCOLS = sorted(ssl._PROTOCOL_NAMES) HOST = support.HOST -data_file = lambda name: os.path.join(os.path.dirname(__file__), name) +def data_file(*name): + return os.path.join(os.path.dirname(__file__), *name) # The custom key and certificate files used in test_ssl are generated # using Lib/test/make_ssl_certs.py. @@ -42,6 +43,9 @@ KEY_PASSWORD = "somepass" CAPATH = data_file("capath") BYTES_CAPATH = os.fsencode(CAPATH) +CAFILE_NEURONIO = data_file("capath", "4e1295a3.0") +CAFILE_CACERT = data_file("capath", "5ed36f99.0") + # Two keys and certs signed by the same CA (for SNI tests) SIGNED_CERTFILE = data_file("keycert3.pem") @@ -666,6 +670,59 @@ gc.collect() self.assertIs(wr(), None) + def test_add_ca_cert(self): + with open(CAFILE_CACERT) as f: + cacert_pem = f.read() + cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem) + with open(CAFILE_NEURONIO) as f: + neuronio_pem = f.read() + neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem) + with open(CERTFILE) as f: + cert = f.read() + + # test PEM + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + #self.assertEqual(ctx.get_cert_count(), 0) + ctx.add_ca_cert(cacert_pem) + #self.assertEqual(ctx.get_cert_count(), 1) + ctx.add_ca_cert(neuronio_pem) + #self.assertEqual(ctx.get_cert_count(), 2) + + with self.assertRaisesRegex(ssl.SSLError, + "cert already in hash table"): + ctx.add_ca_cert(cacert_pem) + #self.assertEqual(ctx.get_cert_count(), 2) + + # test DER + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + ctx.add_ca_cert(cacert_der) + ctx.add_ca_cert(neuronio_der) + #self.assertEqual(ctx.get_cert_count(), 2) + + with self.assertRaisesRegex(ssl.SSLError, + "cert already in hash table"): + ctx.add_ca_cert(cacert_pem) + + # error cases + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + self.assertRaises(TypeError, ctx.add_ca_cert, None) + self.assertRaises(TypeError, ctx.add_ca_cert, object) + + with self.assertRaisesRegex(ssl.SSLError, "no start line"): + ctx.add_ca_cert("broken") + with self.assertRaisesRegex(ssl.SSLError, "not enough data"): + ctx.add_ca_cert(b"broken") + with self.assertRaisesRegex(ssl.SSLError, "Certificate is not a CA cert."): + ctx.add_ca_cert(cert) + + combined = "\n".join((cacert_pem, neuronio_pem)) + with self.assertRaisesRegex(ssl.SSLError, "Extra data"): + ctx.add_ca_cert(combined) + combined = b"\n".join((cacert_der, neuronio_der)) + with self.assertRaisesRegex(ssl.SSLError, "Extra data"): + ctx.add_ca_cert(combined) + + class SSLErrorTests(unittest.TestCase): @@ -869,6 +926,33 @@ finally: s.close() + def test_connect_add_ca_cert(self): + with open(CAFILE_CACERT) as f: + pem = f.read() + der = ssl.PEM_cert_to_DER_cert(pem) + with support.transient_internet("svn.python.org"): + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_REQUIRED + ctx.add_ca_cert(pem) + s = ctx.wrap_socket(socket.socket(socket.AF_INET)) + s.connect(("svn.python.org", 443)) + try: + cert = s.getpeercert() + self.assertTrue(cert) + finally: + s.close() + # same with DER + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_REQUIRED + ctx.add_ca_cert(der) + s = ctx.wrap_socket(socket.socket(socket.AF_INET)) + s.connect(("svn.python.org", 443)) + try: + cert = s.getpeercert() + self.assertTrue(cert) + finally: + s.close() + @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows") def test_makefile_close(self): # Issue #5238: creating a file-like object with makefile() shouldn't diff --git a/Modules/_ssl.c b/Modules/_ssl.c --- a/Modules/_ssl.c +++ b/Modules/_ssl.c @@ -2270,6 +2270,158 @@ Py_RETURN_NONE; } +/* internal helper function */ +static int +_add_ca_cert(PySSLContext *self, const void *data, Py_ssize_t len, + int filetype) +{ + BIO *biobuf = NULL; + X509 *cert = NULL; + X509_STORE *store; + char buf[2]; + + int success = 0, result; + + if (len <= 0) { + PyErr_SetString(PyExc_ValueError, + "Empty certificate data"); + return 0; + } else if (len > INT_MAX) { + PyErr_SetString(PyExc_ValueError, + "Certificate data is too long."); + return 0; + } + + if ((biobuf = BIO_new(BIO_s_mem())) == NULL) { + ERR_clear_error(); + PyErr_SetString(PySSLErrorObject, + "Can't malloc memory"); + goto end; + } + if (BIO_write(biobuf, data, (int)len) <= 0) { + ERR_clear_error(); + PyErr_SetString(PySSLErrorObject, + "Can't write buffer to bio"); + goto end; + } + + if (filetype == SSL_FILETYPE_ASN1) { + cert = d2i_X509_bio(biobuf, NULL); + } else if (filetype == SSL_FILETYPE_PEM) { + cert = PEM_read_bio_X509(biobuf, NULL, + self->ctx->default_passwd_callback, + self->ctx->default_passwd_callback_userdata); + } else { + /* should never be reached */ + PyErr_SetString(PyExc_ValueError, + "invalid cert file type"); + goto end; + } + + if (cert == NULL) { + _setSSLError(NULL, 0, __FILE__, __LINE__); + goto end; + } + + /* Check if buffer still contains some data */ + result = BIO_read(biobuf, buf, sizeof(buf)-1); + if (result > 0) { + PyErr_SetString(PySSLErrorObject, + "Extra data after certificate"); + goto end; + } else if (result < -2) { + _setSSLError(NULL, 0, __FILE__, __LINE__); + goto end; + } + + if(!X509_check_ca(cert)) { + PyErr_SetString(PySSLErrorObject, + "Certificate is not a CA cert."); + goto end; + } + + store = SSL_CTX_get_cert_store(self->ctx); + assert(store != NULL); + + if (!X509_STORE_add_cert(store, cert)) { + _setSSLError(NULL, 0, __FILE__, __LINE__); + goto end; + } + + success = 1; + + end: + if (biobuf != NULL){ + BIO_free(biobuf); + } + if (cert != NULL) { + X509_free(cert); + } + return success; +} + +PyDoc_STRVAR(PySSL_add_ca_cert_doc, +"add_ca_cert(certdata)\n\ +\n\ +Add an extra CA certificate. certdata must either be a single ASCII-encoded\n\ +PEM certificate or a single DER certificate as a bytes-like object."); + +static PyObject* +add_ca_cert(PySSLContext *self, PyObject *cacert) +{ + int result = 0; + + /* unicode ASCII data: PEM format */ + if (PyUnicode_Check(cacert)) { + const void *data; + Py_ssize_t len; + + if ((PyUnicode_READY(cacert) < 0) || !PyUnicode_IS_ASCII(cacert)) { + PyErr_SetString(PyExc_ValueError, + "PEM cert should contain only ASCII characters"); + return NULL; + } + + assert(PyUnicode_KIND(cacert) == PyUnicode_1BYTE_KIND); + data = (const void*)PyUnicode_1BYTE_DATA(cacert); + len = PyUnicode_GET_LENGTH(cacert); + + result =_add_ca_cert(self, data, len, SSL_FILETYPE_PEM); + if (result) { + Py_RETURN_NONE; + } else { + return NULL; + } + } + else if (!PyObject_CheckBuffer(cacert)) { + PyErr_Format(PyExc_TypeError, + "Expected an ASCII-only string or bytes-like object, " + "not '%100s'.", Py_TYPE(cacert)->tp_name); + return NULL; + } else { + /* buffer: DER format */ + Py_buffer buf; + + if (PyObject_GetBuffer(cacert, &buf, PyBUF_SIMPLE) == -1) { + return NULL; + } + if (buf.ndim > 1) { + PyErr_SetString(PyExc_BufferError, + "Buffer must be single dimension"); + PyBuffer_Release(&buf); + return NULL; + } + + result = _add_ca_cert(self, buf.buf, buf.len, SSL_FILETYPE_ASN1); + PyBuffer_Release(&buf); + if (result) { + Py_RETURN_NONE; + } else { + return NULL; + } + } +} + static PyObject * load_dh_params(PySSLContext *self, PyObject *filepath) { @@ -2586,6 +2738,8 @@ #endif {"set_servername_callback", (PyCFunction) set_servername_callback, METH_VARARGS, PySSL_set_servername_callback_doc}, + {"add_ca_cert", (PyCFunction) add_ca_cert, + METH_O, PySSL_add_ca_cert_doc}, {NULL, NULL} /* sentinel */ };