Source code for advanced_alchemy.utils.serialization

"""Configurable JSON serialization with Protocol-based architecture.

This module provides a flexible serialization system that supports:
- Custom type encoders via the ``type_encoders`` parameter
- Multiple backend support (msgspec, orjson, standard library)
- Thread-safe caching of serializer instances
- Integration with Litestar's type_encoders pattern

Example:
    Basic usage with default encoders::

        >>> from advanced_alchemy.utils.serialization import encode_json, decode_json
        >>> import datetime
        >>> encode_json({"date": datetime.date.today()})
        '{"date":"2025-12-17"}'

    Custom type encoders::

        >>> from decimal import Decimal
        >>> class Money:
        ...     def __init__(self, amount: Decimal):
        ...         self.amount = amount
        >>> encode_json(
        ...     {"price": Money(Decimal("19.99"))},
        ...     type_encoders={Money: lambda m: str(m.amount)}
        ... )
        '{"price":"19.99"}'
"""

import datetime
import enum
import json
import threading
from abc import ABC, abstractmethod
from collections.abc import Mapping
from dataclasses import MISSING as DATACLASS_MISSING
from dataclasses import dataclass
from dataclasses import fields as dataclass_fields
from decimal import Decimal
from functools import lru_cache
from ipaddress import IPv4Address, IPv4Interface, IPv4Network, IPv6Address, IPv6Interface, IPv6Network
from pathlib import Path, PurePath
from typing import (
    TYPE_CHECKING,
    Annotated,
    Any,
    Callable,
    Final,
    Literal,
    Optional,
    Protocol,
    TypeVar,
    Union,
    cast,
    overload,
    runtime_checkable,
)
from uuid import UUID

from sqlalchemy import RowMapping
from typing_extensions import TypeAlias, TypeGuard

from advanced_alchemy.typing import (
    ATTRS_INSTALLED,
    CATTRS_INSTALLED,
    LITESTAR_INSTALLED,
    MSGSPEC_INSTALLED,
    NUMPY_INSTALLED,
    ORJSON_INSTALLED,
    PYDANTIC_INSTALLED,
    SQLMODEL_INSTALLED,
    UNSET,
    AttrsInstance,
    AttrsLike,
    BaseModel,
    BaseModelLike,
    DTOData,
    DTODataLike,
    FailFast,
    Struct,
    StructLike,
    attrs_nothing,
    convert,
)
from advanced_alchemy.typing import attrs_asdict as asdict
from advanced_alchemy.typing import attrs_fields as fields
from advanced_alchemy.typing import attrs_has as has
from advanced_alchemy.typing import cattrs_structure as structure
from advanced_alchemy.typing import cattrs_unstructure as unstructure
from advanced_alchemy.utils.dataclass import Empty as DataclassEmpty
from advanced_alchemy.utils.dataclass import is_dataclass_instance

if TYPE_CHECKING:
    from collections.abc import Sequence

    from sqlalchemy.engine.row import Row

    from advanced_alchemy.filters import StatementFilter
    from advanced_alchemy.repository.typing import ModelT

__all__ = (
    "ATTRS_INSTALLED",
    "CATTRS_INSTALLED",
    "DEFAULT_TYPE_ENCODERS",
    "MSGSPEC_INSTALLED",
    "PYDANTIC_INSTALLED",
    "PYDANTIC_USE_FAILFAST",
    "UNSET",
    "AttrsInstance",
    "BaseModel",
    "BulkModelDictT",
    "FilterTypeT",
    "JSONSerializer",
    "ModelDTOT",
    "ModelDictListT",
    "ModelDictT",
    "MsgspecSerializer",
    "OrjsonSerializer",
    "PydanticOrMsgspecT",
    "SchemaDumpConfig",
    "StandardLibSerializer",
    "Struct",
    "SupportedSchemaModel",
    "TypeEncodersMap",
    "asdict",
    "attrs_nothing",
    "convert",
    "convert_date_to_iso",
    "convert_datetime_to_gmt_iso",
    "decode_complex_type",
    "decode_json",
    "encode_complex_type",
    "encode_json",
    "fields",
    "get_attrs_fields",
    "get_serializer",
    "get_type_adapter",
    "has",
    "has_dict_attribute",
    "is_attrs_instance",
    "is_attrs_instance_with_field",
    "is_attrs_instance_without_field",
    "is_attrs_schema",
    "is_dataclass",
    "is_dataclass_with_field",
    "is_dataclass_without_field",
    "is_dict",
    "is_dict_with_field",
    "is_dict_without_field",
    "is_dto_data",
    "is_msgspec_struct",
    "is_msgspec_struct_with_field",
    "is_msgspec_struct_without_field",
    "is_pydantic_model",
    "is_pydantic_model_with_field",
    "is_pydantic_model_without_field",
    "is_row_mapping",
    "is_schema",
    "is_schema_or_dict",
    "is_schema_or_dict_with_field",
    "is_schema_or_dict_without_field",
    "is_schema_with_field",
    "is_schema_without_field",
    "is_sqlmodel_table_model",
    "schema_dump",
    "structure",
    "unstructure",
)

# Type aliases
TypeEncodersMap = Mapping[type, Callable[[Any], Any]]
"""Mapping of types to encoder functions for custom serialization."""

T = TypeVar("T")

