Services¶
Services in Advanced Alchemy build on repositories to provide higher-level business logic, data transformation, and schema validation. While repositories handle raw database operations, services coordinate application rules and schema conversion.
Understanding Services¶
Services provide:
Business logic abstraction
Data transformation using Pydantic, Msgspec, or attrs models
Input validation and type-safe schema conversion
Complex operations involving multiple repositories
Consistent error handling
Automatic schema validation and transformation
Support for SQLAlchemy query results (Row types) and RowMapping objects
Note
The examples below define a minimal Post / Tag model inline so the service examples stay self-contained.
Basic Service Usage¶
Let’s build upon a blog example by creating services for posts:
import datetime
from typing import Optional
from collections.abc import Hashable
from pydantic import BaseModel, Field
from sqlalchemy import Column, ForeignKey, Table
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql.elements import ColumnElement
from advanced_alchemy.base import BigIntAuditBase, orm_registry
from advanced_alchemy.mixins import SlugKey, UniqueMixin
from advanced_alchemy.repository import SQLAlchemyAsyncRepository
from advanced_alchemy.service import (
SQLAlchemyAsyncRepositoryService,
)
from advanced_alchemy.utils.text import slugify
# Association table linking posts and tags (many-to-many).
# Both foreign-key columns are part of the primary key, and each foreign key
# is declared with ondelete="CASCADE".
blog_post_tag = Table(
    "blog_service_post_tag",
    orm_registry.metadata,
    Column("post_id", ForeignKey("blog_service_post.id", ondelete="CASCADE"), primary_key=True),
    Column("tag_id", ForeignKey("blog_service_tag.id", ondelete="CASCADE"), primary_key=True),
)
class BlogPost(BigIntAuditBase):
    """Blog post model mapped to the ``blog_service_post`` table."""

    __tablename__ = "blog_service_post"
    # Indexed title column.
    title: Mapped[str] = mapped_column(index=True)
    content: Mapped[str]
    # Column default is False, i.e. posts start unpublished unless a value is given.
    published: Mapped[bool] = mapped_column(default=False)
    # Many-to-many link to BlogTag through the blog_post_tag association table;
    # paired with BlogTag.posts via back_populates.
    tags: Mapped[list["BlogTag"]] = relationship(
        secondary=blog_post_tag,
        back_populates="posts",
        lazy="selectin",
    )
class BlogTag(BigIntAuditBase, SlugKey, UniqueMixin):
    """Tag model mapped to the ``blog_service_tag`` table.

    Uniqueness is keyed on the slugified tag name (see ``unique_hash`` and
    ``unique_filter``).
    """

    __tablename__ = "blog_service_tag"
    # Tag names are unique and indexed.
    name: Mapped[str] = mapped_column(unique=True, index=True)
    # Read-only (viewonly=True) side of the post/tag many-to-many relationship.
    posts: Mapped[list["BlogPost"]] = relationship(
        secondary=blog_post_tag,
        back_populates="tags",
        viewonly=True,
    )

    @classmethod
    def unique_hash(cls, name: str, slug: Optional[str] = None) -> Hashable:
        """Return the hashable uniqueness key for a tag: the slugified ``name``.

        ``slug`` is accepted but not used; the key is always derived from ``name``.
        """
        return slugify(name)

    @classmethod
    def unique_filter(
        cls,
        name: str,
        slug: Optional[str] = None,
    ) -> ColumnElement[bool]:
        """Return a filter expression comparing ``cls.slug`` to the slugified ``name``.

        ``slug`` is accepted but not used; the comparison value is derived from ``name``.
        """
        return cls.slug == slugify(name)
class BlogPostCreate(BaseModel):
    """Input schema for creating a post."""

    title: str
    content: str
    # Tag names as plain strings; defaults to a fresh empty list per instance.
    tags: list[str] = Field(default_factory=list)
