# Source code for faust.types.stores

import abc
import typing
from typing import Any, Callable, Iterable, Mapping, Optional, Set, TypeVar, Union

from mode import ServiceT
from mode.utils.collections import FastUserDict
from yarl import URL

from .codecs import CodecArg
from .events import EventT
from .tuples import TP

# The real types are imported only while a static type checker is running
# (``typing.TYPE_CHECKING`` is False at runtime).
# NOTE(review): presumably this avoids circular imports between the
# ``types`` submodules -- confirm against .app/.models/.tables.
if typing.TYPE_CHECKING:
    from .app import AppT as _AppT
    from .models import ModelArg as _ModelArg
    from .tables import CollectionT as _CollectionT
else:
    # At runtime, empty placeholder classes keep the names below resolvable
    # so annotations that mention them can still be evaluated.

    class _AppT: ...  # noqa

    class _ModelArg: ...  # noqa

    class _CollectionT: ...  # noqa


# Names exported by ``from <this module> import *``.
__all__ = ["StoreT"]

# Type variables for the key and value types that parameterize ``StoreT``.
KT = TypeVar("KT")
VT = TypeVar("VT")


class StoreT(ServiceT, FastUserDict[KT, VT]):
    """Abstract interface for a store: a service that is also a mapping.

    The class combines ``ServiceT`` with ``FastUserDict[KT, VT]`` and
    declares the attributes and abstract methods a concrete store must
    provide.  Every method here is abstract with an empty body; the actual
    semantics are defined by the implementing subclasses.
    """

    # Attributes mirrored by the ``__init__`` parameters of the same name.
    url: URL
    app: _AppT
    table: _CollectionT
    table_name: str
    key_type: Optional[_ModelArg]
    value_type: Optional[_ModelArg]
    key_serializer: CodecArg
    value_serializer: CodecArg
    options: Optional[Mapping[str, Any]]

    @abc.abstractmethod
    def __init__(
        self,
        url: Union[str, URL],
        app: _AppT,
        table: _CollectionT,
        *,
        table_name: str = "",
        key_type: Optional[_ModelArg] = None,
        value_type: Optional[_ModelArg] = None,
        key_serializer: CodecArg = "",
        value_serializer: CodecArg = "",
        options: Optional[Mapping[str, Any]] = None,
        **kwargs: Any,
    ) -> None:
        """Declare the constructor signature for store implementations.

        Args:
            url: Store location, given as a string or a ``URL``.
            app: The application object this store is created for.
            table: The collection/table object this store is created for.
            table_name: Keyword-only; defaults to the empty string.
            key_type: Keyword-only; optional model argument for keys.
            value_type: Keyword-only; optional model argument for values.
            key_serializer: Keyword-only codec argument for keys.
            value_serializer: Keyword-only codec argument for values.
            options: Keyword-only optional mapping of extra options;
                interpretation is left to the implementation.
            **kwargs: Additional keyword arguments accepted by
                implementations.
        """
        ...

    @abc.abstractmethod
    def persisted_offset(self, tp: TP) -> Optional[int]:
        """Return the persisted offset for ``tp``, or ``None``.

        NOTE(review): ``None`` presumably means no offset has been
        persisted yet -- confirm against the implementations.
        """
        ...

    @abc.abstractmethod
    def set_persisted_offset(self, tp: TP, offset: int) -> None:
        """Set the persisted offset for ``tp`` to ``offset``."""
        ...

    @abc.abstractmethod
    async def need_active_standby_for(self, tp: TP) -> bool:
        """Return a boolean answer for ``tp`` (coroutine).

        NOTE(review): judging by the name, this reports whether an active
        standby is needed for ``tp`` -- confirm against the implementations.
        """
        ...

    @abc.abstractmethod
    def apply_changelog_batch(
        self,
        batch: Iterable[EventT],
        to_key: Callable[[Any], KT],
        to_value: Callable[[Any], VT],
    ) -> None:
        """Apply a batch of changelog events to this store.

        Args:
            batch: The events to apply.
            to_key: Callable that converts a raw object into a key (``KT``).
            to_value: Callable that converts a raw object into a value
                (``VT``).
        """
        ...

    @abc.abstractmethod
    def reset_state(self) -> None:
        """Reset the state of this store (details are backend-defined)."""
        ...

    @abc.abstractmethod
    async def on_rebalance(
        self,
        assigned: Set[TP],
        revoked: Set[TP],
        newly_assigned: Set[TP],
        generation_id: int = 0,
    ) -> None:
        """Rebalance hook (coroutine).

        Args:
            assigned: Set of assigned ``TP`` values.
            revoked: Set of revoked ``TP`` values.
            newly_assigned: Set of newly assigned ``TP`` values.
            generation_id: Integer generation identifier; defaults to ``0``.
        """
        ...

    @abc.abstractmethod
    async def on_recovery_completed(
        self, active_tps: Set[TP], standby_tps: Set[TP]
    ) -> None:
        """Recovery-completed hook (coroutine).

        Args:
            active_tps: Set of active ``TP`` values.
            standby_tps: Set of standby ``TP`` values.
        """
        ...

    @abc.abstractmethod
    async def backup_partition(
        self,
        tp: Union[TP, int],
        flush: bool = True,
        purge: bool = False,
        keep: int = 1,
    ) -> None:
        """Back up the partition identified by ``tp`` (coroutine).

        Args:
            tp: The partition, given as a ``TP`` or as an integer.
            flush: Defaults to ``True``.
            purge: Defaults to ``False``.
            keep: Defaults to ``1``.

        NOTE(review): the exact meaning of ``flush``, ``purge`` and ``keep``
        is not visible in this interface -- see the concrete stores.
        """
        ...

    @abc.abstractmethod
    def restore_backup(
        self,
        tp: Union[TP, int],
        latest: bool = True,
        backup_id: int = 0,
    ) -> None:
        """Restore a backup for the partition identified by ``tp``.

        Args:
            tp: The partition, given as a ``TP`` or as an integer.
            latest: Defaults to ``True``.
            backup_id: Defaults to ``0``.

        NOTE(review): how ``latest`` and ``backup_id`` interact is not
        visible in this interface -- see the concrete stores.
        """
        ...