import abc
import typing
from typing import Any, Callable, Iterable, Mapping, Optional, Set, TypeVar, Union
from mode import ServiceT
from mode.utils.collections import FastUserDict
from yarl import URL
from .codecs import CodecArg
from .events import EventT
from .tuples import TP
# Import the real application/model/table types only while a static type
# checker is running; at runtime define empty placeholder classes under the
# same private names so the annotations below still resolve to something.
# NOTE(review): presumably this avoids circular imports with .app, .models
# and .tables -- confirm.
if typing.TYPE_CHECKING:
    from .app import AppT as _AppT
    from .models import ModelArg as _ModelArg
    from .tables import CollectionT as _CollectionT
else:
    # Runtime stand-ins: empty classes, used only as annotation targets here.
    class _AppT: ...  # noqa
    class _ModelArg: ...  # noqa
    class _CollectionT: ...  # noqa

# Names exported by ``from <this module> import *``.
__all__ = ["StoreT"]

# Generic type variables for the key and value types of the ``StoreT`` mapping.
KT = TypeVar("KT")
VT = TypeVar("VT")
class StoreT(ServiceT, FastUserDict[KT, VT]):
    """Abstract type describing a store attached to a table.

    The type combines the ``ServiceT`` interface with a ``FastUserDict``
    mapping from ``KT`` keys to ``VT`` values.  Every method declared here
    is abstract: this class only fixes names and signatures, and concrete
    stores supply the behavior.
    """

    # Location of the store, held as a parsed URL.
    url: URL
    # Application the store is associated with.
    app: _AppT
    # Table (collection) the store is associated with.
    table: _CollectionT
    # Name of that table.
    table_name: str
    # Optional model arguments for keys and values.
    key_type: Optional[_ModelArg]
    value_type: Optional[_ModelArg]
    # Codec arguments for keys and values.
    key_serializer: CodecArg
    value_serializer: CodecArg
    # Optional mapping of extra options.
    # NOTE(review): presumably backend-specific settings -- confirm.
    options: Optional[Mapping[str, Any]]

    @abc.abstractmethod
    def __init__(
        self,
        url: Union[str, URL],
        app: _AppT,
        table: _CollectionT,
        *,
        table_name: str = "",
        key_type: Optional[_ModelArg] = None,
        value_type: Optional[_ModelArg] = None,
        key_serializer: CodecArg = "",
        value_serializer: CodecArg = "",
        options: Optional[Mapping[str, Any]] = None,
        **kwargs: Any,
    ) -> None:
        """Declare the constructor signature shared by all stores.

        Arguments:
            url: Store location, as a string or an already parsed ``URL``.
            app: Application the store is associated with.
            table: Table (collection) the store is associated with.
            table_name: Name of the table (keyword-only, default ``""``).
            key_type: Optional model argument for keys.
            value_type: Optional model argument for values.
            key_serializer: Codec argument for keys (default ``""``).
            value_serializer: Codec argument for values (default ``""``).
            options: Optional mapping of extra options.
            **kwargs: Additional keyword arguments accepted by
                implementations.
        """
        ...

    @abc.abstractmethod
    def persisted_offset(self, tp: TP) -> Optional[int]:
        """Return the persisted offset for *tp*, which may be ``None``."""
        ...

    @abc.abstractmethod
    def set_persisted_offset(self, tp: TP, offset: int) -> None:
        """Set the persisted offset for *tp* to *offset*."""
        ...

    @abc.abstractmethod
    async def need_active_standby_for(self, tp: TP) -> bool:
        """Return whether an active standby is needed for *tp*.

        The criteria are left to the implementation; this interface only
        fixes the signature.
        """
        ...

    @abc.abstractmethod
    def apply_changelog_batch(
        self,
        batch: Iterable[EventT],
        to_key: Callable[[Any], KT],
        to_value: Callable[[Any], VT],
    ) -> None:
        """Apply a batch of changelog events to this store.

        Arguments:
            batch: Iterable of events to apply.
            to_key: Callable producing a ``KT`` key from a single argument.
            to_value: Callable producing a ``VT`` value from a single
                argument.
        """
        ...

    @abc.abstractmethod
    def reset_state(self) -> None:
        """Reset the state of this store."""
        ...

    @abc.abstractmethod
    async def on_rebalance(
        self,
        assigned: Set[TP],
        revoked: Set[TP],
        newly_assigned: Set[TP],
        generation_id: int = 0,
    ) -> None:
        """Handle a rebalance.

        Arguments:
            assigned: Set of assigned topic partitions.
            revoked: Set of revoked topic partitions.
            newly_assigned: Set of newly assigned topic partitions.
            generation_id: Integer generation identifier (default ``0``).
        """
        ...

    @abc.abstractmethod
    async def on_recovery_completed(
        self, active_tps: Set[TP], standby_tps: Set[TP]
    ) -> None:
        """Handle completion of recovery.

        Arguments:
            active_tps: Set of active topic partitions.
            standby_tps: Set of standby topic partitions.
        """
        ...

    @abc.abstractmethod
    async def backup_partition(
        self, tp: Union[TP, int], flush: bool = True, purge: bool = False, keep: int = 1
    ) -> None:
        """Back up a partition of this store.

        Arguments:
            tp: Partition to back up, given as a ``TP`` or an ``int``.
            flush: Flag, default ``True``.
                NOTE(review): presumably flushes pending state before the
                backup is taken -- confirm.
            purge: Flag, default ``False``.
                NOTE(review): presumably removes old backups -- confirm.
            keep: Integer, default ``1``.
                NOTE(review): presumably the number of backups retained
                when purging -- confirm.
        """
        ...

    @abc.abstractmethod
    def restore_backup(
        self,
        tp: Union[TP, int],
        latest: bool = True,
        backup_id: int = 0,
    ) -> None:
        """Restore a partition of this store from a backup.

        Arguments:
            tp: Partition to restore, given as a ``TP`` or an ``int``.
            latest: Flag, default ``True``.
                NOTE(review): presumably selects the most recent backup
                instead of *backup_id* -- confirm.
            backup_id: Integer backup identifier, default ``0``.
        """
        ...