class BlogPostUpdate(BaseModel):
    """Input schema for updating a post; every field is optional and defaults to ``None``."""

    title: Optional[str] = None
    content: Optional[str] = None
    published: Optional[bool] = None
    # Tag names as plain strings, or None when no tag list is supplied.
    tags: Optional[list[str]] = None
class BlogPostResponse(BaseModel):
    """Output schema describing a post as returned to callers."""

    id: int
    title: str
    content: str
    published: bool
    created_at: datetime.datetime
    updated_at: datetime.datetime
    # from_attributes enables building this schema from an object's attributes
    # (for example a BlogPost instance) rather than only from a dict.
    model_config = {"from_attributes": True}
class BlogPostService(SQLAlchemyAsyncRepositoryService[BlogPost]):
    """Post service: an async repository service for ``BlogPost``."""

    class Repo(SQLAlchemyAsyncRepository[BlogPost]):
        """Async repository bound to the ``BlogPost`` model."""

        model_type = BlogPost

    # Repository class this service uses.
    repository_type = Repo
import datetime
from collections.abc import Hashable
from pydantic import BaseModel, Field
from sqlalchemy import Column, ForeignKey, Table
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql.elements import ColumnElement
from advanced_alchemy.base import BigIntAuditBase, orm_registry
from advanced_alchemy.mixins import SlugKey, UniqueMixin
from advanced_alchemy.repository import SQLAlchemyAsyncRepository
from advanced_alchemy.service import (
SQLAlchemyAsyncRepositoryService,
)
from advanced_alchemy.utils.text import slugify
# Association table linking posts and tags (many-to-many).
# Both foreign-key columns are part of the primary key, and each foreign key
# is declared with ondelete="CASCADE".
blog_post_tag = Table(
    "blog_service_post_tag",
    orm_registry.metadata,
    Column("post_id", ForeignKey("blog_service_post.id", ondelete="CASCADE"), primary_key=True),
    Column("tag_id", ForeignKey("blog_service_tag.id", ondelete="CASCADE"), primary_key=True),
)
class BlogPost(BigIntAuditBase):
    """Blog post model mapped to the ``blog_service_post`` table."""

    __tablename__ = "blog_service_post"
    # Indexed title column.
    title: Mapped[str] = mapped_column(index=True)
    content: Mapped[str]
    # Column default is False, i.e. posts start unpublished unless a value is given.
    published: Mapped[bool] = mapped_column(default=False)
    # Many-to-many link to BlogTag through the blog_post_tag association table;
    # paired with BlogTag.posts via back_populates.
    tags: Mapped[list["BlogTag"]] = relationship(
        secondary=blog_post_tag,
        back_populates="posts",
        lazy="selectin",
    )
class BlogTag(BigIntAuditBase, SlugKey, UniqueMixin):
    """Tag model mapped to the ``blog_service_tag`` table.

    Uniqueness is keyed on the slugified tag name (see ``unique_hash`` and
    ``unique_filter``).
    """

    __tablename__ = "blog_service_tag"
    # Tag names are unique and indexed.
    name: Mapped[str] = mapped_column(unique=True, index=True)
    # Read-only (viewonly=True) side of the post/tag many-to-many relationship.
    posts: Mapped[list["BlogPost"]] = relationship(
        secondary=blog_post_tag,
        back_populates="tags",
        viewonly=True,
    )

    @classmethod
    def unique_hash(cls, name: str, slug: str | None = None) -> Hashable:
        """Return the hashable uniqueness key for a tag: the slugified ``name``.

        ``slug`` is accepted but not used; the key is always derived from ``name``.
        """
        return slugify(name)

    @classmethod
    def unique_filter(
        cls,
        name: str,
        slug: str | None = None,
    ) -> ColumnElement[bool]:
        """Return a filter expression comparing ``cls.slug`` to the slugified ``name``.

        ``slug`` is accepted but not used; the comparison value is derived from ``name``.
        """
        return cls.slug == slugify(name)
