diff -r c6fd0f3aadc4 Lib/sqlite3/test/regression.py --- a/Lib/sqlite3/test/regression.py Mon Aug 29 15:59:48 2016 +0300 +++ b/Lib/sqlite3/test/regression.py Tue Aug 30 20:44:12 2016 +0800 @@ -147,10 +147,30 @@ def CheckSetIsolationLevel(self): """ - See issue 3312. + See issue 27881. """ + class CustomStr(str): + def upper(self): + return None + def __del__(self): + con.isolation_level = "" + levels = ["", "DEFERRED", "IMMEDIATE", "EXCLUSIVE"] + pairs = [(1, TypeError), (b'', TypeError), ("abc", ValueError), + ("IMMEDIATE\0EXCLUSIVE", ValueError)] + con = sqlite.connect(":memory:") - setattr(con, "isolation_level", "\xe9") + con.isolation_level = None + for level in levels: + con.isolation_level = level + con.isolation_level = level.lower() + con.isolation_level = level.capitalize() + con.isolation_level = CustomStr(level) + # setting isolation_level failure should not alter previous state + con.isolation_level = None + con.isolation_level = "DEFERRED" + for value, exc in pairs: + self.assertRaises(exc, setattr, con, "isolation_level", value) + self.assertEqual(con.isolation_level, "DEFERRED") def CheckCursorConstructorCallCheck(self): """ diff -r c6fd0f3aadc4 Modules/_sqlite/connection.c --- a/Modules/_sqlite/connection.c Mon Aug 29 15:59:48 2016 +0300 +++ b/Modules/_sqlite/connection.c Tue Aug 30 20:44:12 2016 +0800 @@ -43,6 +43,8 @@ _Py_IDENTIFIER(cursor); +static const char * const isolation_levels[] = {"", "DEFERRED", "IMMEDIATE", "EXCLUSIVE", NULL}; + static int pysqlite_connection_set_isolation_level(pysqlite_Connection* self, PyObject* isolation_level); static void _pysqlite_drop_unused_cursor_references(pysqlite_Connection* self); @@ -1175,59 +1177,71 @@ static int pysqlite_connection_set_isolation_level(pysqlite_Connection* self, PyObject* isolation_level) { - PyObject* res; - PyObject* begin_statement; - static PyObject* begin_word; - - Py_XDECREF(self->isolation_level); - - if (self->begin_statement) { - PyMem_Free(self->begin_statement); - self->begin_statement = NULL; - } + static const char begin_word[] = "BEGIN "; if (isolation_level == Py_None) { - Py_INCREF(Py_None); - self->isolation_level = Py_None; - - res = pysqlite_connection_commit(self, NULL); + PyObject *res = pysqlite_connection_commit(self, NULL); if (!res) { return -1; } Py_DECREF(res); + if (self->begin_statement) { + PyMem_Free(self->begin_statement); + self->begin_statement = NULL; + } self->inTransaction = 0; } else { - const char *statement; - Py_ssize_t size; + size_t size, bsize, csize; + char *begin_statement; + const char * const *candidate; + PyObject *uppercase_level; + _Py_IDENTIFIER(upper); - Py_INCREF(isolation_level); - self->isolation_level = isolation_level; - - if (!begin_word) { - begin_word = PyUnicode_FromString("BEGIN "); - if (!begin_word) return -1; - } - begin_statement = PyUnicode_Concat(begin_word, isolation_level); - if (!begin_statement) { + if (!PyUnicode_Check(isolation_level)) { + PyErr_Format(PyExc_TypeError, + "isolation_level must be a string or None, not %.100s", + Py_TYPE(isolation_level)->tp_name); return -1; } - statement = _PyUnicode_AsStringAndSize(begin_statement, &size); - if (!statement) { - Py_DECREF(begin_statement); + uppercase_level = \ + _PyObject_CallMethodIdObjArgs((PyObject *)&PyUnicode_Type, + &PyId_upper, isolation_level, NULL); + if (!uppercase_level) { return -1; } - self->begin_statement = PyMem_Malloc(size + 2); - if (!self->begin_statement) { - Py_DECREF(begin_statement); + for (candidate = isolation_levels; *candidate; candidate++) { + if (!PyUnicode_CompareWithASCIIString(uppercase_level, *candidate)) + break; + } + Py_DECREF(uppercase_level); + if (!*candidate) { + PyErr_SetString(PyExc_ValueError, + "invalid value for isolation_level"); return -1; } - strcpy(self->begin_statement, statement); - Py_DECREF(begin_statement); + bsize = strlen(begin_word); + csize = strlen(*candidate); + size = bsize + csize; + begin_statement = PyMem_Malloc(size+1); + if (!begin_statement) { + PyErr_NoMemory(); + return -1; + } + + memcpy(begin_statement, begin_word, bsize); + memcpy(begin_statement+bsize, *candidate, csize+1); + + if (self->begin_statement) { + PyMem_Free(self->begin_statement); + } + self->begin_statement = begin_statement; } + Py_INCREF(isolation_level); + Py_XSETREF(self->isolation_level, isolation_level); return 0; }