"""A dict-like / dbm-like mapping of str keys to str values stored in SQLite."""

import os
import sqlite3
import sys

MTN = 'DICTIONARY_TABLE'  # MTN -> Magic Table Name
sentinel = object()


if sys.version_info[0] == 2:
    def _to_blob(text):
        """Wrap a (byte) str so that sqlite3 stores it as a BLOB."""
        return buffer(text)  # noqa: F821 -- Python 2 builtin

    def _from_blob(blob):
        """Convert a BLOB read back from sqlite3 into a str."""
        return str(blob)
else:
    def _to_blob(text):
        """Encode a str as UTF-8 bytes so that sqlite3 stores it as a BLOB."""
        return text.encode('utf-8')

    def _from_blob(blob):
        """Convert a BLOB read back from sqlite3 into a str."""
        if isinstance(blob, str):
            # Row was stored as TEXT by some other writer; pass it through.
            return blob
        return bytes(blob).decode('utf-8')


def open(filename, flag='w', mtn=MTN):
    """Open *filename* as a SqLiteDictionary.

    filename -- path of the SQLite database, or ':memory:'
    flag     -- 'r' for read-only, 'w' for read-write
    mtn      -- name of the table that holds the dictionary rows
    """
    return SqLiteDictionary(filename, flag, mtn)


def _explain(db, q, args):
    """Return the output of ``EXPLAIN q`` formatted as a text table.

    The first line is the column header, the second a dashed separator and
    every following line is one opcode row; columns are right-aligned.
    """
    header = ['addr', 'opcode', 'p1', 'p2', 'p3', 'p4', 'p5', 'comment']
    rows = [['' if cell is None else str(cell) for cell in record]
            for record in db.execute("EXPLAIN " + q, args)]
    rows.insert(0, header)
    # Pad every row (header included) to the widest one so that zip() below
    # does not drop columns and the format string always matches.
    width = max(len(row) for row in rows)
    for row in rows:
        row.extend([''] * (width - len(row)))
    cw = [max(map(len, column)) for column in zip(*rows)]
    rows.insert(1, ['-' * w for w in cw])
    fstring = ' '.join('%%%is' % w for w in cw)
    return '\n'.join(fstring % tuple(row) for row in rows)