class BlogPostCreate(BaseModel):
    """Input schema for creating a post."""

    title: str
    content: str
    # Tag names as plain strings; defaults to a fresh empty list per instance.
    tags: list[str] = Field(default_factory=list)
class BlogPostUpdate(BaseModel):
    """Input schema for updating a post; every field is optional and defaults to ``None``."""

    title: str | None = None
    content: str | None = None
    published: bool | None = None
    # Tag names as plain strings, or None when no tag list is supplied.
    tags: list[str] | None = None
class BlogPostResponse(BaseModel):
    """Output schema describing a post as returned to callers."""

    id: int
    title: str
    content: str
    published: bool
    created_at: datetime.datetime
    updated_at: datetime.datetime
    # from_attributes enables building this schema from an object's attributes
    # (for example a BlogPost instance) rather than only from a dict.
    model_config = {"from_attributes": True}
class BlogPostService(SQLAlchemyAsyncRepositoryService[BlogPost]):
    """Post service: an async repository service for ``BlogPost``."""

    class Repo(SQLAlchemyAsyncRepository[BlogPost]):
        """Async repository bound to the ``BlogPost`` model."""

        model_type = BlogPost

    # Repository class this service uses.
    repository_type = Repo
Service Operations¶
Services provide high-level methods for common operations:
async def create_post(post_service: BlogPostService, data: BlogPostCreate) -> BlogPostResponse:
    """Create a post from ``data`` with ``auto_commit=True`` and return it as a ``BlogPostResponse``."""
    created = await post_service.create(data=data, auto_commit=True)
    response = post_service.to_schema(created, schema_type=BlogPostResponse)
    return response
async def update_post(
    post_service: BlogPostService,
    post_id: int,
    data: BlogPostUpdate,
) -> BlogPostResponse:
    """Update the post identified by ``post_id`` with ``data`` and return the response schema.

    The update is issued with ``auto_commit=True``.
    """
    updated = await post_service.update(data=data, item_id=post_id, auto_commit=True)
    response = post_service.to_schema(updated, schema_type=BlogPostResponse)
    return response
Added in version 1.9.0.
Advanced Alchemy’s service layer automatically handles recursive model creation from nested dictionaries. When you pass a dictionary containing nested dictionaries that match a model’s relationships, the service will instantiate the related models.
from typing import Any
async def create_user_with_profile(user_service: Any) -> Any:
    """Create a user together with a nested ``profile`` dictionary in one ``create`` call."""
    # The nested dictionary is passed under the "profile" key of the user payload.
    profile = {
        "bio": "Software Engineer",
        "twitter": "@cofin",
    }
    payload = {
        "username": "cody",
        "email": "cody@litestar.dev",
        "profile": profile,
    }
    created = await user_service.create(data=payload)
    return created
Row Locking (FOR UPDATE)¶
Added in version 1.9.0.
Service retrieval methods like get support the with_for_update parameter, which is passed through to the underlying repository.
from typing import Any
async def get_user_for_update(user_service: Any, user_id: Any) -> Any:
    """Fetch the user identified by ``user_id``, passing ``with_for_update=True`` to ``get``."""
    locked_user = await user_service.get(item_id=user_id, with_for_update=True)
    return locked_user
Composite Primary Keys¶
Services fully support models with composite primary keys using the same formats as repositories.
Pass primary key values as tuples or dictionaries when using get, update, or delete methods:
from typing import Any
from collections.abc import Sequence
async def update_user_role_permissions(user_role_service: Any, user_id: int, role_id: int) -> Any:
    """Look up a user-role row by composite key, then set its permissions to ``"admin"``.

    The lookup passes the key as a ``(user_id, role_id)`` tuple; the update
    passes it as a dictionary keyed by column name.
    """
    tuple_key = (user_id, role_id)
    # The fetched row is not used further; the call shows the tuple key format.
    _current = await user_role_service.get(tuple_key)
    dict_key = {"user_id": user_id, "role_id": role_id}
    changes = {"permissions": "admin"}
    return await user_role_service.update(data=changes, item_id=dict_key)
