Source code for advanced_alchemy.config.sync

"""Sync SQLAlchemy configuration module."""

from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Optional, cast

from sqlalchemy import Connection, Engine, create_engine
from sqlalchemy.orm import Session, sessionmaker

from advanced_alchemy._listeners import set_async_context
from advanced_alchemy.config.common import GenericAlembicConfig, GenericSessionConfig, GenericSQLAlchemyConfig
from advanced_alchemy.exceptions import ImproperConfigurationError

if TYPE_CHECKING:
    from collections.abc import Generator
    from typing import Callable

    from advanced_alchemy.config.routing import RoutingConfig


__all__ = (
    "AlembicSyncConfig",
    "SQLAlchemySyncConfig",
    "SyncSessionConfig",
)


[docs] @dataclass class SyncSessionConfig(GenericSessionConfig[Connection, Engine, Session]): """Configuration for synchronous SQLAlchemy sessions."""
[docs] @dataclass class AlembicSyncConfig(GenericAlembicConfig): """Configuration for Alembic's synchronous migrations. For details see: https://alembic.sqlalchemy.org/en/latest/api/config.html """
[docs] @dataclass class SQLAlchemySyncConfig(GenericSQLAlchemyConfig[Engine, Session, sessionmaker[Session]]): """Synchronous SQLAlchemy Configuration. Note: The alembic configuration options are documented in the Alembic documentation. Example: Basic sync configuration:: config = SQLAlchemySyncConfig( connection_string="postgresql://user:pass@localhost/db", ) Configuration with read/write routing:: from advanced_alchemy.config.routing import RoutingConfig config = SQLAlchemySyncConfig( routing_config=RoutingConfig( primary_connection_string="postgresql://user:pass@primary/db", read_replicas=["postgresql://user:pass@replica/db"], ), ) """ create_engine_callable: "Callable[[str], Engine]" = create_engine """Callable that creates an :class:`Engine <sqlalchemy.Engine>` instance or instance of its subclass.""" session_config: SyncSessionConfig = field(default_factory=SyncSessionConfig) # pyright: ignore[reportIncompatibleVariableOverride] """Configuration options for the :class:`sessionmaker<sqlalchemy.orm.sessionmaker>`.""" session_maker_class: type[sessionmaker[Session]] = sessionmaker # pyright: ignore[reportIncompatibleVariableOverride] """Sessionmaker class to use.""" alembic_config: AlembicSyncConfig = field(default_factory=AlembicSyncConfig) """Configuration for the SQLAlchemy Alembic migrations. The configuration options are documented in the Alembic documentation. """ routing_config: "Optional[RoutingConfig]" = None """Optional read/write routing configuration. When provided, enables automatic routing of read operations to replicas and write operations to the primary database. .. note:: When using ``routing_config``, do not set ``connection_string``. The primary connection is specified in the routing config. """ def __post_init__(self) -> None: # Validate routing config vs connection_string if self.routing_config is not None and self.connection_string is not None: msg = "Provide either 'connection_string' or 'routing_config', not both" raise ImproperConfigurationError(msg) # If routing_config is set, use its primary as the connection_string for compatibility if self.routing_config is not None: self.connection_string = self.routing_config.primary_connection_string if self.connection_string is None: # Try to get from default group engines configs = self.routing_config.get_engine_configs(self.routing_config.default_group) if configs: self.connection_string = configs[0].connection_string super().__post_init__() def __hash__(self) -> int: return super().__hash__() def __eq__(self, other: object) -> bool: return super().__eq__(other)
[docs] def create_session_maker(self) -> "Callable[[], Session]": """Get a session maker. If routing is configured, returns a routing-aware session maker. Otherwise, returns a standard session maker. Returns: A callable that creates session instances. """ if self.session_maker: return self.session_maker from sqlalchemy import event from advanced_alchemy._listeners import ( SyncCacheListener, SyncFileObjectListener, touch_updated_timestamp, ) # Use routing session maker if routing is configured if self.routing_config is not None: from advanced_alchemy.routing import RoutingSyncSessionMaker routing_maker: Callable[[], Session] = RoutingSyncSessionMaker( routing_config=self.routing_config, engine_config=self.engine_config_dict, session_config=self.session_config_dict, ) self.session_maker = routing_maker else: self.session_maker = super().create_session_maker() if isinstance(self.session_maker, sessionmaker): session_maker = cast( "sessionmaker[Session]", self.session_maker, # pyright: ignore[reportUnknownMemberType] ) if self.enable_file_object_listener: event.listen(session_maker, "before_flush", SyncFileObjectListener.before_flush) event.listen(session_maker, "after_commit", SyncFileObjectListener.after_commit) event.listen(session_maker, "after_rollback", SyncFileObjectListener.after_rollback) if self.enable_touch_updated_timestamp_listener: event.listen(session_maker, "before_flush", touch_updated_timestamp) event.listen(session_maker, "after_commit", SyncCacheListener.after_commit) event.listen(session_maker, "after_rollback", SyncCacheListener.after_rollback) if self.session_maker is None: # pyright: ignore msg = "Session maker was not initialized." # type: ignore[unreachable] raise ImproperConfigurationError(msg) return cast("sessionmaker[Session]", self.session_maker) # pyright: ignore[reportUnknownMemberType]
[docs] @contextmanager def get_session(self) -> "Generator[Session, None, None]": """Get a session context manager. Yields: Generator[sqlalchemy.orm.Session, None, None]: A context manager yielding an active SQLAlchemy Session. Examples: Using the session context manager: >>> with config.get_session() as session: ... session.execute(...) """ session_maker = self.create_session_maker() set_async_context(False) with session_maker() as session: yield session