Source code for advanced_alchemy.types.file_object.registry

from importlib.util import find_spec
from typing import TYPE_CHECKING, Any, Callable, Optional, Union, overload

from advanced_alchemy._serialization import decode_json, encode_json
from advanced_alchemy.exceptions import ImproperConfigurationError
from advanced_alchemy.utils.module_loader import import_string
from advanced_alchemy.utils.singleton import SingletonMeta

if TYPE_CHECKING:
    from advanced_alchemy.types.file_object.base import StorageBackend

DEFAULT_BACKEND = (
    "advanced_alchemy.types.file_object.backends.obstore.ObstoreBackend"
    if find_spec("obstore")
    else "advanced_alchemy.types.file_object.backends.fsspec.FSSpecBackend"
)


[docs] class StorageRegistry(metaclass=SingletonMeta): """A provider for creating and managing threaded portals."""
[docs] def __init__( self, json_serializer: "Callable[[Any], str]" = encode_json, json_deserializer: Callable[[Union[str, bytes]], Any] = decode_json, default_backend: "Union[str, type[StorageBackend]]" = DEFAULT_BACKEND, ) -> None: """Initialize the PortalProvider.""" self._registry: dict[str, StorageBackend] = {} self.json_serializer = json_serializer self.json_deserializer = json_deserializer self.default_backend: str = ( DEFAULT_BACKEND if isinstance(default_backend, str) else default_backend.__qualname__ )
[docs] def set_default_backend(self, default_backend: "Union[str, type[StorageBackend]]") -> None: """Set the default storage backend. Args: default_backend: The default storage backend """ self.default_backend = default_backend if isinstance(default_backend, str) else default_backend.__qualname__
[docs] def is_registered(self, key: str) -> bool: """Check if a storage backend is registered in the registry. Args: key: The key of the storage backend Returns: bool: True if the storage backend is registered, False otherwise. """ return key in self._registry
[docs] def get_backend(self, key: str) -> "StorageBackend": """Retrieve a configured storage backend from the registry. Returns: StorageBackend: The storage backend associaStorageBackendiven key. Raises: ImproperConfigurationError: If no storage backend is registered with the given key. """ try: return self._registry[key] except KeyError as e: msg = f'No storage backend registered with key "{key}"' raise ImproperConfigurationError(msg) from e
@overload def register_backend(self, value: "str") -> None: ... @overload def register_backend(self, value: "str", key: None = None) -> None: ... @overload def register_backend(self, value: "str", key: str) -> None: ... @overload def register_backend(self, value: "StorageBackend", key: None = None) -> None: ... @overload def register_backend(self, value: "StorageBackend", key: str) -> None: ...
[docs] def register_backend(self, value: "Union[StorageBackend, str]", key: "Optional[str]" = None) -> None: """Register a new storage backend in the registry. Args: value: The storage backend to register. key: The key to register the storage backend with. Raises: ImproperConfigurationError: If a string value is provided without a key. """ if isinstance(value, str): if key is None: msg = "key is required when registering a string value" raise ImproperConfigurationError(msg) self._registry[key] = import_string(self.default_backend)(fs=value, key=key) else: if key is not None: msg = "key is not allowed when registering a StorageBackend" raise ImproperConfigurationError(msg) self._registry[value.key] = value
[docs] def unregister_backend(self, key: str) -> None: """Unregister a storage backend from the registry.""" if key in self._registry: del self._registry[key]
[docs] def clear_backends(self) -> None: """Clear the registry.""" self._registry.clear()
[docs] def registered_backends(self) -> list[str]: """Return a list of all registered keys.""" return list(self._registry.keys())
storages = StorageRegistry()