async def delete_user_roles(user_role_service: Any) -> Sequence[Any]:
    """Delete three user-role rows in one ``delete_many`` call using tuple composite keys."""
    composite_keys = [(1, 5), (1, 6), (2, 5)]
    deleted = await user_role_service.delete_many(composite_keys)
    return deleted
See Composite Primary Keys in the Repositories documentation for more details on supported formats.
Complex Operations¶
Services can handle complex business logic involving multiple models. The code below shows a service coordinating posts and tags.
class TaggedBlogPostService(SQLAlchemyAsyncRepositoryService[BlogPost]):
    """Post service for handling post operations with tag management.

    Incoming payloads may carry a ``tags`` list of tag names. The hooks below
    remove that key from the payload and translate the names into ``BlogTag``
    objects on the post's ``tags`` relationship.
    """

    class Repo(SQLAlchemyAsyncRepository[BlogPost]):
        """Repository for ``BlogPost`` with ``BlogPost.tags`` listed in its loader options."""

        model_type = BlogPost
        loader_options = [BlogPost.tags]

    repository_type = Repo
    match_fields = ["title"]

    async def to_model_on_create(self, data: "ModelDictT[BlogPost]") -> "ModelDictT[BlogPost]":
        """Build the post model for a create and attach any requested tags.

        Args:
            data: Create payload; an optional ``tags`` entry holds tag names.

        Returns:
            The post model with one ``BlogTag`` appended per requested name.
        """
        data = schema_dump(data)
        tags_added = data.pop("tags", [])
        post = await super().to_model(data)
        if tags_added:
            # NOTE(review): as_unique_async presumably returns an existing tag
            # or creates a new one via UniqueMixin - confirm against the mixin.
            post.tags.extend(
                [
                    await BlogTag.as_unique_async(self.repository.session, name=tag, slug=slugify(tag))
                    for tag in tags_added
                ],
            )
        return post

    async def to_model_on_update(self, data: "ModelDictT[BlogPost]") -> "ModelDictT[BlogPost]":
        """Build the post model for an update and reconcile its tags.

        Args:
            data: Update payload. When ``tags`` is present and not ``None``,
                the post's tags are made to match it: tags whose name is not
                listed are removed and missing names are added. When ``tags``
                is absent or ``None``, the existing tags are left untouched.

        Returns:
            The post model with its ``tags`` relationship reconciled.
        """
        data = schema_dump(data)
        # Default to None rather than []: with [] a payload that omits "tags"
        # would pass the "is not None" check below and strip every tag.
        tags_updated = data.pop("tags", None)
        post = await super().to_model(data)
        if tags_updated is not None:
            requested_names = set(tags_updated)
            existing_names = {tag.name for tag in post.tags}
            tags_to_remove = [tag for tag in post.tags if tag.name not in requested_names]
            # dict.fromkeys drops repeated names while keeping request order,
            # so the same tag is never appended twice.
            tags_to_add = [tag for tag in dict.fromkeys(tags_updated) if tag not in existing_names]
            for tag_to_remove in tags_to_remove:
                post.tags.remove(tag_to_remove)
            post.tags.extend(
                [
                    await BlogTag.as_unique_async(self.repository.session, name=tag, slug=slugify(tag))
                    for tag in tags_to_add
                ],
            )
        return post
