diff --git a/Modules/_ssl.c b/Modules/_ssl.c --- a/Modules/_ssl.c +++ b/Modules/_ssl.c @@ -460,6 +460,426 @@ } /* + * X509Cert object + */ + +static PyObject * _decode_certificate(X509 *); +static PyObject * _certificate_to_der(X509 *); +static PyObject * _create_tuple_for_X509_NAME (X509_NAME *); + +typedef struct { + PyObject_HEAD + X509 *cert; + Py_hash_t hash; /* subject name hash */ + int verified; + int ca_cert; +} PyX509Cert; + +static PyTypeObject PyX509Cert_Type; + +#define PyX509Cert_Check(v) (Py_TYPE(v) == &PyX509Cert_Type) + +#define PyX509Cert_CheckReturnTypeError(v) \ + do if (!PyX509Cert_Check(v)) { \ + PyErr_Format(PyExc_TypeError, \ + "The argument must be a X509cert, not '%.200s'", \ + Py_TYPE(v)->tp_name); \ + return NULL; \ + } while(0) + +#define PyX509Cert_Cert(v) \ + (assert(PyX509Cert_Check(v)), \ + ((PyX509Cert*)(v))->cert) + +static PyX509Cert* +newPyX509Cert(X509* cert, int verified, int x509dup) +{ + PyX509Cert *self; + + if ((self = PyObject_New(PyX509Cert, &PyX509Cert_Type)) == NULL) { + return NULL; + } + if (x509dup) { + if ((cert = X509_dup(cert)) == NULL) { + Py_DECREF(self); + _setSSLError(NULL, 0, __FILE__, __LINE__); + return NULL; + } + } + self->cert = cert; + self->hash = -1; + self->verified = verified; + /* used in several functions, call fills X509v3 ext cache, too */ + self->ca_cert = X509_check_ca(self->cert); + + return self; +} + +static PyObject* +pyx509cert_get_info(PyX509Cert *self) +{ + return _decode_certificate(PyX509Cert_Cert(self)); +} + +static PyObject* +pyx509cert_get_der(PyX509Cert *self) +{ + return _certificate_to_der(PyX509Cert_Cert(self)); +} + +static PyObject* +pyx509cert_raw_print(PyX509Cert *self) +{ + BIO *biobuf = NULL; + char *buf = NULL; + PyObject *result = NULL; + /* a typical output has about 3500 to 4200 chars */ + int bufsize = 8192; + int len; + + /* get a memory buffer */ + if ((biobuf = BIO_new(BIO_s_mem())) == NULL) { + _setSSLError(NULL, 0, __FILE__, __LINE__); + goto fail; + } + if (!X509_print_ex(biobuf, PyX509Cert_Cert(self), XN_FLAG_COMPAT, 0)) { + _setSSLError(NULL, 0, __FILE__, __LINE__); + goto fail; + } + if ((buf = PyMem_Malloc(bufsize)) == NULL) { + PyErr_NoMemory(); + goto fail; + } + len = BIO_read(biobuf, buf, bufsize-1); + if (len < 0) { + _setSSLError(NULL, 0, __FILE__, __LINE__); + goto fail; + } + result = PyUnicode_FromStringAndSize(buf, len); + + fail: /* fall through */ + if (biobuf != NULL) { + BIO_free(biobuf); + } + if (buf != NULL) { + PyMem_Free(buf); + } + return result; +} + +static PyObject* +_pyx509cert_check_issued(X509 *issuer, X509 *subject) +{ + long ret; + const char *msg; + PyObject *result = NULL; + + ret = X509_check_issued(issuer, subject); + msg = X509_verify_cert_error_string(ret); + if (ret == X509_V_ERR_OUT_OF_MEM) { + PyErr_SetString(PyExc_MemoryError, msg); + return NULL; + } + result = (ret == X509_V_OK) ? Py_True : Py_False; + return Py_BuildValue("Os", result, msg); +} + +static PyObject* +pyx509cert_is_issuer_of(PyX509Cert *self, PyObject *other) +{ + PyX509Cert_CheckReturnTypeError(other); + return _pyx509cert_check_issued(PyX509Cert_Cert(self), + PyX509Cert_Cert(other)); +} + +static PyObject* +pyx509cert_is_issued_by(PyX509Cert *self, PyObject *other) +{ + PyX509Cert_CheckReturnTypeError(other); + return _pyx509cert_check_issued(PyX509Cert_Cert(other), + PyX509Cert_Cert(self)); +} + +static PyObject* +pyx509cert_is_selfsigned(PyX509Cert *self) +{ + return _pyx509cert_check_issued(PyX509Cert_Cert(self), + PyX509Cert_Cert(self)); +} + +static PyObject* +pyx509cert_get_purposes(PyX509Cert *self, PyObject *args, PyObject *kwds) +{ + char *kwlist[] = {"ca", "shortnames", NULL}; + PyObject *purpset = NULL, *oname = NULL; + X509 *cert; + X509_PURPOSE *purp; + const char *pname; + int i, pid, result; + int ca = self->ca_cert ? 1 : 0; + int shortnames = 0; + + if (!PyArg_ParseTupleAndKeywords(args, kwds, "|pp:get_purposes", kwlist, + &ca, &shortnames)) { + return NULL; + } + + cert = PyX509Cert_Cert(self); + if ((purpset = PySet_New(NULL)) == NULL) { + return NULL; + } + + for (i = 0; i < X509_PURPOSE_get_count(); i++) { + purp = X509_PURPOSE_get0(i); + pid = X509_PURPOSE_get_id(purp); + if (X509_check_purpose(cert, pid, ca) == 0) { + continue; + } + if (shortnames) { + pname = X509_PURPOSE_get0_sname(purp); + } else { + pname = X509_PURPOSE_get0_name(purp); + } + if ((oname = PyUnicode_FromString(pname)) == NULL) { + goto error; + } + result = PySet_Add(purpset, oname); + Py_DECREF(oname); + if (result == -1) { + goto error; + } + } + return purpset; + + error: + if (purpset != NULL) { + Py_DECREF(purpset); + } + return NULL; +} + +static void +pyx509cert_dealloc(PyX509Cert *self) +{ + if (self->cert) { + X509_free(self->cert); + self->cert = NULL; + } + PyObject_Del(self); +} + +static Py_hash_t +pyx509cert_hash(PyX509Cert *self) +{ + if (self->hash == -1) { + unsigned long hash; + hash = X509_subject_name_hash(PyX509Cert_Cert(self)); + if (hash == (unsigned long)-1) { + self->hash = -2; + } else { + self->hash = (Py_hash_t)hash; + } + } + return self->hash; +} +static PyObject * +pyx509cert_richcompare(PyObject *self, PyObject *other, int op) +{ + int cmp; + + if (!PyX509Cert_Check(other)) { + Py_RETURN_NOTIMPLEMENTED; + } + /* only support == and != */ + if ((op != Py_EQ) && (op != Py_NE)) { + Py_RETURN_NOTIMPLEMENTED; + } + cmp = X509_cmp(PyX509Cert_Cert(self), PyX509Cert_Cert(other)); + if (((op == Py_EQ) && (cmp == 0)) || ((op == Py_NE) && (cmp != 0))) { + Py_RETURN_TRUE; + } else { + Py_RETURN_FALSE; + } +} + +static PyObject* +pyx509cert_get_verified(PyX509Cert *self, void *c) +{ + return PyBool_FromLong(self->verified); +} + +static PyObject* +pyx509cert_get_ca_cert(PyX509Cert *self, void *c) +{ + /* 0: no CA, 1: CA, >1: maybe a CA cert */ + return PyBool_FromLong(self->ca_cert); +} + +static PyObject* +_x509name_oneline(X509_NAME *name) +{ + char *p = NULL; + PyObject *result = NULL; + + if ((p = X509_NAME_oneline(name, NULL, 0)) == NULL) { + _setSSLError(NULL, 0, __FILE__, __LINE__); + return NULL; + } + result = PyUnicode_FromString(p); + OPENSSL_free(p); + return result; +} + +static PyObject* +pyx509cert_get_issuer_name(PyX509Cert *self, PyObject *args) +{ + int oneline = 0; + X509_NAME *name; + if (!PyArg_ParseTuple(args, "|p:get_issuer_name", &oneline)) { + return NULL; + } + name = X509_get_issuer_name(PyX509Cert_Cert(self)); + if (oneline) { + return _x509name_oneline(name); + } else { + return _create_tuple_for_X509_NAME(name); + } +} + +static PyObject* +pyx509cert_get_subject_name(PyX509Cert *self, PyObject *args) +{ + int oneline = 0; + X509_NAME *name; + if (!PyArg_ParseTuple(args, "|p:get_subject_name", &oneline)) { + return NULL; + } + name = X509_get_subject_name(PyX509Cert_Cert(self)); + if (oneline) { + return _x509name_oneline(name); + } else { + return _create_tuple_for_X509_NAME(name); + } +} + +static PyGetSetDef PyX509Cert_getsetlist[] = { + {"verified", (getter)pyx509cert_get_verified, NULL, NULL}, + {"ca_cert", (getter)pyx509cert_get_ca_cert, NULL, NULL}, + {NULL}, /* sentinel */ +}; + +static PyMethodDef PyX509CertMethods[] = { + {"get_info", (PyCFunction)pyx509cert_get_info, METH_NOARGS}, + {"get_der", (PyCFunction)pyx509cert_get_der, METH_NOARGS}, + {"raw_print", (PyCFunction)pyx509cert_raw_print, METH_NOARGS}, + {"is_issuer_of", (PyCFunction)pyx509cert_is_issuer_of, METH_O}, + {"is_issued_by", (PyCFunction)pyx509cert_is_issued_by, METH_O}, + {"is_selfsigned", (PyCFunction)pyx509cert_is_selfsigned, METH_NOARGS}, + {"get_purposes", (PyCFunction)pyx509cert_get_purposes, + METH_VARARGS | METH_KEYWORDS, NULL}, + {"get_issuer_name", (PyCFunction)pyx509cert_get_issuer_name, METH_VARARGS}, + {"get_subject_name", (PyCFunction)pyx509cert_get_subject_name, METH_VARARGS}, + {NULL, NULL} +}; + +static PyTypeObject PyX509Cert_Type = { + PyVarObject_HEAD_INIT(NULL, 0) + "_ssl.X509Cert", /*tp_name*/ + sizeof(PyX509Cert), /*tp_basicsize*/ + 0, /*tp_itemsize*/ + /* methods */ + (destructor)pyx509cert_dealloc, /*tp_dealloc*/ + 0, /*tp_print*/ + 0, /*tp_getattr*/ + 0, /*tp_setattr*/ + 0, /*tp_reserved*/ + 0, /*tp_repr*/ + 0, /*tp_as_number*/ + 0, /*tp_as_sequence*/ + 0, /*tp_as_mapping*/ + (hashfunc)pyx509cert_hash, /*tp_hash*/ + 0, /*tp_call*/ + 0, /*tp_str*/ + 0, /*tp_getattro*/ + 0, /*tp_setattro*/ + 0, /*tp_as_buffer*/ + Py_TPFLAGS_DEFAULT, /*tp_flags*/ + 0, /*tp_doc*/ + 0, /*tp_traverse*/ + 0, /*tp_clear*/ + pyx509cert_richcompare, /*tp_richcompare*/ + 0, /*tp_weaklistoffset*/ + 0, /*tp_iter*/ + 0, /*tp_iternext*/ + PyX509CertMethods, /*tp_methods*/ + 0, /*tp_members*/ + PyX509Cert_getsetlist, /*tp_getset*/ + 0, /*tp_base*/ + 0, /*tp_dict*/ + 0, /*tp_descr_get*/ + 0, /*tp_descr_set*/ + 0, /*tp_dictoffset*/ + 0, /*tp_init*/ + 0, /*tp_alloc*/ + 0, /*tp_new*/ +}; + +static PyObject* +newPyX509CertTuple(STACK_OF(X509) *stack, int verified, int x509dup) +{ + int len, i; + PyObject *retval = NULL, *ox509 = NULL; + + len = sk_X509_num(stack); + if ((retval = PyTuple_New(len)) == NULL) { + return NULL; + } + for (i = 0; i < len; i++) { + X509 *cert = sk_X509_value(stack, i); + ox509 = (PyObject*)newPyX509Cert(cert, verified, x509dup); + if (ox509 == NULL) { + Py_CLEAR(retval); + break; + } + PyTuple_SET_ITEM(retval, i, ox509); + } + return retval; +} + +/* get dict of short name: long purpose names */ +static PyObject* +get_purposes(void) +{ + PyObject *dct = NULL, *oname = NULL; + X509_PURPOSE *purp; + const char *name, *sname; + int i, result; + + if ((dct = PyDict_New()) == NULL) { + return NULL; + } + + for (i = 0; i < X509_PURPOSE_get_count(); i++) { + purp = X509_PURPOSE_get0(i); + sname = X509_PURPOSE_get0_sname(purp); + name = X509_PURPOSE_get0_name(purp); + if ((oname = PyUnicode_FromString(name)) == NULL) { + goto error; + } + result = PyDict_SetItemString(dct, sname, oname); + Py_DECREF(oname); + if (result == -1) { + goto error; + } + } + return dct; + + error: + Py_DECREF(dct); + return NULL; +} + +/* * SSL objects */ @@ -1095,8 +1515,11 @@ if (!self->peer_cert) Py_RETURN_NONE; + verification = SSL_CTX_get_verify_mode(SSL_get_SSL_CTX(self->ssl)); + return (PyObject *)newPyX509Cert(self->peer_cert, verification, 1); + /* if (binary_mode) { - /* return cert in DER-encoded format */ + / return cert in DER-encoded format / return _certificate_to_der(self->peer_cert); } else { verification = SSL_CTX_get_verify_mode(SSL_get_SSL_CTX(self->ssl)); @@ -1104,7 +1527,7 @@ return PyDict_New(); else return _decode_certificate(self->peer_cert); - } + }*/ } PyDoc_STRVAR(PySSL_peercert_doc, @@ -3191,6 +3614,8 @@ return NULL; if (PyType_Ready(&PySSLSocket_Type) < 0) return NULL; + if (PyType_Ready(&PyX509Cert_Type) < 0) + return NULL; m = PyModule_Create(&_sslmodule); if (m == NULL) @@ -3254,6 +3679,9 @@ if (PyDict_SetItemString(d, "_SSLSocket", (PyObject *)&PySSLSocket_Type) != 0) return NULL; + if (PyDict_SetItemString(d, "X509Cert", + (PyObject *)&PyX509Cert_Type) != 0) + return NULL; PyModule_AddIntConstant(m, "SSL_ERROR_ZERO_RETURN", PY_SSL_ERROR_ZERO_RETURN); PyModule_AddIntConstant(m, "SSL_ERROR_WANT_READ", @@ -3477,5 +3905,10 @@ if (r == NULL || PyModule_AddObject(m, "_OPENSSL_API_VERSION", r)) return NULL; + r = get_purposes(); + if (r == NULL || PyModule_AddObject(m, "cert_purposes", r)) { + return NULL; + } + return m; }