Source code for faust.web.cache.backends.base
"""Cache backend - base implementation."""
import abc
from contextlib import asynccontextmanager
from typing import Any, AsyncGenerator, ClassVar, Optional, Tuple, Type, Union
from mode import Service
from mode.utils.logging import get_logger
from yarl import URL
from faust.types import AppT
from faust.types.web import CacheBackendT
from faust.web.cache.exceptions import CacheUnavailable
logger = get_logger(__name__)
E_CACHE_IRRECOVERABLE = "Cache disabled for irrecoverable error: %r"
E_CACHE_INVALIDATING = "Destroying cache for key %r caused error: %r"
E_CANNOT_INVALIDATE = "Unable to invalidate key %r: %r"
E_CACHE_INOPERATIONAL = "Cache operational error: %r"
[docs]class CacheBackend(CacheBackendT, Service):
"""Backend for cache operations."""
logger = logger
Unavailable: Type[BaseException] = CacheUnavailable
operational_errors: ClassVar[Tuple[Type[BaseException], ...]] = ()
invalidating_errors: ClassVar[Tuple[Type[BaseException], ...]] = ()
irrecoverable_errors: ClassVar[Tuple[Type[BaseException], ...]] = ()
def __init__(
self, app: AppT, url: Union[URL, str] = "memory://", **kwargs: Any
) -> None:
self.app = app
self.url = URL(url)
Service.__init__(self, **kwargs)
@abc.abstractmethod
async def _get(self, key: str) -> Optional[bytes]: ...
@abc.abstractmethod
async def _set(
self, key: str, value: bytes, timeout: Optional[float] = None
) -> None: ...
@abc.abstractmethod
async def _delete(self, key: str) -> None: ...
[docs] async def get(self, key: str) -> Optional[bytes]:
"""Get cached-value by key."""
async with self._recovery_context(key):
return await self._get(key)
[docs] async def set(
self, key: str, value: bytes, timeout: Optional[float] = None
) -> None:
"""Set cached-value by key."""
async with self._recovery_context(key):
await self._set(key, value, timeout)
[docs] async def delete(self, key: str) -> None:
"""Forget value for cache key."""
async with self._recovery_context(key):
await self._delete(key)
@asynccontextmanager
async def _recovery_context(self, key: str) -> AsyncGenerator:
try:
yield
except self.irrecoverable_errors as exc:
self.log.exception(E_CACHE_IRRECOVERABLE, exc) # noqa: G200
raise self.Unavailable(exc)
except self.invalidating_errors as exc:
self.log.warning(E_CACHE_INVALIDATING, key, exc, exc_info=1) # noqa: G200
try:
await self._delete(key)
except ( # noqa: B030
self.operational_errors + self.invalidating_errors
) as exc:
self.log.exception(E_CANNOT_INVALIDATE, key, exc)
raise self.Unavailable()
except self.operational_errors as exc:
self.log.warning(E_CACHE_INOPERATIONAL, exc, exc_info=1) # noqa: G200
raise self.Unavailable()
def _repr_info(self) -> str:
return f"url={self.url!r}"