diff -r cb1691d42101 Doc/library/ssl.rst --- a/Doc/library/ssl.rst Wed Nov 20 17:43:23 2013 +0100 +++ b/Doc/library/ssl.rst Wed Nov 20 20:16:11 2013 +0100 @@ -821,6 +821,7 @@ .. versionadded:: 3.4 + .. method:: SSLContext.load_cert_chain(certfile, keyfile=None, password=None) Load a private key and the corresponding certificate. The *certfile* @@ -851,7 +852,7 @@ .. versionchanged:: 3.3 New optional argument *password*. -.. method:: SSLContext.load_verify_locations(cafile=None, capath=None) +.. method:: SSLContext.load_verify_locations(cafile=None, capath=None, cadata=None) Load a set of "certification authority" (CA) certificates used to validate other peers' certificates when :data:`verify_mode` is other than @@ -867,6 +868,14 @@ following an `OpenSSL specific layout `_. + The *cadata* object, if present, is either an ASCII string of one or more + PEM-encoded certificates or a bytes-like object of DER-encoded + certificates. Like with *capath* extra lines around PEM-encoded + certificates are ignored but at least one certificate must be present. + + .. versionchanged:: 3.4 + New optional argument *cadata* + .. method:: SSLContext.get_ca_certs(binary_form=False) Get a list of loaded "certification authority" (CA) certificates. If the diff -r cb1691d42101 Lib/test/test_ssl.py --- a/Lib/test/test_ssl.py Wed Nov 20 17:43:23 2013 +0100 +++ b/Lib/test/test_ssl.py Wed Nov 20 20:16:11 2013 +0100 @@ -25,7 +25,8 @@ PROTOCOLS = sorted(ssl._PROTOCOL_NAMES) HOST = support.HOST -data_file = lambda name: os.path.join(os.path.dirname(__file__), name) +def data_file(*name): + return os.path.join(os.path.dirname(__file__), *name) # The custom key and certificate files used in test_ssl are generated # using Lib/test/make_ssl_certs.py. @@ -43,6 +44,9 @@ KEY_PASSWORD = "somepass" CAPATH = data_file("capath") BYTES_CAPATH = os.fsencode(CAPATH) +CAFILE_NEURONIO = data_file("capath", "4e1295a3.0") +CAFILE_CACERT = data_file("capath", "5ed36f99.0") + # Two keys and certs signed by the same CA (for SNI tests) SIGNED_CERTFILE = data_file("keycert3.pem") @@ -726,7 +730,7 @@ ctx.load_verify_locations(BYTES_CERTFILE) ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None) self.assertRaises(TypeError, ctx.load_verify_locations) - self.assertRaises(TypeError, ctx.load_verify_locations, None, None) + self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None) with self.assertRaises(OSError) as cm: ctx.load_verify_locations(WRONGCERT) self.assertEqual(cm.exception.errno, errno.ENOENT) @@ -738,6 +742,62 @@ # Issue #10989: crash if the second argument type is invalid self.assertRaises(TypeError, ctx.load_verify_locations, None, True) + # test cadata + with open(CAFILE_CACERT) as f: + cacert_pem = f.read() + cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem) + with open(CAFILE_NEURONIO) as f: + neuronio_pem = f.read() + neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem) + with open(CERTFILE) as f: + cert = f.read() + + # test PEM + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0) + ctx.load_verify_locations(cadata=cacert_pem) + self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1) + ctx.load_verify_locations(cadata=neuronio_pem) + self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2) + # cert already in hash table + ctx.load_verify_locations(cadata=neuronio_pem) + self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2) + + # combined + combined = "\n".join((cacert_pem, neuronio_pem)) + ctx.load_verify_locations(cadata=combined) + self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2) + + # with junk around the certs + combined = ["head", cacert_pem, "other", neuronio_pem, "again", + neuronio_pem, "tail"] + ctx.load_verify_locations(cadata="\n".join(combined)) + self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2) + + # test DER + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + ctx.load_verify_locations(cadata=cacert_der) + ctx.load_verify_locations(cadata=neuronio_der) + self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2) + # cert already in hash table + ctx.load_verify_locations(cadata=cacert_der) + self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2) + + # combined + combined = b"".join((cacert_der, neuronio_der)) + ctx.load_verify_locations(cadata=combined) + self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2) + + # error cases + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object) + + with self.assertRaisesRegex(ssl.SSLError, "no start line"): + ctx.load_verify_locations(cadata="broken") + with self.assertRaisesRegex(ssl.SSLError, "not enough data"): + ctx.load_verify_locations(cadata=b"broken") + + def test_load_dh_params(self): ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) ctx.load_dh_params(DHFILE) @@ -1057,6 +1117,33 @@ finally: s.close() + def test_connect_add_ca_cert(self): + with open(CAFILE_CACERT) as f: + pem = f.read() + der = ssl.PEM_cert_to_DER_cert(pem) + with support.transient_internet("svn.python.org"): + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_REQUIRED + ctx.add_ca_cert(pem) + s = ctx.wrap_socket(socket.socket(socket.AF_INET)) + s.connect(("svn.python.org", 443)) + try: + cert = s.getpeercert() + self.assertTrue(cert) + finally: + s.close() + # same with DER + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_REQUIRED + ctx.add_ca_cert(der) + s = ctx.wrap_socket(socket.socket(socket.AF_INET)) + s.connect(("svn.python.org", 443)) + try: + cert = s.getpeercert() + self.assertTrue(cert) + finally: + s.close() + @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows") def test_makefile_close(self): # Issue #5238: creating a file-like object with makefile() shouldn't diff -r cb1691d42101 Modules/_ssl.c --- a/Modules/_ssl.c Wed Nov 20 17:43:23 2013 +0100 +++ b/Modules/_ssl.c Wed Nov 20 20:16:11 2013 +0100 @@ -2304,60 +2304,202 @@ return NULL; } +/* internal helper function, returns -1 on error + */ +static int +_add_ca_certs(PySSLContext *self, void *data, Py_ssize_t len, + int filetype) +{ + BIO *biobuf = NULL; + X509_STORE *store; + int retval = 0, err, loaded = 0; + + assert(filetype == SSL_FILETYPE_ASN1 || filetype == SSL_FILETYPE_PEM); + + if (len <= 0) { + PyErr_SetString(PyExc_ValueError, + "Empty certificate data"); + return -1; + } else if (len > INT_MAX) { + PyErr_SetString(PyExc_OverflowError, + "Certificate data is too long."); + return -1; + } + + biobuf = BIO_new_mem_buf(data, len); + if (biobuf == NULL) { + _setSSLError("Can't allocate buffer", 0, __FILE__, __LINE__); + return -1; + } + + store = SSL_CTX_get_cert_store(self->ctx); + assert(store != NULL); + + while (1) { + X509 *cert = NULL; + int r; + + if (filetype == SSL_FILETYPE_ASN1) { + cert = d2i_X509_bio(biobuf, NULL); + } else { + cert = PEM_read_bio_X509(biobuf, NULL, + self->ctx->default_passwd_callback, + self->ctx->default_passwd_callback_userdata); + } + if (cert == NULL) { + break; + } + r = X509_STORE_add_cert(store, cert); + X509_free(cert); + if (!r) { + err = ERR_peek_last_error(); + if ((ERR_GET_LIB(err) == ERR_LIB_X509) && + (ERR_GET_REASON(err) == X509_R_CERT_ALREADY_IN_HASH_TABLE)) { + /* cert already in hash table, not an error */ + ERR_clear_error(); + } else { + break; + } + } + loaded++; + } + + err = ERR_peek_last_error(); + if ((filetype == SSL_FILETYPE_ASN1) && + (loaded > 0) && + (ERR_GET_LIB(err) == ERR_LIB_ASN1) && + (ERR_GET_REASON(err) == ASN1_R_HEADER_TOO_LONG)) { + /* EOF ASN1 file, not an error */ + ERR_clear_error(); + retval = 0; + } else if ((filetype == SSL_FILETYPE_PEM) && + (loaded > 0) && + (ERR_GET_LIB(err) == ERR_LIB_PEM) && + (ERR_GET_REASON(err) == PEM_R_NO_START_LINE)) { + /* EOF PEM file, not an error */ + ERR_clear_error(); + retval = 0; + } else { + _setSSLError(NULL, 0, __FILE__, __LINE__); + retval = -1; + } + + if (biobuf != NULL){ + BIO_free(biobuf); + } + return retval; +} + + static PyObject * load_verify_locations(PySSLContext *self, PyObject *args, PyObject *kwds) { - char *kwlist[] = {"cafile", "capath", NULL}; - PyObject *cafile = NULL, *capath = NULL; + char *kwlist[] = {"cafile", "capath", "cadata", NULL}; + PyObject *cafile = NULL, *capath = NULL, *cadata = NULL; PyObject *cafile_bytes = NULL, *capath_bytes = NULL; const char *cafile_buf = NULL, *capath_buf = NULL; - int r; + int r = 0, ok = 1; errno = 0; if (!PyArg_ParseTupleAndKeywords(args, kwds, - "|OO:load_verify_locations", kwlist, - &cafile, &capath)) + "|OOO:load_verify_locations", kwlist, + &cafile, &capath, &cadata)) return NULL; + if (cafile == Py_None) cafile = NULL; if (capath == Py_None) capath = NULL; - if (cafile == NULL && capath == NULL) { + if (cadata == Py_None) + cadata = NULL; + + if (cafile == NULL && capath == NULL && cadata == NULL) { PyErr_SetString(PyExc_TypeError, - "cafile and capath cannot be both omitted"); - return NULL; + "cafile, capath and cadata cannot be all omitted"); + goto error; } if (cafile && !PyUnicode_FSConverter(cafile, &cafile_bytes)) { PyErr_SetString(PyExc_TypeError, "cafile should be a valid filesystem path"); + goto error; + } + if (capath && !PyUnicode_FSConverter(capath, &capath_bytes)) { + PyErr_SetString(PyExc_TypeError, + "capath should be a valid filesystem path"); + goto error; + } + + /* validata cadata type and load cadata */ + if (cadata) { + Py_buffer buf; + PyObject *cadata_ascii = NULL; + + if (PyObject_GetBuffer(cadata, &buf, PyBUF_SIMPLE) == 0) { + if (!PyBuffer_IsContiguous(&buf, 'C') || buf.ndim > 1) { + PyBuffer_Release(&buf); + PyErr_SetString(PyExc_TypeError, + "cadata should be a contiguous buffer with " + "a single dimension"); + goto error; + } + r = _add_ca_certs(self, buf.buf, buf.len, SSL_FILETYPE_ASN1); + PyBuffer_Release(&buf); + if (r == -1) { + goto error; + } + } else { + PyErr_Clear(); + cadata_ascii = PyUnicode_AsASCIIString(cadata); + if (cadata_ascii == NULL) { + PyErr_SetString(PyExc_TypeError, + "cadata should be a ASCII string or a " + "bytes-like object"); + goto error; + } + r = _add_ca_certs(self, + PyBytes_AS_STRING(cadata_ascii), + PyBytes_GET_SIZE(cadata_ascii), + SSL_FILETYPE_PEM); + Py_DECREF(cadata_ascii); + if (r == -1) { + goto error; + } + } + } + + /* load cafile or capath */ + if (cafile || capath) { + if (cafile) + cafile_buf = PyBytes_AS_STRING(cafile_bytes); + if (capath) + capath_buf = PyBytes_AS_STRING(capath_bytes); + PySSL_BEGIN_ALLOW_THREADS + r = SSL_CTX_load_verify_locations(self->ctx, cafile_buf, capath_buf); + PySSL_END_ALLOW_THREADS + if (r != 1) { + ok = 0; + if (errno != 0) { + ERR_clear_error(); + PyErr_SetFromErrno(PyExc_IOError); + } + else { + _setSSLError(NULL, 0, __FILE__, __LINE__); + } + goto error; + } + } + goto end; + + error: + ok = 0; + end: + Py_XDECREF(cafile_bytes); + Py_XDECREF(capath_bytes); + if (ok) { + Py_RETURN_NONE; + } else { return NULL; } - if (capath && !PyUnicode_FSConverter(capath, &capath_bytes)) { - Py_XDECREF(cafile_bytes); - PyErr_SetString(PyExc_TypeError, - "capath should be a valid filesystem path"); - return NULL; - } - if (cafile) - cafile_buf = PyBytes_AS_STRING(cafile_bytes); - if (capath) - capath_buf = PyBytes_AS_STRING(capath_bytes); - PySSL_BEGIN_ALLOW_THREADS - r = SSL_CTX_load_verify_locations(self->ctx, cafile_buf, capath_buf); - PySSL_END_ALLOW_THREADS - Py_XDECREF(cafile_bytes); - Py_XDECREF(capath_bytes); - if (r != 1) { - if (errno != 0) { - ERR_clear_error(); - PyErr_SetFromErrno(PyExc_IOError); - } - else { - _setSSLError(NULL, 0, __FILE__, __LINE__); - } - return NULL; - } - Py_RETURN_NONE; } static PyObject *