SqlAlchemyConnection provides a connection to DataBase using SQLAlchemy. it provides a sqlalchemy.orm.Session on every call to the get method.
Upon exit of the "with" statement, the session commit, ` flush, and close` will be called
Example
your_core_lib.yaml
# @package _global_
core_lib:
...
data:
sqlalchemy:
log_queries: false
create_db: true
session:
pool_recycle: 3600
pool_pre_ping: false
url:
protocol: postgresql
username: ${oc.env:POSTGRES_USER}
password: ${oc.env:POSTGRES_PASSWORD}
host: ${oc.env:POSTGRES_HOST}
port: ${oc.env:POSTGRES_PORT}
file: ${oc.env:POSTGRES_DB}
...
your_core_lib.py
from core_lib.core_lib import CoreLib
from core_lib.connection.sql_alchemy_connection_factory import SqlAlchemyConnectionFactory
from core_lib.helpers.config_instances import instantiate_config
class YourCoreLib(CoreLib):
def __init__(self, config: DictConfig):
CoreLib.__init__(self)
db_connection = SqlAlchemyConnectionFactory(self.config.core_lib.data.sqlalchemy)
# OR
db_connection = instantiate_config(self.config.core_lib.data.sqlalchemy, SqlAlchemyConnectionFactory)
self.user = UserDataAccess(db_connection)
...
user_data_access.py
from core_lib.data_layers.data_access.data_access import DataAccess
from core_lib.connection.sql_alchemy_connection_factory import SqlAlchemyConnectionFactory
from core_lib.error_handling.not_found_decorator import NotFoundErrorHandler
class UserDataAccess(DataAccess):
def __init__(self, db: SqlAlchemyConnectionFactory):
self._db = db
@NotFoundErrorHandler()
def get(self, user_id: int) -> User
with self._db.get() as session:
return session.query(User).get(user_id)
def update(self, user_id: int, data: dict):
with self._db.get() as session:
session.query(User).filter(User.id == user_id).update(data)
SqlAlchemyConnectionFactory
core_lib.connection.sql_alchemy_connection_factory.SqlAlchemyConnectionFactory [[source]
SqlAlchemyConnectionFactory provides a fresh SqlAlchemyConnection connection using the get() method.
Functions
__init__()
core_lib.connection.sql_alchemy_connection_factory.SqlAlchemyConnectionFactory.__init__() [source]
Create sqlalchemy.engine.Engine and sqlalchemy.engine.Connection according to the config parameter.
def __init__(self, config: DictConfig):
Arguments
config(DictConfig): DataBase connection config from theCore-Lib'sconfig file.
get()
core_lib.connection.sql_alchemy_connection_factory.SqlAlchemyConnectionFactory.get() [source]
Returns a fresh SqlAlchemyConnection instance.
def get(self, *args, **kwargs) -> SqlAlchemyConnection:
Returns
SqlAlchemyConnection
engine()
core_lib.connection.sql_alchemy_connection_factory.SqlAlchemyConnectionFactory.engine() [source]
Returns the sqlalchemy.engine.Engine.
def engine(self) -> engine:
Returns
sqlalchemy.engine.Engine
connection()
core_lib.connection.sql_alchemy_connection_factory.SqlAlchemyConnectionFactory.connection() [source]
Returns the sqlalchemy.engine.Connection connection created in the init method.
def connection(self):
Returns
sqlalchemy.engine.Connection
SqlAlchemyConnection
core_lib.connection.sql_alchemy_connection.SqlAlchemyConnection [source]
Provide a fresh ` sqlalchemy.orm.Session` and releases its resources upon complete.
class SqlAlchemyConnection(Connection):
def __init__(self, engine, on_exit):
Arguments
engine:sqlalchemy.engine.Engine
close()
core_lib.connection.sql_alchemy_connection.SqlAlchemyConnection.close() [source]
Responsible for committing, flushing and closing the DB session automatically after the queries are completed.
def close(self):