PYDANTIC_USE_FAILFAST = False  # leave permanently disabled for now

FilterTypeT = TypeVar("FilterTypeT", bound="StatementFilter")
"""Type variable for filter types."""

SupportedSchemaModel: TypeAlias = Union[StructLike, BaseModelLike, AttrsLike]
"""Type alias for objects that support schema conversion methods (model_dump, asdict, etc.)."""

ModelDTOT = TypeVar("ModelDTOT", bound="Union[SupportedSchemaModel, Any]")
"""Type variable for model DTOs."""

PydanticOrMsgspecT = SupportedSchemaModel
"""Type alias for supported schema models."""

ModelDictT: TypeAlias = "Union[dict[str, Any], ModelT, SupportedSchemaModel, DTODataLike[ModelT], Any]"
"""Type alias for model dictionaries."""

ModelDictListT: TypeAlias = "Sequence[Union[dict[str, Any], ModelT, SupportedSchemaModel, Any]]"
"""Type alias for model dictionary lists."""

BulkModelDictT: TypeAlias = (
    "Union[Sequence[Union[dict[str, Any], ModelT, SupportedSchemaModel, Any]], DTODataLike[list[ModelT]]]"
)
"""Type alias for bulk model dictionaries."""


class _HasDictAttribute(Protocol):
    __dict__: dict[str, Any]


