import atexit
import contextlib
import os
import sqlite3
import tempfile


class FakeError:
    """Placeholder class; nothing in the visible part of this module uses it."""


def table_exists(conn, table):
    """Return True if ``PRAGMA TABLE_INFO`` reports any column for *table*.

    The name is interpolated straight into the SQL text (identifiers cannot
    be bound as parameters), so only pass trusted, literal names.
    """
    sql = 'PRAGMA TABLE_INFO(%s) ' % table
    # fetchall() drains the statement so no half-read cursor stays active.
    return bool(conn.execute(sql).fetchall())


def row_count(conn, table):
    """Return the number of rows currently visible in *table* through *conn*."""
    return conn.execute('SELECT COUNT(*) FROM %s' % table).fetchone()[0]


def index_exists(conn, table):
    """Return True if ``PRAGMA INDEX_INFO`` reports any column for the index.

    Despite its name, the *table* parameter is the name of the index to
    look up; the name is kept for backward compatibility with callers.
    """
    sql = 'PRAGMA INDEX_INFO(%s) ' % table
    return bool(conn.execute(sql).fetchall())


def column_exists(conn, table, column):
    """Return True if *table* has a column named *column*."""
    sql = 'PRAGMA TABLE_INFO(%s) ' % table
    # Field 1 of each TABLE_INFO row is compared against the column name.
    return any(row[1] == column for row in conn.execute(sql).fetchall())


def create_temp_database():
    """Create a throwaway on-disk database containing an empty table ``foo``.

    The backing file is removed at interpreter exit. The caller owns the
    returned connection and is responsible for closing it.

    Returns:
        An open ``sqlite3`` connection with ``foo (id INTEGER)`` committed.
    """
    # mkstemp creates the file atomically; os.tempnam was race-prone and no
    # longer exists on Python 3.
    fd, filename = tempfile.mkstemp(prefix='test', suffix='.db')
    os.close(fd)  # sqlite3 opens the path itself; the raw descriptor is unused
    atexit.register(os.remove, filename)

    conn = sqlite3.connect(filename)
    conn.execute('CREATE TABLE foo (id INTEGER)')
    conn.commit()
    assert table_exists(conn, 'foo')
    return conn


def test_dml():
    """Check basic dml is transactional."""
    with contextlib.closing(create_temp_database()) as conn:
        conn.execute('INSERT INTO foo VALUES (1)')
        conn.rollback()
        assert row_count(conn, 'foo') == 0


def test_table_create():
    """Check that table create is transactional."""
    with contextlib.closing(create_temp_database()) as conn:
        conn.execute('INSERT INTO foo VALUES (1)')
        conn.execute('CREATE TABLE bar (id INTEGER)')
        assert table_exists(conn, 'bar')
        conn.execute('INSERT INTO bar VALUES (1)')
        conn.rollback()
        assert not table_exists(conn, 'bar')


def test_index_create():
    """Check that index create is transactional."""
    with contextlib.closing(create_temp_database()) as conn:
        conn.execute('INSERT INTO foo VALUES (1)')
        conn.execute('CREATE INDEX foo_by_id ON foo (id)')
        assert index_exists(conn, 'foo_by_id')
        conn.execute('INSERT INTO foo VALUES (2)')
        conn.rollback()
        assert not index_exists(conn, 'foo_by_id')


def test_column_add():
    """Check that column add is transactional."""
    with contextlib.closing(create_temp_database()) as conn:
        conn.execute('INSERT INTO foo VALUES (1)')
        conn.execute('ALTER TABLE foo ADD COLUMN stuff INTEGER')
        assert column_exists(conn, 'foo', 'stuff')
        conn.execute('INSERT INTO foo VALUES (2, 3)')
        conn.rollback()
        assert not column_exists(conn, 'foo', 'stuff')


def test_rename():
    """Check that table rename is transactional."""
    with contextlib.closing(create_temp_database()) as conn:
        conn.execute('INSERT INTO foo VALUES (1)')
        conn.execute('ALTER TABLE foo RENAME TO bar')
        assert not table_exists(conn, 'foo')
        assert table_exists(conn, 'bar')
        conn.rollback()
        assert table_exists(conn, 'foo')
        assert not table_exists(conn, 'bar')


def test_drop_table():
    """Check that table drop is transactional."""
    with contextlib.closing(create_temp_database()) as conn:
        conn.execute('INSERT INTO foo VALUES (1)')
        conn.execute('DROP TABLE foo')
        assert not table_exists(conn, 'foo')
        conn.rollback()
        assert table_exists(conn, 'foo')


def test_drop_index():
    """Check that index drop is transactional."""
    with contextlib.closing(create_temp_database()) as conn:
        # Set-up: make the index part of the committed baseline.
        conn.execute('CREATE INDEX foo_by_id ON foo (id)')
        conn.commit()
        assert index_exists(conn, 'foo_by_id')

        # As in the sibling tests, a DML statement comes first so that the
        # sqlite3 module has opened a transaction before the DDL runs; with
        # no transaction open, the DROP would be applied immediately and
        # rollback() would have nothing to undo.
        conn.execute('INSERT INTO foo VALUES (1)')
        conn.execute('DROP INDEX foo_by_id')
        assert not index_exists(conn, 'foo_by_id')
        conn.rollback()
        assert index_exists(conn, 'foo_by_id')