# Source code for advanced_alchemy.routing.selectors

"""Replica selectors for read/write routing.

This module provides different strategies for selecting which read replica
to use for read operations.
"""

import secrets
import threading
from abc import ABC, abstractmethod
from itertools import cycle
from typing import TYPE_CHECKING, Generic, TypeVar, Union

if TYPE_CHECKING:
    from collections.abc import Iterator

    from sqlalchemy import Engine
    from sqlalchemy.ext.asyncio import AsyncEngine


# Names exported by ``from <module> import *``. The abstract base class is
# exported under its ``ReplicaSelector`` alias; ``EngineSelector`` itself is
# not listed here.
__all__ = (
    "RandomSelector",
    "ReplicaSelector",
    "RoundRobinSelector",
)


# Type variable for the kind of engine a selector holds. The bound is written
# as a string because ``Engine`` and ``AsyncEngine`` are imported only under
# ``TYPE_CHECKING`` and therefore do not exist at runtime.
EngineT = TypeVar("EngineT", bound="Union[Engine, AsyncEngine]")


class EngineSelector(ABC, Generic[EngineT]):
    """Base class for strategies that pick one engine out of a list.

    The list handed to the constructor is stored as given; concrete
    subclasses supply the picking algorithm by implementing :meth:`next`.

    Attributes:
        _engines: The engines this selector chooses between.
    """

    __slots__ = ("_engines",)

    def __init__(self, engines: list[EngineT]) -> None:
        """Store the engines to choose from.

        Args:
            engines: Database engines available for selection. The list is
                kept by reference, not copied.
        """
        self._engines = engines

    @property
    def engines(self) -> list[EngineT]:
        """The engines this selector was configured with.

        Returns:
            The stored list of engines (the same list object, not a copy).
        """
        return self._engines

    @property
    def replicas(self) -> list[EngineT]:
        """Replica-flavoured name for :attr:`engines`.

        Returns:
            Whatever :attr:`engines` returns.
        """
        return self.engines

    def has_engines(self) -> bool:
        """Report whether there is anything to select from.

        Returns:
            ``True`` when the stored engine list is non-empty, else ``False``.
        """
        return bool(self._engines)

    def has_replicas(self) -> bool:
        """Replica-flavoured name for :meth:`has_engines`.

        Returns:
            Whatever :meth:`has_engines` returns.
        """
        return self.has_engines()

    @abstractmethod
    def next(self) -> EngineT:
        """Pick the engine to use for the next operation.

        Concrete selectors must override this method.

        Returns:
            The chosen engine.

        Raises:
            RuntimeError: If no engines are available.
        """


# Backward-compatible alias: ``ReplicaSelector`` is the very same class object
# as ``EngineSelector``, so code written against the old name keeps working.
ReplicaSelector = EngineSelector


class RoundRobinSelector(EngineSelector[EngineT]):
    """Round-robin engine selection.

    Cycles through engines in order, distributing load evenly across all
    available engines. Advancing the cycle is guarded by a lock, so a single
    selector can be shared between threads.

    Example:
        Creating a round-robin selector::

            selector = RoundRobinSelector(engines)
            engine1 = selector.next()
            engine2 = selector.next()
            engine3 = selector.next()

        This cycles through engines in order and wraps back to the first.
    """

    __slots__ = ("_cycle", "_lock")

    def __init__(self, engines: list[EngineT]) -> None:
        """Initialize the round-robin selector.

        Args:
            engines: List of database engines.
        """
        super().__init__(engines)
        # The cycle is built once, here, from the list passed in. An empty
        # list gets an already-exhausted placeholder iterator; ``next()``
        # rejects the empty case before ever touching it.
        self._cycle: Iterator[EngineT] = cycle(engines) if engines else iter([])
        self._lock = threading.Lock()

    def next(self) -> EngineT:
        """Select the next engine in round-robin order.

        Returns:
            The next engine in the cycle.

        Raises:
            RuntimeError: If no engines are configured.
        """
        if not self._engines:
            msg = "No engines configured for round-robin selection"
            raise RuntimeError(msg)

        # Only one thread at a time may advance the shared iterator.
        with self._lock:
            return next(self._cycle)
class RandomSelector(EngineSelector[EngineT]):
    """Random engine selection.

    Selects engines randomly, which can help with load distribution when
    engines have varying capacity or when you want to avoid predictable
    patterns.

    Example:
        Creating a random selector::

            selector = RandomSelector(engines)
            engine = selector.next()
    """

    # No per-instance state beyond what the base class already stores.
    __slots__ = ()

    def next(self) -> EngineT:
        """Select a random engine.

        Returns:
            A randomly selected engine.

        Raises:
            RuntimeError: If no engines are configured.
        """
        if not self._engines:
            msg = "No engines configured for random selection"
            raise RuntimeError(msg)

        # ``secrets.choice`` picks one element of the sequence at random.
        return secrets.choice(self._engines)