SqlAlchemyConnection provides a connection to a database using SQLAlchemy. It provides a sqlalchemy.orm.Session on every call to the get method. When the "with" statement exits, the session's commit, flush, and close methods are called.

Example

your_core_lib.yaml

# @package _global_
core_lib:
  ...
  data:
    sqlalchemy:
      log_queries: false
      create_db: true
      session:
        pool_recycle: 3600
        pool_pre_ping: false
      url:
        protocol: postgresql
        username: ${oc.env:POSTGRES_USER}
        password: ${oc.env:POSTGRES_PASSWORD}
        host: ${oc.env:POSTGRES_HOST}
        port: ${oc.env:POSTGRES_PORT}
        file: ${oc.env:POSTGRES_DB}
  ...
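
The url fields above presumably combine into a single SQLAlchemy database URL. The sketch below shows one way that could work. `sketch_build_url` is a hypothetical helper written for illustration, not Core-Lib's own function, and the field-to-URL mapping is an assumption based on the usual protocol://username:password@host:port/database form.

```python
from urllib.parse import quote_plus


def sketch_build_url(url_cfg: dict) -> str:
    """Hypothetical helper: join the `url` config fields into one URL string."""
    # Escape credentials so characters such as '@' cannot break the URL.
    user = quote_plus(str(url_cfg["username"]))
    password = quote_plus(str(url_cfg["password"]))
    return (
        f"{url_cfg['protocol']}://{user}:{password}"
        f"@{url_cfg['host']}:{url_cfg['port']}/{url_cfg['file']}"
    )


# Illustrative values only; in the YAML above they come from environment variables.
example = {
    "protocol": "postgresql",
    "username": "app",
    "password": "p@ss",
    "host": "localhost",
    "port": 5432,
    "file": "app_db",
}
print(sketch_build_url(example))  # postgresql://app:p%40ss@localhost:5432/app_db
```

Note that the database name is read from the `file` key, matching the config above.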

your_core_lib.py

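from omegaconf import DictConfig

from core_lib.core_lib import CoreLib
from core_lib.connection.sql_alchemy_connection_registry import SqlAlchemyConnectionRegistry
from core_lib.helpers.config_instances import instantiate_config

from user_data_access import UserDataAccess  # adjust the import path to your project layout

class YourCoreLib(CoreLib):
    def __init__(self, config: DictConfig):
        CoreLib.__init__(self)
        self.config = config
        # Build the registry directly from the sqlalchemy config section
        db_connection = SqlAlchemyConnectionRegistry(self.config.core_lib.data.sqlalchemy)
        # OR let instantiate_config build it from the same section
        db_connection = instantiate_config(self.config.core_lib.data.sqlalchemy, SqlAlchemyConnectionRegistry)
        self.user = UserDataAccess(db_connection)
        ...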

user_data_access.py

from core_lib.data_layers.data_access.data_access import DataAccess
from core_lib.connection.sql_alchemy_connection_registry import SqlAlchemyConnectionRegistry
from core_lib.error_handling.not_found_decorator import NotFoundErrorHandler

class UserDataAccess(DataAccess):

    def __init__(self, db: SqlAlchemyConnectionRegistry):
        self._db = db

    @NotFoundErrorHandler()
    def get(self, user_id: int) -> User:
        # User is your own SQLAlchemy entity class
        with self._db.get() as session:
            return session.query(User).get(user_id)

    def update(self, user_id: int, data: dict):
        with self._db.get() as session:
            session.query(User).filter(User.id == user_id).update(data)

SqlAlchemyConnectionRegistry

core_lib.connection.sql_alchemy_connection_registry.SqlAlchemyConnectionRegistry [source]

SqlAlchemyConnectionRegistry returns a fresh SqlAlchemyConnection from its get() method.

Functions

__init__()

core_lib.connection.sql_alchemy_connection_registry.SqlAlchemyConnectionRegistry.__init__() [source]

Create sqlalchemy.engine.Engine and sqlalchemy.engine.Connection according to the config parameter.

def __init__(self, config: DictConfig):

Arguments

  • config (DictConfig): Database connection config section from the Core-Lib config file.

get()

core_lib.connection.sql_alchemy_connection_registry.SqlAlchemyConnectionRegistry.get() [source]

Returns a fresh SqlAlchemyConnection instance.

def get(self, *args, **kwargs) -> SqlAlchemyConnection:

Returns

SqlAlchemyConnection
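
To illustrate what "fresh" means here, the following is a minimal sketch of the registry pattern and not Core-Lib's implementation. `FakeEngine`, `SketchConnection`, and `SketchRegistry` are hypothetical stand-ins. The sketch assumes the registry creates one engine in its constructor and wraps it in a new connection object on each get() call.

```python
class FakeEngine:
    """Stand-in for sqlalchemy.engine.Engine (illustration only)."""


class SketchConnection:
    """Stand-in for SqlAlchemyConnection: wraps the shared engine."""

    def __init__(self, engine: FakeEngine):
        self.engine = engine


class SketchRegistry:
    """Hypothetical registry: one engine, a new connection object per get()."""

    def __init__(self):
        self._engine = FakeEngine()  # created once, in the constructor

    def get(self) -> SketchConnection:
        return SketchConnection(self._engine)  # new wrapper on every call


registry = SketchRegistry()
first, second = registry.get(), registry.get()
print(first is second)                # False: each call returns a new object
print(first.engine is second.engine)  # True: both share the same engine
```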

engine()

core_lib.connection.sql_alchemy_connection_registry.SqlAlchemyConnectionRegistry.engine() [source]

Returns the sqlalchemy.engine.Engine.

def engine(self) -> engine:

Returns

sqlalchemy.engine.Engine

connection()

core_lib.connection.sql_alchemy_connection_registry.SqlAlchemyConnectionRegistry.connection() [source]

Returns the sqlalchemy.engine.Connection created in __init__().

def connection(self):

Returns

sqlalchemy.engine.Connection

SqlAlchemyConnection

core_lib.connection.sql_alchemy_connection.SqlAlchemyConnection [source]

Provides a fresh sqlalchemy.orm.Session and releases its resources when the "with" block completes.

class SqlAlchemyConnection(Connection):
    def __init__(self, engine, on_exit):

Arguments

  • engine: sqlalchemy.engine.Engine

close()

core_lib.connection.sql_alchemy_connection.SqlAlchemyConnection.close() [source]

Responsible for committing, flushing and closing the DB session automatically after the queries are completed.

def close(self):
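
The exit behavior described above can be sketched with a small context manager. This is an illustration under stated assumptions, not the library's code: `RecordingSession` and `SketchConnection` are hypothetical stand-ins, and the commit, flush, close order follows the description on this page. The sketch does not model what happens when the "with" body raises.

```python
class RecordingSession:
    """Stand-in for sqlalchemy.orm.Session that records lifecycle calls."""

    def __init__(self):
        self.calls = []

    def commit(self):
        self.calls.append("commit")

    def flush(self):
        self.calls.append("flush")

    def close(self):
        self.calls.append("close")


class SketchConnection:
    """Hypothetical connection: yields a session, cleans up on exit."""

    def __init__(self):
        self.session = RecordingSession()

    def __enter__(self) -> RecordingSession:
        return self.session

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False  # never swallow exceptions from the "with" body

    def close(self):
        # Order taken from the description above: commit, flush, close.
        self.session.commit()
        self.session.flush()
        self.session.close()


connection = SketchConnection()
with connection as session:
    session.calls.append("query")  # placeholder for real query work

print(connection.session.calls)  # ['query', 'commit', 'flush', 'close']
```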