import abc
import typing
from datetime import datetime
from typing import (
Any,
Awaitable,
Callable,
ItemsView,
Iterable,
Iterator,
KeysView,
Mapping,
MutableMapping,
Optional,
Set,
Tuple,
TypeVar,
Union,
ValuesView,
)
from mode import Seconds, ServiceT
from mode.utils.collections import FastUserDict, ManagedUserDict
from yarl import URL
from .codecs import CodecArg
from .events import EventT
from .stores import StoreT
from .streams import JoinableT
from .topics import TopicT
from .tuples import TP, FutureMessage
from .windows import WindowT
# The names below are only used in annotations in this module.  Static type
# checkers see the real definitions; at runtime (TYPE_CHECKING is False) empty
# placeholder classes with the same names are defined instead, so none of the
# modules in the first branch are imported here.
# NOTE(review): presumably this avoids circular imports -- confirm.
if typing.TYPE_CHECKING:
    from .app import AppT as _AppT
    from .models import FieldDescriptorT as _FieldDescriptorT
    from .models import ModelArg as _ModelArg
    from .serializers import SchemaT as _SchemaT
else:

    class _AppT: ...  # noqa

    class _FieldDescriptorT: ...  # noqa

    class _ModelArg: ...  # noqa

    class _SchemaT: ...  # noqa
# Public names exported by ``from <this module> import *``.
# The order of the entries is kept exactly as originally declared.
__all__ = [
    # callback / argument aliases
    "RecoverCallback",
    "RelativeArg",
    # collection and table interfaces
    "CollectionT",
    "TableT",
    "GlobalTableT",
    "TableManagerT",
    # window-related names
    "WindowCloseCallback",
    "WindowSetT",
    "WindowedItemsViewT",
    "WindowedValuesViewT",
    "WindowWrapperT",
    # remaining aliases
    "ChangelogEventCallback",
    "CollectionTps",
]
# -- Callback signatures ------------------------------------------------------

# Takes an optional event and returns either a float or a datetime.
RelativeHandler = Callable[[Optional[EventT]], Union[float, datetime]]

# Takes no arguments; awaitable result is None.
RecoverCallback = Callable[[], Awaitable[None]]

# Takes one event; awaitable result is None.
ChangelogEventCallback = Callable[[EventT], Awaitable[None]]

# Takes two arbitrary arguments; may return None directly or an awaitable.
WindowCloseCallback = Callable[[Any, Any], Union[None, Awaitable[None]]]

# -- Argument / container aliases ---------------------------------------------

# Anything accepted as a ``relative_to`` style argument: a field descriptor,
# a RelativeHandler callable, a datetime, a float, or None.
RelativeArg = Optional[Union[_FieldDescriptorT, RelativeHandler, datetime, float]]

# Mutable mapping from a collection to a set of TP values.
CollectionTps = MutableMapping["CollectionT", Set[TP]]

# -- Generic parameters (key type / value type) -------------------------------