class SqLiteDictionary(object):
    """Mapping of str keys to str values backed by one SQLite table.

    flag    meaning
    ----    -------
    r       read-only
    w       read-write

    Besides the usual dictionary methods the class offers a dbm-like cursor
    interface (set_location / first / next / last / previous) that walks the
    keys in sorted order.

    NOTE: the table name is interpolated into the SQL text unquoted, so it
    must be a trusted, valid SQL identifier.
    """

    __slots__ = '_flag', '_db', '_cursor', '_mtn'

    def __init__(self, filename, flag, mtn):
        """Open the database and make sure the dictionary table is usable.

        Raises ValueError for an unknown flag or when the table is missing
        in read-only mode, OSError when a read-only file does not exist and
        IOError when a read-only in-memory database is requested.
        """
        # check flag
        if flag not in ('r', 'w'):
            raise ValueError(
                "flag argument must be 'r' or 'w'; not %r" % (flag,))
        self._flag = flag
        self._mtn = mtn

        # check whether the file exists
        if filename != ':memory:':
            if self._flag == 'r':
                os.stat(filename)
        elif self._flag == 'r':
            # in-memory database that is to be read-only?
            raise IOError("File not found")

        # open the db and check for the table
        self._db = sqlite3.connect(filename)
        self._cursor = None
        found = self._db.execute(
            "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?",
            (self._mtn,)).fetchone()
        if found is None:
            if self._flag == 'w':
                self._create_table()
            else:
                # Do not leak the connection when construction fails.
                self._db.close()
                raise ValueError(
                    "Dictionary table %s does not exist within sqlite database" %
                    self._mtn)

    #------------------------------- internal bits -------------------------------
    def _create_table(self):
        """Create the key/value table."""
        self._db.execute('''
            CREATE TABLE %s (
                ROWKEY BLOB PRIMARY KEY,
                ROWVAL BLOB);''' % self._mtn)

    @staticmethod
    def _check_key(key):
        """Validate *key* and return it in the form stored in the database."""
        # Exact type check: values read back are always plain str.
        if type(key) is not str:
            raise ValueError(
                "Can only use str instances as keys, not %r" % (type(key),))
        return _to_blob(key)

    @staticmethod
    def _check_value(value):
        """Validate *value* and return it in the form stored in the database."""
        if type(value) is not str:
            raise ValueError(
                "Can only use str instances as values, not %r" % (type(value),))
        return _to_blob(value)

    @staticmethod
    def _ke(key):
        """Build the KeyError raised for a missing *key*."""
        return KeyError("Key %r not found" % (key,))

    @classmethod
    def _ro(cls):
        """Build the TypeError raised when a read-only instance is modified."""
        return TypeError(
            "Read-only instance of %s does not support item assignment" %
            (cls.__name__,))

    #----------------------- standard sequence operations ------------------------
    def __len__(self):
        """Return the number of stored keys."""
        row = self._db.execute(
            "SELECT count(1) FROM %s" % self._mtn).fetchone()
        return row[0]

    def __iter__(self):
        """Yield every key in sorted order."""
        QUERY = "SELECT ROWKEY FROM %s ORDER BY ROWKEY" % self._mtn
        for rowkey, in self._db.execute(QUERY):
            yield _from_blob(rowkey)

    def __contains__(self, key):
        """Return True when *key* is stored; ValueError for a non-str key."""
        # The key is validated and converted by __getitem__; converting it
        # here as well would hand __getitem__ an already-converted key.
        try:
            self[key]
        except KeyError:
            return False
        return True

    def __getitem__(self, key):
        """Return the value stored under *key* or raise KeyError."""
        blob = self._check_key(key)
        QUERY = "SELECT ROWVAL FROM %s WHERE ROWKEY = ?" % self._mtn
        row = self._db.execute(QUERY, (blob,)).fetchone()
        if row is None:
            raise self._ke(key)
        return _from_blob(row[0])

    def __setitem__(self, key, value):
        """Store *value* under *key*, replacing any previous value."""
        if self._flag == 'r':
            raise self._ro()
        key = self._check_key(key)
        value = self._check_value(value)
        QUERY = "REPLACE INTO %s (ROWKEY, ROWVAL) VALUES (?, ?)" % self._mtn
        self._db.execute(QUERY, (key, value))

    def __delitem__(self, key):
        """Remove *key* or raise KeyError when it is not stored."""
        if self._flag == 'r':
            raise self._ro()
        blob = self._check_key(key)
        QUERY = "DELETE FROM %s WHERE ROWKEY = ?" % self._mtn
        # rowcount tells how many rows the DELETE actually removed.
        if self._db.execute(QUERY, (blob,)).rowcount < 1:
            raise self._ke(key)

    #---------------------------- dictionary interface ---------------------------
    has_key = __contains__

    def get(self, key, default=None):
        """Return the value for *key*, or *default* when it is missing."""
        try:
            return self[key]
        except KeyError:
            return default

    def pop(self, key, default=sentinel):
        """Remove *key* and return its value.

        When the key is missing return *default* if one was given, otherwise
        re-raise the KeyError.
        """
        try:
            default = self[key]
            del self[key]
        except KeyError:
            if default is sentinel:
                raise
        return default

    # keys
    iterkeys = __iter__

    def keys(self):
        """Return a list of all keys in sorted order."""
        return list(self)

    # values
    def itervalues(self):
        """Yield every value, ordered by its key."""
        for _, value in self.iteritems():
            yield value

    def values(self):
        """Return a list of all values, ordered by their keys."""
        return list(self.itervalues())

    # items
    def iteritems(self):
        """Yield every (key, value) pair in key order."""
        QUERY = "SELECT ROWKEY, ROWVAL FROM %s ORDER BY ROWKEY" % self._mtn
        for rowkey, rowval in self._db.execute(QUERY):
            yield _from_blob(rowkey), _from_blob(rowval)

    def items(self):
        """Return a list of all (key, value) pairs in key order."""
        return list(self.iteritems())

    # update
    def update(self, other):
        """Store every pair of *other* (a mapping or an iterable of pairs)."""
        if hasattr(other, 'iteritems'):
            other = other.iteritems()
        elif hasattr(other, 'items'):
            other = other.items()
        for key, value in other:
            self[key] = value

    # clear
    def clear(self):
        """Remove every entry; TypeError on a read-only instance."""
        if self._flag == 'r':
            raise self._ro()
        self._db.execute("DELETE FROM %s" % self._mtn)

    def _slowclear(self):
        """Remove every entry using a DELETE that carries a WHERE clause."""
        if self._flag == 'r':
            raise self._ro()
        self._db.execute("DELETE FROM %s WHERE 1" % self._mtn)

    #---------------------------- dbm-like interface -----------------------------
    def sync(self):
        """Commit pending changes to the database."""
        self._db.commit()

    def close(self):
        """Commit pending changes, forget the cursor and close the database."""
        self.sync()
        self._cursor = None
        self._db.close()

    def _sc(self, c, x):
        """Remember row *c* as the cursor position and return it as str pair.

        Raises KeyError("<x> key not found") when *c* is None.
        """
        if c is None:
            raise KeyError("%s key not found" % x)
        c = tuple(_from_blob(i) for i in c)
        self._cursor = c
        return c

    def _step(self, key, cmp, dire):
        """Move the cursor to the nearest row whose key satisfies
        ``ROWKEY <cmp> key`` and return that (key, value) pair.

        *cmp* is an SQL comparison operator; operators containing '<' search
        downwards.  *dire* only labels the KeyError raised when no row
        matches.
        """
        key = self._check_key(key)
        order = 'DESC' if '<' in cmp else ''
        QUERY = ("SELECT ROWKEY, ROWVAL FROM %s WHERE ROWKEY %s ? ORDER BY "
                 "ROWKEY %s LIMIT 1" % (self._mtn, cmp, order))
        return self._sc(self._db.execute(QUERY, (key,)).fetchone(), dire)

    def set_location(self, key):
        """Position the cursor on the first entry whose key is >= *key*."""
        return self._step(key, '>=', 'Usable')

    def first(self):
        """Position the cursor on the smallest key and return its item."""
        return self._step('', '>=', 'First')

    def next(self):
        """Advance the cursor; start at the first entry when it is unset."""
        if self._cursor is None:
            return self.first()
        return self._step(self._cursor[0], '>', 'Next')

    def last(self):
        """Position the cursor on the largest key and return its item."""
        # Fetch the row directly: the stored key is not a str and therefore
        # cannot be passed through _step()/_check_key().
        QUERY = ("SELECT ROWKEY, ROWVAL FROM %s ORDER BY ROWKEY DESC LIMIT 1"
                 % self._mtn)
        return self._sc(self._db.execute(QUERY).fetchone(), 'Last')

    def previous(self):
        """Step the cursor back; start at the last entry when it is unset."""
        if self._cursor is None:
            return self.last()
        return self._step(self._cursor[0], '<', 'Previous')