diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -24,7 +24,8 @@ PROTOCOLS = sorted(ssl._PROTOCOL_NAMES) HOST = support.HOST -data_file = lambda name: os.path.join(os.path.dirname(__file__), name) +def data_file(*name): + return os.path.join(os.path.dirname(__file__), *name) # The custom key and certificate files used in test_ssl are generated # using Lib/test/make_ssl_certs.py. @@ -42,6 +43,9 @@ KEY_PASSWORD = "somepass" CAPATH = data_file("capath") BYTES_CAPATH = os.fsencode(CAPATH) +CAFILE_NEURONIO = data_file("capath", "4e1295a3.0") +CAFILE_CACERT = data_file("capath", "5ed36f99.0") + # Two keys and certs signed by the same CA (for SNI tests) SIGNED_CERTFILE = data_file("keycert3.pem") @@ -630,6 +634,47 @@ gc.collect() self.assertIs(wr(), None) + def test_add_cert(self): + with open(CAFILE_CACERT) as f: + cacert_pem = f.read() + cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem) + with open(CAFILE_NEURONIO) as f: + neuronio_pem = f.read() + neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem) + + # test PEM + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + self.assertEqual(ctx.get_cert_count(), 0) + ctx.add_cert(cacert_pem) + self.assertEqual(ctx.get_cert_count(), 1) + ctx.add_cert(neuronio_pem) + self.assertEqual(ctx.get_cert_count(), 2) + + with self.assertRaisesRegex(ssl.SSLError, + "cert already in hash table"): + ctx.add_cert(cacert_pem) + self.assertEqual(ctx.get_cert_count(), 2) + + # test DER + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + ctx.add_cert(cacert_der) + ctx.add_cert(neuronio_der) + self.assertEqual(ctx.get_cert_count(), 2) + + with self.assertRaisesRegex(ssl.SSLError, + "cert already in hash table"): + ctx.add_cert(cacert_pem) + + # error cases + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + self.assertRaises(TypeError, ctx.add_cert, None) + self.assertRaises(TypeError, ctx.add_cert, object) + + with self.assertRaisesRegex(ssl.SSLError, "no start line"): + ctx.add_cert("broken") + with self.assertRaisesRegex(ssl.SSLError, "not enough data"): + ctx.add_cert(b"broken") + class SSLErrorTests(unittest.TestCase): @@ -833,6 +878,33 @@ finally: s.close() + def test_connect_add_cert(self): + with open(CAFILE_CACERT) as f: + pem = f.read() + der = ssl.PEM_cert_to_DER_cert(pem) + with support.transient_internet("svn.python.org"): + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_REQUIRED + ctx.add_cert(pem) + s = ctx.wrap_socket(socket.socket(socket.AF_INET)) + s.connect(("svn.python.org", 443)) + try: + cert = s.getpeercert() + self.assertTrue(cert) + finally: + s.close() + # same with DER + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_REQUIRED + ctx.add_cert(der) + s = ctx.wrap_socket(socket.socket(socket.AF_INET)) + s.connect(("svn.python.org", 443)) + try: + cert = s.getpeercert() + self.assertTrue(cert) + finally: + s.close() + @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows") def test_makefile_close(self): # Issue #5238: creating a file-like object with makefile() shouldn't diff --git a/Modules/_ssl.c b/Modules/_ssl.c --- a/Modules/_ssl.c +++ b/Modules/_ssl.c @@ -2270,6 +2270,153 @@ Py_RETURN_NONE; } +/* internal helper function */ + +static int +_add_cert(PySSLContext *self, const void *data, Py_ssize_t len, + int filetype) +{ + BIO *biobuf = NULL; + X509 *cert = NULL; + X509_STORE *store; + + int success = 0; + + if (len <= 0) { + PyErr_SetString(PyExc_ValueError, + "Empty certificate data"); + return 0; + } else if (len > INT_MAX) { + PyErr_SetString(PyExc_ValueError, + "Certificate data is too long."); + return 0; + } + + if ((biobuf = BIO_new(BIO_s_mem())) == NULL) { + ERR_clear_error(); + PyErr_SetString(PySSLErrorObject, + "Can't malloc memory"); + goto end; + } + if (BIO_write(biobuf, data, (int)len) <= 0) { + ERR_clear_error(); + PyErr_SetString(PySSLErrorObject, + "Can't write buffer to bio"); + goto end; + } + + if (filetype == SSL_FILETYPE_ASN1) { + cert = d2i_X509_bio(biobuf, NULL); + } else if (filetype == SSL_FILETYPE_PEM) { + cert = PEM_read_bio_X509(biobuf, NULL, + self->ctx->default_passwd_callback, + self->ctx->default_passwd_callback_userdata); + } else { + /* should never be reached */ + PyErr_SetString(PyExc_ValueError, + "invalid cert file type"); + goto end; + } + + if (cert == NULL) { + _setSSLError(NULL, 0, __FILE__, __LINE__); + goto end; + } + + store = SSL_CTX_get_cert_store(self->ctx); + assert(store != NULL); + + if (!X509_STORE_add_cert(store, cert)) { + _setSSLError(NULL, 0, __FILE__, __LINE__); + goto end; + } + + success = 1; + + end: + if (biobuf != NULL){ + BIO_free(biobuf); + } + if (cert != NULL) { + X509_free(cert); + } + return success; +} + +PyDoc_STRVAR(PySSL_add_cert_doc, +"add_cert(certdata)\n\ +\n\ +Add an extra CA certificate. certdata must either be a single ASCII encoded\n\ +PEM certificate or a single DER certificate as a bytes or buffer object."); + +static PyObject* +add_cert(PySSLContext *self, PyObject *cacert) +{ + int result = 0; + + /* unicode ASCII data: PEM format */ + if (PyUnicode_Check(cacert)) { + const void *data; + Py_ssize_t len; + + if ((PyUnicode_READY(cacert) < 0) || !PyUnicode_IS_ASCII(cacert)) { + PyErr_SetString(PyExc_ValueError, + "PEM cert should contain only ASCII characters"); + return NULL; + } + + assert(PyUnicode_KIND(cacert) == PyUnicode_1BYTE_KIND); + data = (const void*)PyUnicode_1BYTE_DATA(cacert); + len = PyUnicode_GET_LENGTH(cacert); + + result =_add_cert(self, data, len, SSL_FILETYPE_PEM); + if (result) { + Py_RETURN_NONE; + } else { + return NULL; + } + } + else if (!PyObject_CheckBuffer(cacert)) { + PyErr_SetString(PyExc_TypeError, + "Either ASCII unicode or an object supporting the " + "buffer API is required."); + return NULL; + } else { + /* buffer: DER format */ + Py_buffer buf; + + if (PyObject_GetBuffer(cacert, &buf, PyBUF_SIMPLE) == -1) { + return NULL; + } + if ((buf.ndim > 1) || !PyBuffer_IsContiguous(&buf, 'C')) { + PyErr_SetString(PyExc_BufferError, + "Buffer must be single dimension and contiguous"); + PyBuffer_Release(&buf); + return NULL; + } + + result = _add_cert(self, buf.buf, buf.len, SSL_FILETYPE_ASN1); + PyBuffer_Release(&buf); + if (result) { + Py_RETURN_NONE; + } else { + return NULL; + } + } +} + +static PyObject * +get_cert_count(PySSLContext *self) +{ + X509_STORE *store; + int len = 0; + + store = SSL_CTX_get_cert_store(self->ctx); + len = sk_X509_OBJECT_num(store->objs); + + return PyLong_FromLong(len); +} + static PyObject * load_dh_params(PySSLContext *self, PyObject *filepath) { @@ -2586,6 +2733,10 @@ #endif {"set_servername_callback", (PyCFunction) set_servername_callback, METH_VARARGS, PySSL_set_servername_callback_doc}, + {"add_cert", (PyCFunction) add_cert, + METH_O, PySSL_add_cert_doc}, + {"get_cert_count", (PyCFunction) get_cert_count, + METH_NOARGS, NULL}, {NULL, NULL} /* sentinel */ };