Read/Write Routing¶
Advanced Alchemy provides automatic routing of read operations to read replicas while directing write operations to the primary database. This enables better scalability by distributing read load across multiple replica databases.
Why Use Read/Write Routing?¶
Read/write routing is essential for scaling read-heavy applications:
Scalability: Distribute read load across multiple replica databases
Performance: Reduce primary database load by offloading read queries
High Availability: Continue serving reads even if primary is under maintenance
Cloud-Native: Leverage managed database replicas (AWS Aurora, Google Cloud SQL, etc.)
Quick Start¶
Basic configuration with a single replica:
from advanced_alchemy.config import SQLAlchemyAsyncConfig
from advanced_alchemy.config.routing import RoutingConfig
config = SQLAlchemyAsyncConfig(
routing_config=RoutingConfig(
primary_connection_string="postgresql+asyncpg://user:pass@primary:5432/db",
read_replicas=[
"postgresql+asyncpg://user:pass@replica1:5432/db",
],
),
)
# Create session factory
session_maker = config.create_session_maker()
# Use with repository - reads automatically go to replica
async with session_maker() as session:
repo = UserRepository(session=session)
users = await repo.list() # Routes to replica
Configuration¶
Routing Strategy¶
Choose how replicas are selected for read operations:
from advanced_alchemy.config.routing import RoutingConfig, RoutingStrategy
# Round-robin (default) - distributes load evenly
config = RoutingConfig(
primary_connection_string="postgresql+asyncpg://...",
read_replicas=["postgresql+asyncpg://replica1:5432/db", "..."],
routing_strategy=RoutingStrategy.ROUND_ROBIN,
)
# Random - randomly selects replica
config = RoutingConfig(
primary_connection_string="postgresql+asyncpg://...",
read_replicas=["postgresql+asyncpg://replica1:5432/db", "..."],
routing_strategy=RoutingStrategy.RANDOM,
)
Multiple Replicas¶
Configure multiple replicas with custom weights:
from advanced_alchemy.config.routing import RoutingConfig, ReplicaConfig
config = RoutingConfig(
primary_connection_string="postgresql+asyncpg://user:pass@primary:5432/db",
read_replicas=[
ReplicaConfig(
connection_string="postgresql+asyncpg://user:pass@replica1:5432/db",
weight=2, # Gets 2x traffic
name="replica-1-us-east",
),
ReplicaConfig(
connection_string="postgresql+asyncpg://user:pass@replica2:5432/db",
weight=1,
name="replica-2-us-west",
),
],
routing_strategy=RoutingStrategy.ROUND_ROBIN,
)
Sticky-After-Write¶
By default, routing ensures read-your-writes consistency. After a write operation, all subsequent reads use the primary database until the transaction is committed:
async with session_maker() as session:
repo = UserRepository(session=session)
# Read routes to replica
users = await repo.list()
# Write routes to primary
new_user = await repo.add(User(name="Alice"))
# Read now routes to primary (sticky-after-write)
user = await repo.get(new_user.id)
# Commit resets stickiness
await session.commit()
# Read can use replica again
users = await repo.list()
To disable sticky-after-write:
config = RoutingConfig(
primary_connection_string="postgresql+asyncpg://...",
read_replicas=["..."],
sticky_after_write=False, # Reads may not see recent writes
)
Routing Rules¶
The routing layer follows these rules:
INSERT/UPDATE/DELETE → Primary
SELECT with FOR UPDATE → Primary
SELECT after write (if sticky-after-write enabled) → Primary
SELECT (no writes) → Replica (round-robin/random)
After commit → Reset stickiness, replicas available again
FOR UPDATE Detection¶
Queries with FOR UPDATE are automatically routed to the primary:
from sqlalchemy import select
async with session_maker() as session:
# Routes to primary (FOR UPDATE detected)
stmt = select(User).where(User.id == user_id).with_for_update()
result = await session.execute(stmt)
user = result.scalar_one()
Advanced Routing with Bind Groups¶
While the primary/read-replica pattern is common, you might need more complex routing scenarios, such as:
Dedicated analytics database
Region-specific replicas
Separate reporting databases
Multiple primary databases (sharding)
You can achieve this by defining Bind Groups in your configuration.
Configuration¶
Use the engines dictionary to define named groups of engines:
from advanced_alchemy.config import SQLAlchemyAsyncConfig
from advanced_alchemy.config.routing import RoutingConfig
config = SQLAlchemyAsyncConfig(
routing_config=RoutingConfig(
# Define multiple engine groups
engines={
"default": ["postgresql+asyncpg://primary:5432/db"],
"read": ["postgresql+asyncpg://replica1:5432/db"],
"analytics": ["postgresql+asyncpg://analytics:5432/db"],
"reporting": [
"postgresql+asyncpg://report-1:5432/db",
"postgresql+asyncpg://report-2:5432/db",
],
},
default_group="default",
read_group="read",
),
)
Using Bind Groups¶
You can route operations to specific groups using context managers or explicit parameters.
Context Manager
Use use_bind_group to route all operations within a block to a specific group:
from advanced_alchemy.routing import use_bind_group
async with session_maker() as session:
repo = UserRepository(session=session)
# Route to analytics database
with use_bind_group("analytics"):
stats = await repo.count()
# Route to reporting group (load balanced if multiple engines)
with use_bind_group("reporting"):
report = await repo.list()
Explicit Parameter
All repository methods accept a bind_group parameter:
# Query directly from analytics group
users = await repo.list(bind_group="analytics")
# Count from reporting group
count = await repo.count(bind_group="reporting")
Context Managers¶
Use context managers for explicit control over routing:
Primary Context¶
Force operations to use the default (primary) group. This is an alias for use_bind_group("default"):
from advanced_alchemy.routing import primary_context
async with session_maker() as session:
repo = UserRepository(session=session)
# Force this read to use primary (e.g. for critical consistency)
with primary_context():
critical_user = await repo.get(user_id)
Replica Context¶
Force operations to use the read group. This is an alias for use_bind_group("read"):
from advanced_alchemy.routing import replica_context
async with session_maker() as session:
repo = UserRepository(session=session)
# Force read from replica (even if sticky-primary is active)
with replica_context():
users = await repo.list()
Temporarily Disable Routing¶
Disable routing to send all traffic to the default primary engine:
config = RoutingConfig(
engines={"default": ["..."], "read": ["..."]},
enabled=False, # All traffic to default group's first engine
)
Framework Integration¶
Routing automatically integrates with all supported frameworks.
Litestar¶
from litestar import Litestar
from litestar.plugins.sqlalchemy import SQLAlchemyAsyncConfig, SQLAlchemyPlugin
from advanced_alchemy.config.routing import RoutingConfig
config = SQLAlchemyAsyncConfig(
routing_config=RoutingConfig(
primary_connection_string="postgresql+asyncpg://primary:5432/db",
read_replicas=["postgresql+asyncpg://replica1:5432/db"],
),
)
app = Litestar(plugins=[SQLAlchemyPlugin(config=config)])
Routing context is automatically reset per request.
FastAPI¶
from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from advanced_alchemy.extensions.fastapi import SQLAlchemyAsyncConfig, get_session
config = SQLAlchemyAsyncConfig(
routing_config=RoutingConfig(
primary_connection_string="postgresql+asyncpg://primary:5432/db",
read_replicas=["postgresql+asyncpg://replica1:5432/db"],
),
)
app = FastAPI()
@app.get("/users")
async def list_users(session: AsyncSession = Depends(get_session)):
repo = UserRepository(session=session)
return await repo.list() # Routes to replica
Flask¶
from flask import Flask
from advanced_alchemy.extensions.flask import SQLAlchemyExtension
from advanced_alchemy.config.routing import RoutingConfig
app = Flask(__name__)
app.config["SQLALCHEMY_ROUTING_CONFIG"] = RoutingConfig(
primary_connection_string="postgresql://primary:5432/db",
read_replicas=["postgresql://replica1:5432/db"],
)
db = SQLAlchemyExtension(app=app)
AWS Aurora / Cloud SQL Example¶
AWS Aurora and Google Cloud SQL provide automatic replica endpoints:
# AWS Aurora configuration
config = RoutingConfig(
primary_connection_string=(
"postgresql+asyncpg://user:pass@mydb-cluster.cluster-xxx.us-east-1.rds.amazonaws.com:5432/mydb"
),
read_replicas=[
# Aurora reader endpoint (load-balanced across replicas)
"postgresql+asyncpg://user:pass@mydb-cluster.cluster-ro-xxx.us-east-1.rds.amazonaws.com:5432/mydb",
],
)
# Google Cloud SQL configuration
config = RoutingConfig(
primary_connection_string="postgresql+asyncpg://user:pass@primary-ip:5432/mydb",
read_replicas=[
"postgresql+asyncpg://user:pass@replica1-ip:5432/mydb",
"postgresql+asyncpg://user:pass@replica2-ip:5432/mydb",
],
)
Best Practices¶
Use Sticky-After-Write: Keep
sticky_after_write=True(default) to avoid read-after-write inconsistencyMonitor Replica Lag: Ensure replicas stay close to primary (< 1 second lag)
Test Failover: Verify behavior when replicas are unavailable
Use Context Managers: Use
primary_context()for critical reads that must be up-to-dateConnection Pooling: Configure appropriate pool sizes for primary and replicas
Health Checks: Implement health checks to detect unhealthy replicas (future feature)
Troubleshooting¶
Reads Not Using Replicas¶
Check if sticky-after-write is active:
from advanced_alchemy.routing import stick_to_primary_var
# Check current state
if stick_to_primary_var.get():
print("Currently stuck to primary")
Reset routing context manually:
from advanced_alchemy.routing import reset_routing_context
reset_routing_context()
Stale Reads from Replicas¶
If replicas have significant lag, use primary_context() for critical reads:
from advanced_alchemy.routing import primary_context
# Force primary for latest data
with primary_context():
user = await repo.get(user_id)
Temporarily Disable Routing¶
For debugging, disable routing to send all traffic to primary:
config = RoutingConfig(
primary_connection_string="postgresql+asyncpg://...",
read_replicas=["..."],
enabled=False, # All to primary
)
See Also¶
routing - API Reference
Repositories - Repository Pattern
Services - Service Layer
asyncio - Async Configuration