[docs] @dataclass(frozen=True) class SchemaDumpConfig: """Configuration for dumping schema-like service input data.""" exclude_unset: bool = True exclude_none: bool = False exclude_defaults: bool = False exclude_sentinels: bool = True
DEFAULT_SCHEMA_DUMP_CONFIG: Final = SchemaDumpConfig() def _get_schema_dump_config(exclude_unset: bool, config: "Optional[SchemaDumpConfig]") -> SchemaDumpConfig: """Resolve backwards-compatible schema dump configuration.""" if config is not None: return config if exclude_unset: return DEFAULT_SCHEMA_DUMP_CONFIG return SchemaDumpConfig(exclude_unset=False, exclude_sentinels=False) def _get_pydantic_missing_sentinel() -> Any: """Return Pydantic's experimental MISSING sentinel when available.""" if not PYDANTIC_INSTALLED: return None try: from pydantic.experimental.missing_sentinel import MISSING except ImportError: # pragma: no cover - depends on installed Pydantic version return None return MISSING # ============================================================================ # Helper functions # ============================================================================ def convert_datetime_to_gmt_iso(dt: datetime.datetime) -> str: """Convert datetime to ISO 8601 format with UTC timezone. If the datetime is naive (no timezone), UTC is assumed. The ``+00:00`` suffix is replaced with ``Z`` for consistency. Args: dt: The datetime to convert. Returns: ISO 8601 formatted datetime string with ``Z`` suffix for UTC. Example: >>> import datetime >>> dt = datetime.datetime(2025, 12, 17, 10, 30, 0) >>> convert_datetime_to_gmt_iso(dt) '2025-12-17T10:30:00Z' """ if not dt.tzinfo: dt = dt.replace(tzinfo=datetime.timezone.utc) return dt.isoformat().replace("+00:00", "Z") def convert_date_to_iso(dt: datetime.date) -> str: """Convert date to ISO 8601 format. Args: dt: The date to convert. Returns: ISO 8601 formatted date string (YYYY-MM-DD). Example: >>> import datetime >>> convert_date_to_iso(datetime.date(2025, 12, 17)) '2025-12-17' """ return dt.isoformat() @lru_cache(typed=True) def get_type_adapter(f: "type[T]") -> Any: """Caches and returns a pydantic type adapter.""" from advanced_alchemy.typing import TypeAdapter if PYDANTIC_USE_FAILFAST: return TypeAdapter(Annotated[f, FailFast()]) # pyright: ignore return TypeAdapter(f) @lru_cache(maxsize=128, typed=True) def get_attrs_fields(cls: Any) -> "tuple[Any, ...]": """Caches and returns attrs fields for a given attrs class.""" if ATTRS_INSTALLED: return fields(cls) # type: ignore[no-any-return] return ()
[docs] def is_dto_data(v: Any) -> TypeGuard[DTODataLike[Any]]: """Check if a value is a Litestar DTOData object.""" return LITESTAR_INSTALLED and isinstance(v, DTOData)
[docs] def is_pydantic_model(v: Any) -> TypeGuard[BaseModelLike]: """Check if a value is a pydantic model.""" if not PYDANTIC_INSTALLED: return False if isinstance(v, type): try: return issubclass(v, BaseModel) except TypeError: return False return isinstance(v, BaseModel)
[docs] def is_sqlmodel_table_model(v: Any) -> bool: """Check if a value is a SQLModel table model instance or class.""" if not SQLMODEL_INSTALLED: return False if isinstance(v, type): try: return issubclass(v, BaseModel) and hasattr(v, "__mapper__") except TypeError: return False return isinstance(v, BaseModel) and hasattr(v, "__mapper__")
[docs] def is_msgspec_struct(v: Any) -> TypeGuard[StructLike]: """Check if a value is a msgspec struct.""" return MSGSPEC_INSTALLED and isinstance(v, Struct)
[docs] def is_attrs_instance(obj: Any) -> TypeGuard[AttrsLike]: """Check if a value is an attrs class instance.""" return ATTRS_INSTALLED and has(obj.__class__)
[docs] def is_attrs_schema(cls: Any) -> TypeGuard["type[AttrsLike]"]: """Check if a class type is an attrs schema.""" return ATTRS_INSTALLED and has(cls)
def is_dataclass(obj: Any) -> TypeGuard[Any]: """Check if an object is a dataclass.""" return hasattr(obj, "__dataclass_fields__") def is_dataclass_with_field(obj: Any, field_name: str) -> TypeGuard[object]: """Check if an object is a dataclass and has a specific field.""" return is_dataclass(obj) and hasattr(obj, field_name) def is_dataclass_without_field(obj: Any, field_name: str) -> TypeGuard[object]: """Check if an object is a dataclass and does not have a specific field.""" return is_dataclass(obj) and not hasattr(obj, field_name)
[docs] def is_attrs_instance_with_field(v: Any, field_name: str) -> TypeGuard[AttrsLike]: """Check if an attrs instance has a specific field.""" return is_attrs_instance(v) and hasattr(v, field_name)
[docs] def is_attrs_instance_without_field(v: Any, field_name: str) -> TypeGuard[AttrsLike]: """Check if an attrs instance does not have a specific field.""" return is_attrs_instance(v) and not hasattr(v, field_name)
[docs] def is_dict(v: Any) -> TypeGuard[dict[str, Any]]: """Check if a value is a dictionary.""" return isinstance(v, dict)
def has_dict_attribute(obj: Any) -> "TypeGuard[_HasDictAttribute]": """Check if an object has a __dict__ attribute.""" return obj is not None and hasattr(obj, "__dict__") def is_row_mapping(v: Any) -> TypeGuard["RowMapping"]: """Check if a value is a SQLAlchemy RowMapping.""" return isinstance(v, RowMapping)
[docs] def is_dict_with_field(v: Any, field_name: str) -> TypeGuard[dict[str, Any]]: """Check if a dictionary has a specific field.""" return is_dict(v) and field_name in v
[docs] def is_dict_without_field(v: Any, field_name: str) -> TypeGuard[dict[str, Any]]: """Check if a dictionary does not have a specific field.""" return is_dict(v) and field_name not in v
[docs] def is_pydantic_model_with_field(v: Any, field_name: str) -> TypeGuard[BaseModelLike]: """Check if a pydantic model has a specific field.""" return is_pydantic_model(v) and hasattr(v, field_name)
[docs] def is_pydantic_model_without_field(v: Any, field_name: str) -> TypeGuard[BaseModelLike]: """Check if a pydantic model does not have a specific field.""" return is_pydantic_model(v) and not hasattr(v, field_name)
[docs] def is_msgspec_struct_with_field(v: Any, field_name: str) -> TypeGuard[StructLike]: """Check if a msgspec struct has a specific field.""" return is_msgspec_struct(v) and hasattr(v, field_name)
[docs] def is_msgspec_struct_without_field(v: Any, field_name: str) -> "TypeGuard[StructLike]": """Check if a msgspec struct does not have a specific field.""" return is_msgspec_struct(v) and not hasattr(v, field_name)
[docs] def is_schema(v: Any) -> "TypeGuard[SupportedSchemaModel]": """Check if a value is a msgspec Struct, Pydantic model, or attrs instance.""" if is_sqlmodel_table_model(v): return False return is_msgspec_struct(v) or is_pydantic_model(v) or is_attrs_instance(v)
[docs] def is_schema_or_dict(v: Any) -> "TypeGuard[Union[SupportedSchemaModel, dict[str, Any]]]": """Check if a value is a msgspec Struct, Pydantic model, attrs class, or dict.""" return is_schema(v) or is_dict(v)
[docs] def is_schema_with_field(v: Any, field_name: str) -> "TypeGuard[SupportedSchemaModel]": """Check if a value is a msgspec Struct, Pydantic model, or attrs instance with a specific field.""" if is_sqlmodel_table_model(v): return False return ( is_msgspec_struct_with_field(v, field_name) or is_pydantic_model_with_field(v, field_name) or is_attrs_instance_with_field(v, field_name) )
[docs] def is_schema_without_field(v: Any, field_name: str) -> "TypeGuard[SupportedSchemaModel]": """Check if a value is a msgspec Struct, Pydantic model, or attrs instance without a specific field.""" return is_schema(v) and not hasattr(v, field_name)
[docs] def is_schema_or_dict_with_field(v: Any, field_name: str) -> "TypeGuard[Union[SupportedSchemaModel, dict[str, Any]]]": """Check if a value is a msgspec Struct, Pydantic model, attrs instance, or dict with a specific field.""" return is_schema_with_field(v, field_name) or is_dict_with_field(v, field_name)
[docs] def is_schema_or_dict_without_field( v: Any, field_name: str ) -> "TypeGuard[Union[SupportedSchemaModel, dict[str, Any]]]": """Check if a value is a msgspec Struct, Pydantic model, attrs instance, or dict without a specific field.""" return is_schema_or_dict(v) and not is_schema_or_dict_with_field(v, field_name)
def _dump_pydantic_model(data: BaseModelLike, dump_config: SchemaDumpConfig) -> "dict[str, Any]": """Dump a Pydantic model according to the schema dump config.""" dumped = data.model_dump( exclude_unset=dump_config.exclude_unset, exclude_none=dump_config.exclude_none, exclude_defaults=dump_config.exclude_defaults, ) if not dump_config.exclude_sentinels and (missing := _get_pydantic_missing_sentinel()) is not None: for field_name in getattr(data.__class__, "model_fields", {}): if getattr(data, field_name, None) is missing: dumped[field_name] = missing return dumped def _dump_msgspec_struct(data: StructLike, dump_config: SchemaDumpConfig) -> "dict[str, Any]": """Dump a msgspec struct according to the schema dump config.""" fields = data.__struct_fields__ defaults = getattr(data, "__struct_defaults__", ()) default_fields = fields[-len(defaults) :] if defaults else () default_values = dict(zip(default_fields, defaults)) dumped: dict[str, Any] = {} for field_name in fields: if not hasattr(data, field_name): continue value = getattr(data, field_name) if dump_config.exclude_sentinels and value is UNSET: continue if dump_config.exclude_none and value is None: continue if dump_config.exclude_defaults and field_name in default_values and value == default_values[field_name]: continue dumped[field_name] = value return dumped def _dump_attrs_instance(data: AttrsLike, dump_config: SchemaDumpConfig) -> "dict[str, Any]": """Dump an attrs instance according to the schema dump config.""" if dump_config.exclude_sentinels or dump_config.exclude_none: # Filter out attrs.NOTHING values for partial updates. def filter_unset_attrs(attr: Any, value: Any) -> bool: if dump_config.exclude_sentinels and value is attrs_nothing: return False if dump_config.exclude_none and value is None: return False return not (dump_config.exclude_defaults and attr.default is not attrs_nothing and value == attr.default) return asdict(data, filter=filter_unset_attrs) if CATTRS_INSTALLED: return unstructure(data) # type: ignore[no-any-return] return asdict(data) def _dump_dataclass_instance(data: Any, dump_config: SchemaDumpConfig) -> "dict[str, Any]": """Dump a dataclass instance according to the schema dump config.""" dumped: dict[str, Any] = {} for field in dataclass_fields(data): value = getattr(data, field.name) if dump_config.exclude_sentinels and value is DataclassEmpty: continue if dump_config.exclude_none and value is None: continue if dump_config.exclude_defaults and field.default is not DATACLASS_MISSING and value == field.default: continue if is_dataclass_instance(value): value = _dump_dataclass_instance(value, dump_config) dumped[field.name] = value return dumped def _dump_schema_model(data: Any, dump_config: SchemaDumpConfig) -> "Optional[dict[str, Any]]": """Dump a supported schema-like object according to the schema dump config.""" if is_pydantic_model(data): return _dump_pydantic_model(data, dump_config) if is_msgspec_struct(data): return _dump_msgspec_struct(data, dump_config) if is_attrs_instance(data): return _dump_attrs_instance(data, dump_config) if is_dataclass(data): return _dump_dataclass_instance(data, dump_config) return None @overload def schema_dump( data: "RowMapping", exclude_unset: bool = True, *, config: "Optional[SchemaDumpConfig]" = None, ) -> "dict[str, Any]": ... @overload def schema_dump( data: "Row[Any]", exclude_unset: bool = True, *, config: "Optional[SchemaDumpConfig]" = None, ) -> "dict[str, Any]": ... @overload def schema_dump( data: "DTODataLike[Any]", exclude_unset: bool = True, *, config: "Optional[SchemaDumpConfig]" = None, ) -> "dict[str, Any]": ... @overload def schema_dump( # pyright: ignore[reportOverlappingOverload] data: "ModelT", exclude_unset: bool = True, *, config: "Optional[SchemaDumpConfig]" = None, ) -> "ModelT": ... @overload def schema_dump( data: Any, exclude_unset: bool = True, *, config: "Optional[SchemaDumpConfig]" = None, ) -> "dict[str, Any]": ...
[docs] def schema_dump( # noqa: PLR0911 data: "Union[dict[str, Any], ModelT, SupportedSchemaModel, DTODataLike[ModelT], RowMapping, Row[Any]]", exclude_unset: bool = True, *, config: "Optional[SchemaDumpConfig]" = None, ) -> "Union[dict[str, Any], ModelT]": """Dump a data object to a dictionary.""" dump_config = _get_schema_dump_config(exclude_unset=exclude_unset, config=config) if is_dict(data): return data if is_row_mapping(data): return dict(data) if is_sqlmodel_table_model(data): return cast("ModelT", data) if (schema_data := _dump_schema_model(data, dump_config)) is not None: return schema_data if is_dto_data(data): return cast("dict[str, Any]", data.as_builtins()) if has_dict_attribute(data): return data.__dict__ return cast("ModelT", data)
# ============================================================================ # Default type encoders # ============================================================================ def _build_default_type_encoders() -> dict[type, Callable[[Any], Any]]: """Build the default type encoders dictionary. This function constructs the default encoders, including optional encoders for numpy and pydantic if those packages are installed. Returns: Dictionary mapping types to encoder functions. """ encoders: dict[type, Callable[[Any], Any]] = { # Datetime types datetime.datetime: convert_datetime_to_gmt_iso, datetime.date: convert_date_to_iso, datetime.time: lambda v: v.isoformat(), datetime.timedelta: lambda v: v.total_seconds(), # Numeric types Decimal: str, # Preserve precision as string # UUID and path types UUID: str, Path: str, PurePath: str, # Network types IPv4Address: str, IPv4Interface: str, IPv4Network: str, IPv6Address: str, IPv6Interface: str, IPv6Network: str, # Collection types frozenset: list, set: list, # Bytes bytes: lambda v: v.decode("utf-8", errors="replace"), # Enum (use .value to get the underlying value) enum.Enum: lambda v: v.value, } # Optional NumPy support if NUMPY_INSTALLED: import numpy as np encoders[np.ndarray] = lambda v: v.tolist() encoders[np.integer] = int encoders[np.floating] = float encoders[np.bool_] = bool # Optional Pydantic support if PYDANTIC_INSTALLED: from pydantic import BaseModel encoders[BaseModel] = lambda v: v.model_dump(mode="json") return encoders DEFAULT_TYPE_ENCODERS: Final[dict[type, Callable[[Any], Any]]] = _build_default_type_encoders() """Default type encoders for common Python types. These encoders handle serialization of types not natively supported by JSON serializers. Users can override these by passing custom ``type_encoders`` to serializer functions. Supported types: - ``datetime.datetime``: ISO 8601 with UTC timezone (Z suffix) - ``datetime.date``: ISO 8601 date format - ``datetime.time``: ISO 8601 time format - ``datetime.timedelta``: Total seconds as float - ``Decimal``: String representation (preserves precision) - ``UUID``: String representation - ``Path``, ``PurePath``: String representation - ``IPv4Address``, ``IPv6Address``, etc.: String representation - ``set``, ``frozenset``: Converted to list - ``bytes``: UTF-8 decoded string - ``enum.Enum``: Value of the enum member - ``numpy.ndarray``: Converted to list (if numpy installed) - ``pydantic.BaseModel``: model_dump with mode="json" (if pydantic installed) """ # ============================================================================ # Protocol and base classes # ============================================================================ @runtime_checkable class JSONSerializer(Protocol): """Protocol defining the JSON serialization interface. Implement this protocol to create custom serializers that integrate with Advanced Alchemy's serialization system. Example: >>> class MySerializer: ... def encode( ... self, data: Any, *, as_bytes: bool = False ... ) -> "Union[str, bytes]": ... # Custom encoding logic ... pass ... ... def decode( ... self, ... data: "Union[str, bytes]", ... *, ... decode_bytes: bool = True, ... ) -> Any: ... # Custom decoding logic ... pass """ @overload def encode(self, data: Any, *, as_bytes: Literal[False] = ...) -> str: ... @overload def encode(self, data: Any, *, as_bytes: Literal[True]) -> bytes: ... @overload def encode(self, data: Any, *, as_bytes: bool) -> "Union[str, bytes]": ... def encode(self, data: Any, *, as_bytes: bool = False) -> "Union[str, bytes]": """Encode data to JSON. Args: data: Data to encode. as_bytes: If True, return bytes; otherwise return str. Returns: JSON string or bytes representation. """ ... def decode(self, data: "Union[str, bytes]", *, decode_bytes: bool = True) -> Any: """Decode JSON to Python object. Args: data: JSON string or bytes to decode. decode_bytes: If True, decode bytes input; otherwise return as-is. Returns: Decoded Python object. """ ... class BaseJSONSerializer(ABC): """Abstract base class for JSON serializers. Provides common functionality for serializer implementations including type encoder merging and enc_hook creation. """ __slots__ = ("_custom_type_encoders", "_type_encoders") def __init__(self, type_encoders: "Optional[TypeEncodersMap]" = None) -> None: """Initialize serializer with optional custom type encoders. Args: type_encoders: Custom type encoders to merge with defaults. User-provided encoders take precedence over defaults. """ self._custom_type_encoders: dict[type, Callable[[Any], Any]] = dict(type_encoders or {}) self._type_encoders: dict[type, Callable[[Any], Any]] = { **DEFAULT_TYPE_ENCODERS, **(type_encoders or {}), } @staticmethod def _get_type_encoder( value: Any, type_encoders: Mapping[type, Callable[[Any], Any]], ) -> "Optional[Callable[[Any], Any]]": """Return the encoder matching ``value`` by exact type or MRO.""" for base in value.__class__.__mro__[:-1]: encoder = type_encoders.get(base) if encoder is not None: return encoder return None def _encode_mapping_key_with_custom_type_encoder(self, key: Any) -> Any: encoder = self._get_type_encoder(key, self._custom_type_encoders) return encoder(key) if encoder is not None else key def _apply_custom_type_encoders_to_mapping(self, value: Mapping[object, object]) -> dict[Any, Any]: return { self._encode_mapping_key_with_custom_type_encoder(key): self._apply_custom_type_encoders(item) for key, item in value.items() } def _apply_custom_type_encoders_to_list(self, value: list[object]) -> list[Any]: return [self._apply_custom_type_encoders(item) for item in value] def _apply_custom_type_encoders_to_tuple(self, value: tuple[object, ...]) -> tuple[Any, ...]: return tuple(self._apply_custom_type_encoders(item) for item in value) def _apply_custom_type_encoders_to_set(self, value: Union[set[object], frozenset[object]]) -> list[Any]: return [self._apply_custom_type_encoders(item) for item in value] def _apply_custom_type_encoders(self, value: Any) -> Any: """Apply user-provided type encoders before backend-native encoding.""" encoder = self._get_type_encoder(value, self._custom_type_encoders) if encoder is not None: return encoder(value) if isinstance(value, Mapping): return self._apply_custom_type_encoders_to_mapping(cast("Mapping[object, object]", value)) if isinstance(value, list): return self._apply_custom_type_encoders_to_list(cast("list[object]", value)) if isinstance(value, tuple): return self._apply_custom_type_encoders_to_tuple(cast("tuple[object, ...]", value)) if isinstance(value, (set, frozenset)): return self._apply_custom_type_encoders_to_set(cast("Union[set[object], frozenset[object]]", value)) return value def _prepare_data_for_encode(self, data: Any) -> Any: if not self._custom_type_encoders: return data return self._apply_custom_type_encoders(data) def _create_enc_hook(self) -> "Callable[[Any], Any]": """Create an encoding hook function from type_encoders. The hook walks the MRO of the value's type to find a matching encoder, allowing encoders for parent classes to handle subclasses. Returns: Encoding hook function suitable for msgspec/orjson. """ type_encoders = self._type_encoders def enc_hook(value: Any) -> Any: encoder = self._get_type_encoder(value, type_encoders) if encoder is not None: return encoder(value) # Fallback: try string conversion try: return str(value) except Exception as exc: msg = f"Cannot serialize {type(value).__name__}" raise TypeError(msg) from exc return enc_hook @overload @abstractmethod def encode(self, data: Any, *, as_bytes: Literal[False] = ...) -> str: ... @overload @abstractmethod def encode(self, data: Any, *, as_bytes: Literal[True]) -> bytes: ... @overload @abstractmethod def encode(self, data: Any, *, as_bytes: bool) -> "Union[str, bytes]": ... @abstractmethod def encode(self, data: Any, *, as_bytes: bool = False) -> "Union[str, bytes]": """Encode data to JSON.""" ... @abstractmethod def decode(self, data: "Union[str, bytes]", *, decode_bytes: bool = True) -> Any: """Decode JSON to Python object.""" ... # ============================================================================ # Serializer implementations # ============================================================================ class MsgspecSerializer(BaseJSONSerializer): """High-performance JSON serializer using msgspec. This is the preferred serializer when msgspec is available, offering the best performance for JSON encoding/decoding. Example: >>> serializer = MsgspecSerializer() >>> serializer.encode({"key": "value"}) '{"key":"value"}' >>> # With custom type encoders >>> serializer = MsgspecSerializer(type_encoders={set: sorted}) >>> serializer.encode({"items": {3, 1, 2}}) '{"items":[1,2,3]}' """ __slots__ = ("_decoder", "_enc_hook", "_encoder") def __init__( self, type_encoders: "Optional[TypeEncodersMap]" = None, enc_hook: "Optional[Callable[[Any], Any]]" = None, ) -> None: """Initialize msgspec serializer. Args: type_encoders: Custom type encoders to merge with defaults. enc_hook: Optional custom encoding hook. If provided, type_encoders is ignored and this hook is used directly for complete control. """ from msgspec.json import Decoder, Encoder super().__init__(None if enc_hook is not None else type_encoders) self._enc_hook = enc_hook if enc_hook is not None else self._create_enc_hook() self._encoder: Final = Encoder(enc_hook=self._enc_hook) self._decoder: Final = Decoder() @overload def encode(self, data: Any, *, as_bytes: Literal[False] = ...) -> str: ... @overload def encode(self, data: Any, *, as_bytes: Literal[True]) -> bytes: ... @overload def encode(self, data: Any, *, as_bytes: bool) -> "Union[str, bytes]": ... def encode(self, data: Any, *, as_bytes: bool = False) -> "Union[str, bytes]": """Encode data to JSON using msgspec. Args: data: Data to encode. as_bytes: If True, return bytes; otherwise return UTF-8 string. Returns: JSON representation as string or bytes. """ result = self._encoder.encode(self._prepare_data_for_encode(data)) return result if as_bytes else result.decode("utf-8") def decode(self, data: "Union[str, bytes]", *, decode_bytes: bool = True) -> Any: """Decode JSON using msgspec. Args: data: JSON string or bytes to decode. decode_bytes: If True, decode bytes input; otherwise return as-is. Returns: Decoded Python object. """ if isinstance(data, str): data = data.encode("utf-8") elif not decode_bytes: return data return self._decoder.decode(data) class OrjsonSerializer(BaseJSONSerializer): """High-performance JSON serializer using orjson. Provides excellent performance with native support for datetime, UUID, and numpy serialization. Example: >>> serializer = OrjsonSerializer() >>> serializer.encode({"key": "value"}) '{"key":"value"}' """ __slots__ = ("_enc_hook",) def __init__(self, type_encoders: "Optional[TypeEncodersMap]" = None) -> None: """Initialize orjson serializer. Args: type_encoders: Custom type encoders to merge with defaults. """ super().__init__(type_encoders) self._enc_hook = self._create_enc_hook() @overload def encode(self, data: Any, *, as_bytes: Literal[False] = ...) -> str: ... @overload def encode(self, data: Any, *, as_bytes: Literal[True]) -> bytes: ... @overload def encode(self, data: Any, *, as_bytes: bool) -> "Union[str, bytes]": ... def encode(self, data: Any, *, as_bytes: bool = False) -> "Union[str, bytes]": """Encode data to JSON using orjson. Args: data: Data to encode. as_bytes: If True, return bytes; otherwise return UTF-8 string. Returns: JSON representation as string or bytes. """ import orjson # type: ignore[import-not-found,unused-ignore] # pyright: ignore[reportMissingImports] orjson_module: Any = orjson options: int = orjson_module.OPT_NAIVE_UTC | orjson_module.OPT_SERIALIZE_UUID if NUMPY_INSTALLED: options |= orjson_module.OPT_SERIALIZE_NUMPY result: bytes = orjson_module.dumps(self._prepare_data_for_encode(data), default=self._enc_hook, option=options) return result if as_bytes else result.decode("utf-8") def decode(self, data: "Union[str, bytes]", *, decode_bytes: bool = True) -> Any: """Decode JSON using orjson. Args: data: JSON string or bytes to decode. decode_bytes: If True, decode bytes input; otherwise return as-is. Returns: Decoded Python object. """ import orjson # type: ignore[import-not-found,unused-ignore] # pyright: ignore[reportMissingImports] orjson_module: Any = orjson if isinstance(data, bytes) and not decode_bytes: return data return orjson_module.loads(data) class StandardLibSerializer(BaseJSONSerializer): """JSON serializer using Python's standard library. Fallback serializer when neither msgspec nor orjson is available. Slower than the alternatives but always available. Example: >>> serializer = StandardLibSerializer() >>> serializer.encode({"key": "value"}) '{"key": "value"}' """ __slots__ = ("_enc_hook",) def __init__(self, type_encoders: "Optional[TypeEncodersMap]" = None) -> None: """Initialize standard library serializer. Args: type_encoders: Custom type encoders to merge with defaults. """ super().__init__(type_encoders) self._enc_hook = self._create_enc_hook() @overload def encode(self, data: Any, *, as_bytes: Literal[False] = ...) -> str: ... @overload def encode(self, data: Any, *, as_bytes: Literal[True]) -> bytes: ... @overload def encode(self, data: Any, *, as_bytes: bool) -> "Union[str, bytes]": ... def encode(self, data: Any, *, as_bytes: bool = False) -> "Union[str, bytes]": """Encode data to JSON using standard library. Args: data: Data to encode. as_bytes: If True, return bytes; otherwise return string. Returns: JSON representation as string or bytes. """ result = json.dumps(self._prepare_data_for_encode(data), default=self._enc_hook) return result.encode("utf-8") if as_bytes else result def decode(self, data: "Union[str, bytes]", *, decode_bytes: bool = True) -> Any: """Decode JSON using standard library. Args: data: JSON string or bytes to decode. decode_bytes: If True, decode bytes input; otherwise return as-is. Returns: Decoded Python object. """ if isinstance(data, bytes): if not decode_bytes: return data data = data.decode("utf-8") return json.loads(data) # ============================================================================ # Serializer factory with caching # ============================================================================ _serializer_cache: "dict[frozenset[tuple[type, int]], JSONSerializer]" = {} _cache_lock: Final[threading.RLock] = threading.RLock() _default_serializer: "Optional[JSONSerializer]" = None def _create_default_serializer() -> "JSONSerializer": """Create the default serializer based on available libraries. Priority: msgspec > orjson > standard library Returns: Best available JSON serializer. """ if MSGSPEC_INSTALLED: return MsgspecSerializer() if ORJSON_INSTALLED: return OrjsonSerializer() return StandardLibSerializer() def _create_serializer(type_encoders: "TypeEncodersMap") -> "JSONSerializer": """Create a serializer with custom type encoders. Args: type_encoders: Custom type encoders mapping. Returns: JSON serializer configured with the given encoders. """ if MSGSPEC_INSTALLED: return MsgspecSerializer(type_encoders=type_encoders) if ORJSON_INSTALLED: return OrjsonSerializer(type_encoders=type_encoders) return StandardLibSerializer(type_encoders=type_encoders) def get_serializer(type_encoders: "Optional[TypeEncodersMap]" = None) -> "JSONSerializer": """Get a cached serializer instance. This factory function returns cached serializer instances to avoid recreating encoders for the same configuration. The default serializer (no custom type_encoders) is a singleton for maximum performance. Args: type_encoders: Optional mapping of types to encoder functions. If None, returns the default singleton serializer. Returns: A JSONSerializer instance configured with the given type encoders. Example: >>> # Default serializer (singleton) >>> s1 = get_serializer() >>> s2 = get_serializer() >>> s1 is s2 True >>> # Custom type encoders (cached by configuration) >>> encoders = {str: lambda s: s.upper()} >>> s3 = get_serializer(encoders) >>> s4 = get_serializer(encoders) >>> s3 is s4 True """ global _default_serializer # noqa: PLW0603 if type_encoders is None: with _cache_lock: if _default_serializer is None: _default_serializer = _create_default_serializer() return _default_serializer # Create hashable cache key using type and function id # Note: Different lambda objects create different keys even if equivalent try: cache_key = frozenset((k, id(v)) for k, v in type_encoders.items()) except TypeError: # Fallback for unhashable keys - create without caching return _create_serializer(type_encoders) with _cache_lock: if cache_key not in _serializer_cache: _serializer_cache[cache_key] = _create_serializer(type_encoders) return _serializer_cache[cache_key] # ============================================================================ # Public API functions # ============================================================================ @overload def encode_json(data: Any) -> str: ... @overload def encode_json(data: Any, *, as_bytes: Literal[False]) -> str: ... @overload def encode_json(data: Any, *, as_bytes: Literal[True]) -> bytes: ... @overload def encode_json(data: Any, *, type_encoders: "TypeEncodersMap") -> str: ... @overload def encode_json(data: Any, *, type_encoders: TypeEncodersMap, as_bytes: Literal[False]) -> str: ... @overload def encode_json(data: Any, *, type_encoders: TypeEncodersMap, as_bytes: Literal[True]) -> bytes: ... def encode_json( data: Any, *, type_encoders: "Optional[TypeEncodersMap]" = None, as_bytes: bool = False, ) -> "Union[str, bytes]": """Encode data to JSON with optional custom type encoders. This is the primary interface for JSON encoding in Advanced Alchemy. It supports custom type encoders that can be passed per-call. Args: data: Data to encode to JSON. type_encoders: Optional mapping of types to encoder functions. These are merged with DEFAULT_TYPE_ENCODERS, with user encoders taking precedence. as_bytes: If True, return bytes; otherwise return str. Returns: JSON representation of data as string or bytes. Example: >>> import datetime >>> encode_json({"date": datetime.date(2025, 12, 17)}) '{"date":"2025-12-17"}' >>> # Custom type encoder >>> class Point: ... def __init__(self, x, y): ... self.x, self.y = x, y >>> encode_json( ... {"point": Point(1, 2)}, ... type_encoders={Point: lambda p: [p.x, p.y]}, ... ) '{"point":[1,2]}' >>> # Return bytes >>> encode_json({"key": "value"}, as_bytes=True) b'{"key":"value"}' """ serializer = get_serializer(type_encoders) return serializer.encode(data, as_bytes=as_bytes) def decode_json(data: "Union[str, bytes]", *, decode_bytes: bool = True) -> Any: """Decode JSON string or bytes to Python object. Args: data: JSON string or bytes to decode. decode_bytes: If True, decode bytes input. If False, bytes are returned as-is without decoding. Returns: Decoded Python object. Example: >>> decode_json('{"key": "value"}') {'key': 'value'} >>> decode_json(b'{"key": "value"}') {'key': 'value'} """ serializer = get_serializer() return serializer.decode(data, decode_bytes=decode_bytes) def encode_complex_type(obj: Any) -> Any: # noqa: PLR0911 """Convert an object to a JSON-serializable format if possible. Handles types that are not natively JSON serializable: - datetime, date, time: ISO format strings - timedelta: total seconds as float - Decimal: string representation - bytes: hex string - UUID: string representation - set, frozenset: list Args: obj: The object to encode. Returns: A JSON-serializable representation of the object, or None if the type is not supported. """ if isinstance(obj, datetime.datetime): return {"__type__": "datetime", "value": obj.isoformat()} if isinstance(obj, datetime.date): return {"__type__": "date", "value": obj.isoformat()} if isinstance(obj, datetime.time): return {"__type__": "time", "value": obj.isoformat()} if isinstance(obj, datetime.timedelta): return {"__type__": "timedelta", "value": obj.total_seconds()} if isinstance(obj, Decimal): return {"__type__": "decimal", "value": str(obj)} if isinstance(obj, bytes): return {"__type__": "bytes", "value": obj.hex()} if isinstance(obj, UUID): return {"__type__": "uuid", "value": str(obj)} if isinstance(obj, (set, frozenset)): items: list[Any] = list(cast("Union[set[Any], frozenset[Any]]", obj)) # type: ignore[redundant-cast] return {"__type__": "set", "value": items} return None def decode_complex_type(value: Any) -> Any: """Recursively decode special type markers. Decodes the special `{"__type__": ..., "value": ...}` structures. """ if isinstance(value, list): value_list = cast("list[Any]", value) # type: ignore[redundant-cast] return [decode_complex_type(v) for v in value_list] if not isinstance(value, dict): return value # Decode any nested values first value_dict = cast("dict[Any, Any]", value) # type: ignore[redundant-cast] decoded: dict[str, Any] = {str(k): decode_complex_type(v) for k, v in value_dict.items()} # Then decode "typed" marker dicts if "__type__" in decoded and "value" in decoded: return _decode_typed_marker(decoded) return decoded def _decode_typed_marker(obj: dict[str, Any]) -> Any: # noqa: PLR0911 """Custom JSON decoder for special types. Args: obj: The dictionary to decode. Returns: The decoded object, or the original dict if not a special type. """ type_name = obj["__type__"] value = obj["value"] if type_name == "datetime": return datetime.datetime.fromisoformat(value) if type_name == "date": return datetime.date.fromisoformat(value) if type_name == "time": return datetime.time.fromisoformat(value) if type_name == "timedelta": return datetime.timedelta(seconds=value) if type_name == "decimal": return Decimal(value) if type_name == "bytes": return bytes.fromhex(value) if type_name == "uuid": return UUID(value) if type_name == "set": return set(value) return obj