#!/bin/env python3 # $ virtualenv3 venv # $ source venv/bin/activate # $ pip install sqlalchemy # Run once to create DB and insert some stuff into it (no crash): # $ ./minimal_crash.py -d sqlite:///crash.db -v -c # Then re-run same thing WITHOUT re-creating the base: # $ ./minimal_crash.py -d sqlite:///crash.db -v import sys import argparse import logging from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, DateTime logger = logging.getLogger(__name__) Base = declarative_base() class Item(Base): __tablename__ = 'table_name' id = Column(String(length=14), primary_key=True) att_1 = Column(String(length=13)) att_2 = Column(Integer) att_3 = Column(Integer) # Dates date_1 = Column(DateTime) date_2 = Column(DateTime) class Processor(object): def __init__(self, session): self.session = session def process(self): message = [{ 'id': '12345678912345', 'att_1': '123456789123456', 'att_2': 2, 'att_3': 4, 'date_1': '2016-01-11T00:30:00', 'date_2': '2016-01-11T00:30:00' }] for document in message: q = document.copy() q.pop('date_1') q.pop('date_2') item = self.session.query(Item).filter_by(**q).one_or_none() if item is None: logger.debug('No item found, new item') item = Item(id=document['id']) self.session.add(item) logger.debug("Documents processed") class MainObject(object): def __init__(self, db): self._db = db self.__Session = None def connect_db(self, create): """ Connect to the DB. """ logger.info("Connecting to DB '%s'", self._db) engine = create_engine(self._db) if create: logging.info("Create tables") Base.metadata.create_all(engine) self.__Session = sessionmaker(bind=engine) def process_file(self): session = self.__Session() try: Processor(session).process() session.commit() except Exception as exc: logger.exception("Error while processing message") session.rollback() raise exc finally: session.close() logger.info("File processed") def main(): parser = argparse.ArgumentParser(description='CLI') parser.add_argument('-v', '--verbose', action='store_true', help='be verbose') parser.add_argument('-d', '--db', help='Database URI', required=True) parser.add_argument('-c', '--create', action='store_true', help='Create tables') args = parser.parse_args() log_level = logging.DEBUG if args.verbose else logging.INFO logging.basicConfig(stream=sys.stderr, level=log_level) if args.verbose: # Enable SQLAlchemy output logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO) logging.debug('Called with: %s', sys.argv) mo = MainObject(db=args.db) mo.connect_db(args.create) mo.process_file() if __name__ == '__main__': main()