Working with Slugs¶
Services can automatically generate URL-friendly slugs using the SQLAlchemyAsyncSlugRepository.
Here’s an example service for managing tags with automatic slug generation:
class BlogTagService(SQLAlchemyAsyncRepositoryService[BlogTag]):
    """Tag service with automatic slug generation."""

    class Repo(SQLAlchemyAsyncSlugRepository[BlogTag]):
        """Slug-aware async repository bound to ``BlogTag``."""

        model_type = BlogTag

    repository_type = Repo
    match_fields = ["name"]

    async def _fill_missing_slug(self, data: "ModelDictT[BlogTag]") -> "ModelDictT[BlogTag]":
        """Dump ``data`` and, for a dict with a ``name`` but no ``slug``, add an available slug."""
        payload = schema_dump(data)
        needs_slug = is_dict_without_field(payload, "slug") and is_dict_with_field(payload, "name")
        if needs_slug:
            payload["slug"] = await self.repository.get_available_slug(payload["name"])
        return payload

    async def to_model_on_create(self, data: "ModelDictT[BlogTag]") -> "ModelDictT[BlogTag]":
        """Ensure a slug is present on create payloads that supply only a name."""
        return await self._fill_missing_slug(data)

    async def to_model_on_update(self, data: "ModelDictT[BlogTag]") -> "ModelDictT[BlogTag]":
        """Ensure a slug is present on update payloads that supply only a name."""
        return await self._fill_missing_slug(data)

    async def to_model_on_upsert(self, data: "ModelDictT[BlogTag]") -> "ModelDictT[BlogTag]":
        """Ensure a slug is present on upsert payloads that supply only a name."""
        return await self._fill_missing_slug(data)
Schema Integration¶
Advanced Alchemy services support multiple schema libraries for data transformation and validation:
Pydantic Models¶
class BlogPostSchema(BaseModel):
    """Pydantic schema exposing a post's id, title, content, and published flag."""

    id: int
    title: str
    content: str
    published: bool
    # from_attributes enables building this schema from an object's attributes
    # (for example a BlogPost instance) rather than only from a dict.
    model_config = {"from_attributes": True}
def to_pydantic_schema(post_service: BlogPostService, post_model: BlogPost) -> BlogPostSchema:
    """Convert ``post_model`` to a ``BlogPostSchema`` through the service's ``to_schema``."""
    converted = post_service.to_schema(post_model, schema_type=BlogPostSchema)
    return converted
Msgspec Structs¶
# Import msgspec's Struct when available; otherwise define an empty
# placeholder class so the example below can still be defined.
try:
    from msgspec import Struct
except ModuleNotFoundError:  # pragma: no cover - optional dependency in docs examples
    class Struct:  # type: ignore[no-redef]
        pass
class BlogPostStruct(Struct):
    """Msgspec struct exposing a post's id, title, content, and published flag."""

    id: int
    title: str
    content: str
    published: bool
def to_msgspec_schema(post_service: BlogPostService, post_model: BlogPost) -> BlogPostStruct:
    """Convert ``post_model`` to a ``BlogPostStruct`` through the service's ``to_schema``."""
    converted = post_service.to_schema(post_model, schema_type=BlogPostStruct)
    return converted
Attrs Classes¶
# Import attrs' define decorator when available; otherwise fall back to a
# no-op decorator that returns the class unchanged.
try:
    from attrs import define
except ModuleNotFoundError:  # pragma: no cover - optional dependency in docs examples
    def define(cls):  # type: ignore[misc]
        return cls
@define
class BlogPostAttrs:
    """Attrs class exposing a post's id, title, content, and published flag."""

    id: int
    title: str
    content: str
    published: bool
def to_attrs_schema(post_service: BlogPostService, post_model: BlogPost) -> BlogPostAttrs:
    """Convert ``post_model`` to a ``BlogPostAttrs`` through the service's ``to_schema``."""
    converted = post_service.to_schema(post_model, schema_type=BlogPostAttrs)
    return converted
Note
Enhanced attrs Support with cattrs: When both attrs and cattrs are installed,
Advanced Alchemy automatically uses cattrs.structure() and cattrs.unstructure()
for improved performance and type-aware serialization. This provides better handling of
complex types, nested structures, and custom converters.
Framework Integration¶
Services integrate seamlessly with both Litestar and FastAPI.