KT = TypeVar("KT")
VT = TypeVar("VT")
class CollectionT(ServiceT, JoinableT):
    """Abstract type of a collection.

    This class only declares attribute annotations and abstract method
    signatures; every method body is ``...``.  Concrete behaviour has to be
    supplied by a subclass.
    """

    # Instance attributes (annotations only, no values assigned here).
    app: _AppT
    name: str
    default: Any  # noqa: E704
    schema: Optional[_SchemaT]
    key_type: Optional[_ModelArg]
    value_type: Optional[_ModelArg]
    partitions: Optional[int]
    window: Optional[WindowT]
    help: str
    recovery_buffer_size: int
    standby_buffer_size: int
    options: Optional[Mapping[str, Any]]
    last_closed_window: float
    use_partitioner: bool

    # The only attribute that is given a class-level value.
    is_global: bool = False

    @abc.abstractmethod
    def __init__(
        self,
        app: _AppT,
        *,
        name: Optional[str] = None,
        # Every parameter that defaults to None is annotated as Optional
        # explicitly (no reliance on implicit-Optional inference).
        default: Optional[Callable[[], Any]] = None,
        store: Optional[Union[str, URL]] = None,
        schema: Optional[_SchemaT] = None,
        key_type: Optional[_ModelArg] = None,
        value_type: Optional[_ModelArg] = None,
        partitions: Optional[int] = None,
        window: Optional[WindowT] = None,
        changelog_topic: Optional[TopicT] = None,
        help: Optional[str] = None,
        on_recover: Optional[RecoverCallback] = None,
        on_changelog_event: Optional[ChangelogEventCallback] = None,
        recovery_buffer_size: int = 1000,
        standby_buffer_size: Optional[int] = None,
        extra_topic_configs: Optional[Mapping[str, Any]] = None,
        options: Optional[Mapping[str, Any]] = None,
        use_partitioner: bool = False,
        on_window_close: Optional[WindowCloseCallback] = None,
        **kwargs: Any,
    ) -> None: ...

    @abc.abstractmethod
    def clone(self, **kwargs: Any) -> Any: ...

    # -- changelog ---------------------------------------------------------

    @property
    @abc.abstractmethod
    def changelog_topic(self) -> TopicT: ...

    # NOTE: only the getter above is marked abstract; the setter stub is not.
    @changelog_topic.setter
    def changelog_topic(self, topic: TopicT) -> None: ...

    @abc.abstractmethod
    def _changelog_topic_name(self) -> str: ...

    @abc.abstractmethod
    def apply_changelog_batch(self, batch: Iterable[EventT]) -> None: ...

    @abc.abstractmethod
    def persisted_offset(self, tp: TP) -> Optional[int]: ...

    @abc.abstractmethod
    async def need_active_standby_for(self, tp: TP) -> bool: ...

    @abc.abstractmethod
    def reset_state(self) -> None: ...

    @abc.abstractmethod
    def send_changelog(
        self,
        partition: Optional[int],
        key: Any,
        value: Any,
        key_serializer: CodecArg = None,
        value_serializer: CodecArg = None,
    ) -> FutureMessage: ...

    @abc.abstractmethod
    def partition_for_key(self, key: Any) -> Optional[int]: ...

    # -- event hooks / callbacks -------------------------------------------

    @abc.abstractmethod
    async def on_window_close(self, key: Any, value: Any) -> None: ...

    @abc.abstractmethod
    async def on_rebalance(
        self,
        assigned: Set[TP],
        revoked: Set[TP],
        newly_assigned: Set[TP],
        generation_id: int = 0,
    ) -> None: ...

    @abc.abstractmethod
    async def on_changelog_event(self, event: EventT) -> None: ...

    # Takes a RecoverCallback and returns a RecoverCallback.
    # NOTE(review): the signature suggests decorator-style use -- confirm.
    @abc.abstractmethod
    def on_recover(self, fun: RecoverCallback) -> RecoverCallback: ...

    @abc.abstractmethod
    async def on_recovery_completed(
        self, active_tps: Set[TP], standby_tps: Set[TP]
    ) -> None: ...

    @abc.abstractmethod
    async def call_recover_callbacks(self) -> None: ...

    # -- windowing ---------------------------------------------------------
    # Each of these is declared to return a "WindowWrapperT".

    @abc.abstractmethod
    def using_window(
        self, window: WindowT, *, key_index: bool = False
    ) -> "WindowWrapperT": ...

    @abc.abstractmethod
    def hopping(
        self,
        size: Seconds,
        step: Seconds,
        expires: Optional[Seconds] = None,
        key_index: bool = False,
    ) -> "WindowWrapperT": ...

    @abc.abstractmethod
    def tumbling(
        self, size: Seconds, expires: Optional[Seconds] = None, key_index: bool = False
    ) -> "WindowWrapperT": ...

    @abc.abstractmethod
    def as_ansitable(self, **kwargs: Any) -> str: ...

    # -- private helpers that subclasses must also provide -----------------

    @abc.abstractmethod
    def _relative_now(self, event: Optional[EventT] = None) -> float: ...

    @abc.abstractmethod
    def _relative_event(self, event: Optional[EventT] = None) -> float: ...

    @abc.abstractmethod
    def _relative_field(self, field: _FieldDescriptorT) -> RelativeHandler: ...

    @abc.abstractmethod
    def _relative_timestamp(self, timestamp: float) -> RelativeHandler: ...

    @abc.abstractmethod
    def _windowed_contains(self, key: Any, timestamp: float) -> bool: ...
class TableT(CollectionT, ManagedUserDict[KT, VT]):
    """Abstract table type: a ``CollectionT`` that is also a ``ManagedUserDict[KT, VT]``.

    Declares no members of its own; everything comes from the two bases.
    """
