# Source code for faust.web.cache.backends.base

"""Cache backend - base implementation."""

import abc
from contextlib import asynccontextmanager
from typing import Any, AsyncGenerator, ClassVar, Optional, Tuple, Type, Union

from mode import Service
from mode.utils.logging import get_logger
from yarl import URL

from faust.types import AppT
from faust.types.web import CacheBackendT
from faust.web.cache.exceptions import CacheUnavailable

# Module-level logger; CacheBackend re-exports it as its ``logger`` class attribute.
logger = get_logger(__name__)

# %-style log message templates used by CacheBackend._recovery_context.
# Arguments are passed separately to the logging call, not pre-formatted.

# Logged when an error listed in ``irrecoverable_errors`` is caught.
E_CACHE_IRRECOVERABLE = "Cache disabled for irrecoverable error: %r"
# Logged when an error listed in ``invalidating_errors`` is caught, before the
# offending key is deleted.
E_CACHE_INVALIDATING = "Destroying cache for key %r caused error: %r"
# Logged when deleting the key after an invalidating error fails as well.
E_CANNOT_INVALIDATE = "Unable to invalidate key %r: %r"
# Logged when an error listed in ``operational_errors`` is caught.
E_CACHE_INOPERATIONAL = "Cache operational error: %r"


class CacheBackend(CacheBackendT, Service):
    """Backend for cache operations.

    Subclasses implement the raw storage hooks :meth:`_get`, :meth:`_set`
    and :meth:`_delete`, and declare which exception types fall into each
    of the three error categories below.  The public :meth:`get`,
    :meth:`set` and :meth:`delete` methods run those hooks inside
    :meth:`_recovery_context`, which translates backend errors into
    :attr:`Unavailable`.
    """

    logger = logger

    # Exception type raised to callers when the cache cannot be used.
    Unavailable: Type[BaseException] = CacheUnavailable

    # Errors that are logged as warnings and re-raised as ``Unavailable``.
    operational_errors: ClassVar[Tuple[Type[BaseException], ...]] = ()
    # Errors that cause the affected key to be deleted from the cache.
    invalidating_errors: ClassVar[Tuple[Type[BaseException], ...]] = ()
    # Errors that are logged with traceback and re-raised as ``Unavailable``.
    irrecoverable_errors: ClassVar[Tuple[Type[BaseException], ...]] = ()

    def __init__(
        self, app: AppT, url: Union[URL, str] = "memory://", **kwargs: Any
    ) -> None:
        """Store the app and backend URL, then initialize the service.

        Arguments:
            app: Application this backend is attached to.
            url: Backend URL; strings are converted to :class:`yarl.URL`.
            **kwargs: Forwarded unchanged to ``Service.__init__``.
        """
        self.app = app
        self.url = URL(url)
        Service.__init__(self, **kwargs)

    @abc.abstractmethod
    async def _get(self, key: str) -> Optional[bytes]:
        """Backend hook: fetch the raw value stored for ``key``."""
        ...

    @abc.abstractmethod
    async def _set(
        self, key: str, value: bytes, timeout: Optional[float] = None
    ) -> None:
        """Backend hook: store ``value`` for ``key`` with optional timeout."""
        ...

    @abc.abstractmethod
    async def _delete(self, key: str) -> None:
        """Backend hook: remove ``key`` from the store."""
        ...

    async def get(self, key: str) -> Optional[bytes]:
        """Get cached-value by key.

        Returns whatever :meth:`_get` returns.  If an invalidating error
        was raised and handled by :meth:`_recovery_context`, the error is
        suppressed and this method returns :const:`None`.

        Raises:
            Unavailable: For irrecoverable or operational backend errors.
        """
        async with self._recovery_context(key):
            return await self._get(key)

    async def set(
        self, key: str, value: bytes, timeout: Optional[float] = None
    ) -> None:
        """Set cached-value by key.

        Raises:
            Unavailable: For irrecoverable or operational backend errors.
        """
        async with self._recovery_context(key):
            await self._set(key, value, timeout)

    async def delete(self, key: str) -> None:
        """Forget value for cache key.

        Raises:
            Unavailable: For irrecoverable or operational backend errors.
        """
        async with self._recovery_context(key):
            await self._delete(key)

    @asynccontextmanager
    async def _recovery_context(self, key: str) -> AsyncGenerator:
        """Translate backend errors raised in the ``async with`` body.

        Handlers are tried in this order:

        * ``irrecoverable_errors``: logged with traceback, re-raised as
          :attr:`Unavailable` carrying the original exception as argument.
        * ``invalidating_errors``: logged as a warning, then ``key`` is
          deleted.  If the delete succeeds the error is suppressed; if the
          delete itself fails with an operational or invalidating error,
          :attr:`Unavailable` is raised.
        * ``operational_errors``: logged as a warning, re-raised as
          :attr:`Unavailable`.

        Every ``Unavailable`` is raised ``from`` the triggering exception
        so the backend error is recorded as its explicit ``__cause__``.
        """
        try:
            yield
        except self.irrecoverable_errors as exc:
            self.log.exception(E_CACHE_IRRECOVERABLE, exc)  # noqa: G200
            raise self.Unavailable(exc) from exc
        except self.invalidating_errors as exc:
            self.log.warning(  # noqa: G200
                E_CACHE_INVALIDATING, key, exc, exc_info=True
            )
            try:
                # Call the raw hook, not ``delete()``, to avoid re-entering
                # this recovery context.
                await self._delete(key)
            except (  # noqa: B030
                self.operational_errors + self.invalidating_errors
            ) as delete_exc:
                # Distinct name so the outer ``exc`` is not shadowed.
                self.log.exception(E_CANNOT_INVALIDATE, key, delete_exc)
                raise self.Unavailable() from delete_exc
        except self.operational_errors as exc:
            self.log.warning(  # noqa: G200
                E_CACHE_INOPERATIONAL, exc, exc_info=True
            )
            raise self.Unavailable() from exc

    def _repr_info(self) -> str:
        """Return the backend URL formatted as ``url=...``."""
        return f"url={self.url!r}"