Skip to content

Instantly share code, notes, and snippets.

@bsnacks000
Created January 3, 2019 17:36
Show Gist options
  • Select an option

  • Save bsnacks000/630768594c831f77084ce0521c2a7fe8 to your computer and use it in GitHub Desktop.

Select an option

Save bsnacks000/630768594c831f77084ce0521c2a7fe8 to your computer and use it in GitHub Desktop.
Sqlalchemy DAL
import logging
from contextlib import contextmanager

from sqlalchemy import create_engine
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import sessionmaker, scoped_session
# Module-level logger, named after this module via __name__; used by DataAccessLayer below.
l = logging.getLogger(__name__)
class DataAccessLayer(object):
    """Base class for database API objects.

    Holds a reference to the engine and the session factory, and wraps
    sessions in a transactional context manager so that application code
    has one uniform way of talking to the database through the ORM.

    This class could be thought of as a singleton factory.
    Applications should only ever use one instance per database.

    Example:
        dal = DataAccessLayer(Base)
        dal.connect(engine_url=..., engine_echo=False)
        with dal.transaction() as session:
            session.add(obj)
    """

    def __init__(self, BaseModel):
        """Store the declarative base; no connection is made yet.

        Arguments:
            BaseModel -- object exposing ``.metadata`` (used by ``reset_db``
                and ``create_all``); presumably the declarative base class
                of the application's models.
        """
        self.engine = None   # set by connect()
        self.Session = None  # set by connect()
        self.Base = BaseModel

    def connect(self, engine_url, engine_echo=True, with_create=True):
        """Build the engine and the Session factory for the app layer.

        Arguments:
            engine_url -- database URL handed to ``create_engine``
        Keyword Arguments:
            engine_echo {bool} -- forwarded as ``echo`` to ``create_engine``
                (default: {True})
            with_create {bool} -- accepted for backward compatibility but
                currently has no effect; call ``create_all()`` explicitly
                to build the schema (default: {True})
        """
        self.engine = create_engine(engine_url, echo=engine_echo)
        # NOTE extra config can be implemented in this call to sessionmaker factory
        self.Session = scoped_session(sessionmaker(bind=self.engine))

    @contextmanager
    def transaction(self):
        """Safely wrap a session object in a transactional scope.

        Used for basic create, select, update and delete procedures.
        The session is committed when the ``with`` body finishes normally.
        On any ``Exception`` the error is logged, the session is rolled
        back and the exception is re-raised. The session is closed in
        every case.

        Yields:
            the session obtained from ``self.Session()``
        """
        session = self.Session()
        try:
            yield session
            # commit lives inside the try so that errors raised at flush
            # time (e.g. integrity errors) also trigger the rollback below
            session.commit()
        except Exception as e:
            l.error(e)
            session.rollback()
            l.warning('session rolled back')
            raise
        finally:
            session.close()
            l.info('session closed')

    def bulk_insert(self, records):
        """Perform a bulk insert of records inside a single transaction.

        Arguments:
            records {list} -- objects handed unchanged to
                ``session.bulk_save_objects``
        Raises:
            whatever the transaction raises; it is rolled back first.
        """
        with self.transaction() as session:
            session.bulk_save_objects(records)

    def safe_append(self, records, keep_errors=True):
        """Perform a 'safe append': add each record in its own transaction.

        Integrity errors are caught per record, the failed transaction is
        rolled back, and processing continues with the next record. Any
        other exception propagates to the caller.

        Arguments:
            records {list} -- objects handed one at a time to ``session.add``
        Keyword Arguments:
            keep_errors {bool} -- accepted for backward compatibility but
                currently has no effect; rejected records are always
                returned (default: {True})
        Returns:
            error_records, error_messages -- a tuple of lists holding the
            rejected records and the matching error text, in input order
        """
        error_messages = []
        error_records = []
        for rec in records:
            try:
                # one transaction per record so a single failure does not
                # discard the records that were inserted before it
                with self.transaction() as session:
                    l.info('inserting record: %s', rec)
                    session.add(rec)
            except IntegrityError as err:
                l.error('An integrity error was reported for facility')
                # str(err) instead of err.message: exceptions have no
                # ``message`` attribute on Python 3
                error_messages.append(str(err))
                error_records.append(rec)
        return error_records, error_messages

    def get_session(self):
        """Return a session to the caller.

        The caller is responsible for committing, rolling back and closing
        it; prefer ``transaction()`` when that bookkeeping is not needed.
        """
        return self.Session()

    def reset_db(self):
        """Destroy the database: drop every table in ``self.Base.metadata``."""
        self.Base.metadata.drop_all(self.engine)

    def create_all(self):
        """Create the database: create every table in ``self.Base.metadata``."""
        self.Base.metadata.create_all(self.engine)

    def drop_table(self, Model):
        """Attempt to drop the specified table from the db.

        Arguments:
            Model -- object exposing ``__table__``; that table is dropped
                using this instance's engine
        """
        Model.__table__.drop(self.engine)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment