SqlAlchemyConnection
provides a connection to the database
using SQLAlchemy. It provides a sqlalchemy.orm.Session
on every call to the get
method.
Upon exit of the "with"
statement, the session's `commit`,
`flush`, and
`close` will be called.
Example
your_core_lib.yaml
# @package _global_
core_lib:
...
data:
sqlalchemy:
log_queries: false
create_db: true
session:
pool_recycle: 3600
pool_pre_ping: false
url:
protocol: postgresql
username: ${oc.env:POSTGRES_USER}
password: ${oc.env:POSTGRES_PASSWORD}
host: ${oc.env:POSTGRES_HOST}
port: ${oc.env:POSTGRES_PORT}
file: ${oc.env:POSTGRES_DB}
...
your_core_lib.py
from core_lib.core_lib import CoreLib
from core_lib.connection.sql_alchemy_connection_registry import SqlAlchemyConnectionRegistry
from core_lib.helpers.config_instances import instantiate_config
class YourCoreLib(CoreLib):
def __init__(self, config: DictConfig):
CoreLib.__init__(self)
db_connection = SqlAlchemyConnectionRegistry(self.config.core_lib.data.sqlalchemy)
# OR
db_connection = instantiate_config(self.config.core_lib.data.sqlalchemy, SqlAlchemyConnectionRegistry)
self.user = UserDataAccess(db_connection)
...
user_data_access.py
from core_lib.data_layers.data_access.data_access import DataAccess
from core_lib.connection.sql_alchemy_connection_registry import SqlAlchemyConnectionRegistry
from core_lib.error_handling.not_found_decorator import NotFoundErrorHandler
class UserDataAccess(DataAccess):
def __init__(self, db: SqlAlchemyConnectionRegistry):
self._db = db
@NotFoundErrorHandler()
def get(self, user_id: int) -> User:
with self._db.get() as session:
return session.query(User).get(user_id)
def update(self, user_id: int, data: dict):
with self._db.get() as session:
session.query(User).filter(User.id == user_id).update(data)
SqlAlchemyConnectionRegistry
core_lib.connection.sql_alchemy_connection_registry.SqlAlchemyConnectionRegistry [source]
SqlAlchemyConnectionRegistry
provides a fresh SqlAlchemyConnection
connection using the get()
method.
Functions
__init__()
core_lib.connection.sql_alchemy_connection_registry.SqlAlchemyConnectionRegistry.__init__() [source]
Create sqlalchemy.engine.Engine
and sqlalchemy.engine.Connection
according to the config
parameter.
def __init__(self, config: DictConfig):
Arguments
config
(DictConfig)
: Database connection config from the Core-Lib's
config file.
get()
core_lib.connection.sql_alchemy_connection_registry.SqlAlchemyConnectionRegistry.get() [source]
Returns a fresh SqlAlchemyConnection
instance.
def get(self, *args, **kwargs) -> SqlAlchemyConnection:
Returns
SqlAlchemyConnection
engine()
core_lib.connection.sql_alchemy_connection_registry.SqlAlchemyConnectionRegistry.engine() [source]
Returns the sqlalchemy.engine.Engine
.
def engine(self) -> engine:
Returns
sqlalchemy.engine.Engine
connection()
core_lib.connection.sql_alchemy_connection_registry.SqlAlchemyConnectionRegistry.connection() [source]
Returns the sqlalchemy.engine.Connection
connection created in the init
method.
def connection(self):
Returns
sqlalchemy.engine.Connection
SqlAlchemyConnection
core_lib.connection.sql_alchemy_connection.SqlAlchemyConnection [source]
Provides a fresh `sqlalchemy.orm.Session` and releases its resources upon completion.
class SqlAlchemyConnection(Connection):
def __init__(self, engine, on_exit):
Arguments
engine
:sqlalchemy.engine.Engine
close()
core_lib.connection.sql_alchemy_connection.SqlAlchemyConnection.close() [source]
Responsible for committing,
flushing,
and closing
the DB session automatically after the queries are completed.
def close(self):