class GlobalTableT(TableT):
    """Abstract global-table type; a ``TableT`` subclass that adds no members."""
class TableManagerT(ServiceT, FastUserDict[str, CollectionT]):
    """Abstract type of the table manager.

    Combines ``ServiceT`` with a ``FastUserDict`` whose keys are ``str`` and
    whose values are ``CollectionT``.  Only annotations and abstract method
    signatures are declared here; every method body is ``...``.
    """

    app: _AppT
    actives_ready: bool
    standbys_ready: bool

    @abc.abstractmethod
    def __init__(self, app: _AppT, **kwargs: Any) -> None: ...

    # Takes a collection and returns a collection.
    @abc.abstractmethod
    def add(self, table: CollectionT) -> CollectionT: ...

    @abc.abstractmethod
    def persist_offset_on_commit(self, store: StoreT, tp: TP, offset: int) -> None: ...

    @abc.abstractmethod
    def on_commit(self, offsets: MutableMapping[TP, int]) -> None: ...

    # -- rebalance hooks ---------------------------------------------------

    @abc.abstractmethod
    async def on_rebalance(
        self,
        assigned: Set[TP],
        revoked: Set[TP],
        newly_assigned: Set[TP],
        generation_id: int = 0,
    ) -> None: ...

    @abc.abstractmethod
    def on_partitions_revoked(self, revoked: Set[TP]) -> None: ...

    @abc.abstractmethod
    def on_rebalance_start(self) -> None: ...

    # -- waiting helpers (coroutines) --------------------------------------

    @abc.abstractmethod
    async def wait_until_tables_registered(self) -> None: ...

    @abc.abstractmethod
    async def wait_until_recovery_completed(self) -> bool: ...

    # Read-only abstract property; annotated as a set of strings.
    @property
    @abc.abstractmethod
    def changelog_topics(self) -> Set[str]: ...
class WindowSetT(FastUserDict[KT, VT]):
    """Abstract type of a window set.

    A ``FastUserDict[KT, VT]`` that additionally carries a ``key``, the
    ``table`` it belongs to and an optional ``event``.  Only annotations and
    abstract method signatures are declared here; every method body is ``...``.
    """

    key: Any
    table: TableT
    event: Optional[EventT]

    @abc.abstractmethod
    def __init__(
        self,
        key: KT,
        table: TableT,
        wrapper: "WindowWrapperT",
        event: Optional[EventT] = None,
    ) -> None: ...

    # ``op`` is a binary callable over values: (VT, VT) -> VT.
    @abc.abstractmethod
    def apply(
        self, op: Callable[[VT, VT], VT], value: VT, event: Optional[EventT] = None
    ) -> "WindowSetT": ...

    # -- value accessors; each is annotated to return a VT -----------------

    @abc.abstractmethod
    def value(self, event: Optional[EventT] = None) -> VT: ...

    @abc.abstractmethod
    def current(self, event: Optional[EventT] = None) -> VT: ...

    @abc.abstractmethod
    def now(self) -> VT: ...

    @abc.abstractmethod
    def delta(self, d: Seconds, event: Optional[EventT] = None) -> VT: ...

    # -- in-place operators -------------------------------------------------
    # Every augmented-assignment hook takes a VT and is annotated to return
    # a "WindowSetT".

    @abc.abstractmethod
    def __iadd__(self, other: VT) -> "WindowSetT": ...

    @abc.abstractmethod
    def __isub__(self, other: VT) -> "WindowSetT": ...

    @abc.abstractmethod
    def __imul__(self, other: VT) -> "WindowSetT": ...

    @abc.abstractmethod
    def __itruediv__(self, other: VT) -> "WindowSetT": ...

    @abc.abstractmethod
    def __ifloordiv__(self, other: VT) -> "WindowSetT": ...

    @abc.abstractmethod
    def __imod__(self, other: VT) -> "WindowSetT": ...

    @abc.abstractmethod
    def __ipow__(self, other: VT) -> "WindowSetT": ...

    @abc.abstractmethod
    def __ilshift__(self, other: VT) -> "WindowSetT": ...

    @abc.abstractmethod
    def __irshift__(self, other: VT) -> "WindowSetT": ...

    @abc.abstractmethod
    def __iand__(self, other: VT) -> "WindowSetT": ...

    @abc.abstractmethod
    def __ixor__(self, other: VT) -> "WindowSetT": ...

    @abc.abstractmethod
    def __ior__(self, other: VT) -> "WindowSetT": ...
