diff -r 8ec5a00e5d75 Lib/sqlite3/test/regression.py --- a/Lib/sqlite3/test/regression.py Sat Aug 27 21:26:35 2016 +0300 +++ b/Lib/sqlite3/test/regression.py Sun Aug 28 18:27:06 2016 +0800 @@ -147,10 +147,29 @@ def CheckSetIsolationLevel(self): """ - See issue 3312. + See issue 27881. """ + class CustomStr(str): + def __del__(self): + con.isolation_level = "" + levels = ["", "DEFERRED", "IMMEDIATE", "EXCLUSIVE"] + con = sqlite.connect(":memory:") - setattr(con, "isolation_level", "\xe9") + setattr(con, "isolation_level", None) + for level in levels: + setattr(con, "isolation_level", level) + setattr(con, "isolation_level", level.lower()) + setattr(con, "isolation_level", level.capitalize()) + setattr(con, "isolation_level", CustomStr(level)) + # setting isolation_level failure should not alter previous state + setattr(con, "isolation_level", None) + setattr(con, "isolation_level", "DEFERRED") + self.assertRaises(TypeError, setattr, con, "isolation_level", 1) + self.assertEqual(con.isolation_level, "DEFERRED") + self.assertRaises(TypeError, setattr, con, "isolation_level", b'') + self.assertEqual(con.isolation_level, "DEFERRED") + self.assertRaises(ValueError, setattr, con, "isolation_level", "abc") + self.assertEqual(con.isolation_level, "DEFERRED") def CheckCursorConstructorCallCheck(self): """ diff -r 8ec5a00e5d75 Modules/_sqlite/connection.c --- a/Modules/_sqlite/connection.c Sat Aug 27 21:26:35 2016 +0300 +++ b/Modules/_sqlite/connection.c Sun Aug 28 18:27:06 2016 +0800 @@ -43,6 +43,8 @@ _Py_IDENTIFIER(cursor); +static const char *isolation_levels[] = {"", "DEFERRED", "IMMEDIATE", "EXCLUSIVE", NULL}; + static int pysqlite_connection_set_isolation_level(pysqlite_Connection* self, PyObject* isolation_level); static void _pysqlite_drop_unused_cursor_references(pysqlite_Connection* self); @@ -1166,59 +1168,71 @@ static int pysqlite_connection_set_isolation_level(pysqlite_Connection* self, PyObject* isolation_level) { - PyObject* res; - PyObject* begin_statement; - static PyObject* begin_word; - - Py_XDECREF(self->isolation_level); - - if (self->begin_statement) { - PyMem_Free(self->begin_statement); - self->begin_statement = NULL; - } + static const char *begin_word = "BEGIN "; if (isolation_level == Py_None) { - Py_INCREF(Py_None); - self->isolation_level = Py_None; - - res = pysqlite_connection_commit(self, NULL); + PyObject *res = pysqlite_connection_commit(self, NULL); if (!res) { return -1; } Py_DECREF(res); + if (self->begin_statement) { + PyMem_Free(self->begin_statement); + self->begin_statement = NULL; + } self->inTransaction = 0; } else { - const char *statement; - Py_ssize_t size; + size_t size, bsize, csize; + char *begin_statement; + const char **candidate; + PyObject *uppercase_level; + _Py_IDENTIFIER(upper); - Py_INCREF(isolation_level); - self->isolation_level = isolation_level; - - if (!begin_word) { - begin_word = PyUnicode_FromString("BEGIN "); - if (!begin_word) return -1; - } - begin_statement = PyUnicode_Concat(begin_word, isolation_level); - if (!begin_statement) { + if (!PyUnicode_Check(isolation_level)) { + PyErr_Format(PyExc_TypeError, + "isolation_level must be a string or None, not %.100s", + Py_TYPE(isolation_level)->tp_name); return -1; } - statement = _PyUnicode_AsStringAndSize(begin_statement, &size); - if (!statement) { - Py_DECREF(begin_statement); + uppercase_level = _PyObject_CallMethodId(isolation_level, + &PyId_upper, ""); + if (!uppercase_level) { return -1; } - self->begin_statement = PyMem_Malloc(size + 2); - if (!self->begin_statement) { - Py_DECREF(begin_statement); + for (candidate = isolation_levels; *candidate; candidate++) { + if (!PyUnicode_CompareWithASCIIString(uppercase_level, *candidate)) + break; + } + Py_DECREF(uppercase_level); + if (!*candidate) { + PyErr_SetString(PyExc_ValueError, + "invalid value for isolation_level"); return -1; } - strcpy(self->begin_statement, statement); - Py_DECREF(begin_statement); + bsize = strlen(begin_word); + csize = strlen(*candidate); + size = bsize + csize; + begin_statement = PyMem_Malloc(size+1); + if (!begin_statement) { + PyErr_NoMemory(); + return -1; + } + + strncpy(begin_statement, begin_word, bsize); + strncpy(begin_statement+bsize, *candidate, csize); + begin_statement[size] = '\0'; + + if (self->begin_statement) { + PyMem_Free(self->begin_statement); + } + self->begin_statement = begin_statement; } + Py_INCREF(isolation_level); + Py_XSETREF(self->isolation_level, isolation_level); return 0; }