"""Stress script: threads hammer one shared-cache in-memory SQLite database.

A table of ROW_COUNT rows is created and filled once. Each worker thread then
loops forever; on every iteration it opens its own connection and randomly runs
either a batch of point lookups or a rewrite of every row, each inside an
explicit transaction.
"""
import random
import sqlite3
import threading

DBPATH = "file:db?mode=memory&cache=shared"

ROW_COUNT = 10000               # rows in the test table
READS_PER_TRANSACTION = 1000    # point lookups per read transaction
THREAD_COUNT = 10               # worker threads started at the bottom
ALIVE_REPORT_EVERY = 10         # print a heartbeat every N iterations

# keep one connection open to keep database alive
conn = sqlite3.connect(DBPATH, uri=True)
conn.executescript("""\
create table if not exists test (key int primary key, value int not null);
""")
conn.executemany(
    "insert into test (key, value) values (?, ?)",
    ((key, random.randint(0, 100000000)) for key in range(ROW_COUNT)),
)
conn.commit()

# We have to use an explicit lock because in shared-cache mode sqlite does not
# block for us and errors out instantly with a "database is locked" error
# (https://sqlite.org/sharedcache.html#table_level_locking).
lock = threading.Lock()


class ReproThread(threading.Thread):
    """Worker that repeatedly runs a random read or write transaction."""

    def run(self):
        """Loop forever, using a fresh connection for every iteration."""
        counter = 0
        while True:
            if counter % ALIVE_REPORT_EVERY == 0:
                print("%r is still alive" % (self.name,))

            db = sqlite3.connect(DBPATH, uri=True)
            try:
                # NOTE(review): reg_func is not defined in the visible source;
                # it must be supplied elsewhere or this raises NameError.
                reg_func(db)

                # explicitly start transaction
                db.isolation_level = None

                if random.randint(0, 1):
                    self._read_burst(db)
                else:
                    self._rewrite_all(db)
            finally:
                # Close even when the iteration fails so the connection
                # is not leaked.
                db.close()

            counter += 1

    def _read_burst(self, db):
        """Do READS_PER_TRANSACTION random lookups in one deferred transaction."""
        # threading.Lock has no shared/reader mode (there is no
        # ``shared_context`` method), so readers take the same exclusive
        # lock as writers.
        with lock:
            db.execute("begin deferred")
            try:
                cursor = db.cursor()
                try:
                    for _ in range(READS_PER_TRANSACTION):
                        cursor.execute(
                            "select value from test where key = ?",
                            (random.randint(0, ROW_COUNT - 1),),
                        )
                        cursor.fetchone()
                finally:
                    cursor.close()
            finally:
                db.commit()

    def _rewrite_all(self, db):
        """Give every row a new random value in one immediate transaction."""
        with lock:
            db.execute("begin immediate")
            try:
                db.executemany(
                    "update test set value = ? where key = ?",
                    [(random.randint(0, 10000000), key)
                     for key in range(ROW_COUNT)],
                )
            finally:
                db.commit()


threads = []
for _ in range(THREAD_COUNT):
    worker = ReproThread()
    threads.append(worker)
    worker.start()