class WindowedItemsViewT(ItemsView):
    """Abstract items view over a window wrapper.

    Subclasses ``typing.ItemsView`` and redeclares ``__init__`` and
    ``__iter__`` as abstract, plus three abstract iterator-returning methods.
    Every method body is ``...``; concrete behaviour comes from a subclass.
    """

    @abc.abstractmethod
    def __init__(
        self, mapping: "WindowWrapperT", event: Optional[EventT] = None
    ) -> None: ...

    # All iteration methods are annotated to yield (key, value) 2-tuples.

    @abc.abstractmethod
    def __iter__(self) -> Iterator[Tuple[Any, Any]]: ...

    @abc.abstractmethod
    def now(self) -> Iterator[Tuple[Any, Any]]: ...

    @abc.abstractmethod
    def current(self, event: Optional[EventT] = None) -> Iterator[Tuple[Any, Any]]: ...

    @abc.abstractmethod
    def delta(
        self, d: Seconds, event: Optional[EventT] = None
    ) -> Iterator[Tuple[Any, Any]]: ...
class WindowedValuesViewT(ValuesView):
    """Abstract values view over a window wrapper.

    Subclasses ``typing.ValuesView`` and redeclares ``__init__`` and
    ``__iter__`` as abstract, plus three abstract iterator-returning methods.
    Every method body is ``...``; concrete behaviour comes from a subclass.
    """

    @abc.abstractmethod
    def __init__(
        self, mapping: "WindowWrapperT", event: Optional[EventT] = None
    ) -> None: ...

    # All iteration methods are annotated to return Iterator[Any].

    @abc.abstractmethod
    def __iter__(self) -> Iterator[Any]: ...

    @abc.abstractmethod
    def now(self) -> Iterator[Any]: ...

    @abc.abstractmethod
    def current(self, event: Optional[EventT] = None) -> Iterator[Any]: ...

    @abc.abstractmethod
    def delta(self, d: Seconds, event: Optional[EventT] = None) -> Iterator[Any]: ...
class WindowWrapperT(MutableMapping):
    """Abstract type of a window wrapper around a table.

    Subclasses ``typing.MutableMapping`` and holds a reference to a
    ``TableT``.  Only annotations and method signatures are declared here;
    every method body is ``...``.
    """

    table: TableT

    @abc.abstractmethod
    def __init__(
        self,
        table: TableT,
        *,
        relative_to: RelativeArg = None,
        key_index: bool = False,
        key_index_table: Optional[TableT] = None,
    ) -> None: ...

    @property
    @abc.abstractmethod
    def name(self) -> str: ...

    # -- methods annotated to return a "WindowWrapperT" ---------------------

    @abc.abstractmethod
    def clone(self, relative_to: RelativeArg) -> "WindowWrapperT": ...

    @abc.abstractmethod
    def relative_to_now(self) -> "WindowWrapperT": ...

    @abc.abstractmethod
    def relative_to_field(self, field: _FieldDescriptorT) -> "WindowWrapperT": ...

    @abc.abstractmethod
    def relative_to_stream(self) -> "WindowWrapperT": ...

    @abc.abstractmethod
    def get_timestamp(self, event: Optional[EventT] = None) -> float: ...

    # -- mapping protocol ---------------------------------------------------

    # Item lookup is annotated to return a WindowSetT.
    @abc.abstractmethod
    def __getitem__(self, key: Any) -> WindowSetT: ...

    @abc.abstractmethod
    def keys(self) -> KeysView: ...

    # -- key change hooks ---------------------------------------------------

    @abc.abstractmethod
    def on_set_key(self, key: Any, value: Any) -> None: ...

    @abc.abstractmethod
    def on_del_key(self, key: Any) -> None: ...

    @abc.abstractmethod
    def as_ansitable(self, **kwargs: Any) -> str: ...

    # NOTE: unlike ``name``, this property is not marked abstract.  The getter
    # is annotated Optional[RelativeHandler] while the setter accepts the
    # wider RelativeArg.
    @property
    def get_relative_timestamp(self) -> Optional[RelativeHandler]: ...

    @get_relative_timestamp.setter
    def get_relative_timestamp(self, relative_to: RelativeArg) -> None: ...