# Source code for faust.types.tables

import abc
import typing
from datetime import datetime
from typing import (
    Any,
    Awaitable,
    Callable,
    ItemsView,
    Iterable,
    Iterator,
    KeysView,
    Mapping,
    MutableMapping,
    Optional,
    Set,
    Tuple,
    TypeVar,
    Union,
    ValuesView,
)

from mode import Seconds, ServiceT
from mode.utils.collections import FastUserDict, ManagedUserDict
from yarl import URL

from .codecs import CodecArg
from .events import EventT
from .stores import StoreT
from .streams import JoinableT
from .topics import TopicT
from .tuples import TP, FutureMessage
from .windows import WindowT

# These names are used in annotations and type aliases in this module.
# Under a static type checker the real types are imported; at runtime the
# imports are skipped (presumably to avoid circular imports -- confirm).
if typing.TYPE_CHECKING:
    from .app import AppT as _AppT
    from .models import FieldDescriptorT as _FieldDescriptorT, ModelArg as _ModelArg
    from .serializers import SchemaT as _SchemaT
else:
    # Runtime placeholders: empty classes bound to the same private names so
    # that expressions referencing them still evaluate when the module loads.

    class _AppT: ...  # noqa

    class _FieldDescriptorT: ...  # noqa

    class _ModelArg: ...  # noqa

    class _SchemaT: ...  # noqa


# Names exported by ``from <this module> import *``.
# NOTE(review): ``RelativeHandler``, ``KT`` and ``VT`` are defined in this
# module but not listed here -- confirm that is intentional.
__all__ = [
    "RecoverCallback",
    "RelativeArg",
    "CollectionT",
    "TableT",
    "GlobalTableT",
    "TableManagerT",
    "WindowCloseCallback",
    "WindowSetT",
    "WindowedItemsViewT",
    "WindowedValuesViewT",
    "WindowWrapperT",
    "ChangelogEventCallback",
    "CollectionTps",
]

# Callable taking an optional event and returning a float or a datetime.
RelativeHandler = Callable[[Optional[EventT]], Union[float, datetime]]
# Zero-argument callable returning an awaitable that resolves to None.
RecoverCallback = Callable[[], Awaitable[None]]
# Callable taking one event and returning an awaitable that resolves to None.
ChangelogEventCallback = Callable[[EventT], Awaitable[None]]
# Callable taking two arguments of any type; may return None directly or an
# awaitable resolving to None.
WindowCloseCallback = Callable[[Any, Any], Union[None, Awaitable[None]]]
# Accepted forms of a "relative to" argument: None, a field descriptor,
# a RelativeHandler callable, a datetime, or a float.
RelativeArg = Optional[
    Union[
        _FieldDescriptorT,
        RelativeHandler,
        datetime,
        float,
    ]
]
# Mutable mapping from a collection to a set of TP values.
CollectionTps = MutableMapping["CollectionT", Set[TP]]

# Type variables used below to parameterize the key and value types of
# ``TableT`` and ``WindowSetT``.
KT = TypeVar("KT")
VT = TypeVar("VT")


class CollectionT(ServiceT, JoinableT):
    """Abstract interface for a collection (table-like) service.

    This class only declares attributes and abstract method signatures;
    it contains no behavior.  The method summaries below describe the
    role suggested by each name and signature -- the actual semantics are
    defined by concrete implementations.
    """

    app: _AppT
    name: str
    default: Any  # noqa: E704
    schema: Optional[_SchemaT]
    key_type: Optional[_ModelArg]
    value_type: Optional[_ModelArg]
    partitions: Optional[int]
    window: Optional[WindowT]
    help: str
    recovery_buffer_size: int
    standby_buffer_size: int
    options: Optional[Mapping[str, Any]]
    last_closed_window: float
    use_partitioner: bool

    is_global: bool = False

    @abc.abstractmethod
    def __init__(
        self,
        app: _AppT,
        *,
        name: Optional[str] = None,
        default: Optional[Callable[[], Any]] = None,
        store: Optional[Union[str, URL]] = None,
        schema: Optional[_SchemaT] = None,
        key_type: Optional[_ModelArg] = None,
        value_type: Optional[_ModelArg] = None,
        partitions: Optional[int] = None,
        window: Optional[WindowT] = None,
        changelog_topic: Optional[TopicT] = None,
        help: Optional[str] = None,
        on_recover: Optional[RecoverCallback] = None,
        on_changelog_event: Optional[ChangelogEventCallback] = None,
        recovery_buffer_size: int = 1000,
        standby_buffer_size: Optional[int] = None,
        extra_topic_configs: Optional[Mapping[str, Any]] = None,
        options: Optional[Mapping[str, Any]] = None,
        use_partitioner: bool = False,
        on_window_close: Optional[WindowCloseCallback] = None,
        **kwargs: Any,
    ) -> None:
        """Declare the constructor signature for collections.

        Only ``app`` is positional; every other argument is keyword-only.
        Parameters that default to ``None`` are annotated ``Optional``
        explicitly, matching the class-level attribute annotations.
        """

    @abc.abstractmethod
    def clone(self, **kwargs: Any) -> Any:
        """Return a clone of this collection (keyword arguments are
        implementation-defined)."""

    @property
    @abc.abstractmethod
    def changelog_topic(self) -> TopicT:
        """Changelog topic of this collection (abstract property)."""

    @changelog_topic.setter
    def changelog_topic(self, topic: TopicT) -> None: ...

    @abc.abstractmethod
    def _changelog_topic_name(self) -> str: ...

    @abc.abstractmethod
    def apply_changelog_batch(self, batch: Iterable[EventT]) -> None:
        """Apply an iterable of changelog events."""

    @abc.abstractmethod
    def persisted_offset(self, tp: TP) -> Optional[int]:
        """Return an offset for ``tp``, or ``None``."""

    @abc.abstractmethod
    async def need_active_standby_for(self, tp: TP) -> bool:
        """Return a boolean answer for ``tp`` (coroutine)."""

    @abc.abstractmethod
    def reset_state(self) -> None:
        """Reset state; takes no arguments and returns nothing."""

    @abc.abstractmethod
    def send_changelog(
        self,
        partition: Optional[int],
        key: Any,
        value: Any,
        key_serializer: CodecArg = None,
        value_serializer: CodecArg = None,
    ) -> FutureMessage:
        """Send a changelog entry for ``key``/``value`` and return a
        ``FutureMessage``."""

    @abc.abstractmethod
    def partition_for_key(self, key: Any) -> Optional[int]:
        """Return a partition number for ``key``, or ``None``."""

    @abc.abstractmethod
    async def on_window_close(self, key: Any, value: Any) -> None:
        """Window-close hook receiving a key and a value (coroutine)."""

    @abc.abstractmethod
    async def on_rebalance(
        self,
        assigned: Set[TP],
        revoked: Set[TP],
        newly_assigned: Set[TP],
        generation_id: int = 0,
    ) -> None:
        """Rebalance hook receiving the assigned, revoked and newly
        assigned partition sets (coroutine)."""

    @abc.abstractmethod
    async def on_changelog_event(self, event: EventT) -> None:
        """Changelog-event hook receiving a single event (coroutine)."""

    @abc.abstractmethod
    def on_recover(self, fun: RecoverCallback) -> RecoverCallback:
        """Accept a recover callback and return a recover callback."""

    @abc.abstractmethod
    async def on_recovery_completed(
        self, active_tps: Set[TP], standby_tps: Set[TP]
    ) -> None:
        """Recovery-completed hook receiving the active and standby
        partition sets (coroutine)."""

    @abc.abstractmethod
    async def call_recover_callbacks(self) -> None:
        """Call the recover callbacks (coroutine)."""

    @abc.abstractmethod
    def using_window(
        self, window: WindowT, *, key_index: bool = False
    ) -> "WindowWrapperT":
        """Return a ``WindowWrapperT`` for the given window."""

    @abc.abstractmethod
    def hopping(
        self,
        size: Seconds,
        step: Seconds,
        expires: Optional[Seconds] = None,
        key_index: bool = False,
    ) -> "WindowWrapperT":
        """Return a ``WindowWrapperT`` configured from ``size``, ``step``
        and optional ``expires``."""

    @abc.abstractmethod
    def tumbling(
        self, size: Seconds, expires: Optional[Seconds] = None, key_index: bool = False
    ) -> "WindowWrapperT":
        """Return a ``WindowWrapperT`` configured from ``size`` and
        optional ``expires``."""

    @abc.abstractmethod
    def as_ansitable(self, **kwargs: Any) -> str:
        """Return a string rendering of this collection."""

    # Private helpers that implementations must also provide.

    @abc.abstractmethod
    def _relative_now(self, event: Optional[EventT] = None) -> float: ...

    @abc.abstractmethod
    def _relative_event(self, event: Optional[EventT] = None) -> float: ...

    @abc.abstractmethod
    def _relative_field(self, field: _FieldDescriptorT) -> RelativeHandler: ...

    @abc.abstractmethod
    def _relative_timestamp(self, timestamp: float) -> RelativeHandler: ...

    @abc.abstractmethod
    def _windowed_contains(self, key: Any, timestamp: float) -> bool: ...
class TableT(CollectionT, ManagedUserDict[KT, VT]):
    """Abstract table type: a ``CollectionT`` that is also a
    ``ManagedUserDict`` parameterized by key type ``KT`` and value type ``VT``.

    Declares no members of its own.
    """
class GlobalTableT(TableT):
    """Abstract global-table type.

    Subclasses ``TableT`` without declaring any additional members.
    """
class TableManagerT(ServiceT, FastUserDict[str, CollectionT]):
    """Abstract interface for a table manager service.

    It is a ``FastUserDict`` with ``str`` keys and ``CollectionT`` values.
    The class only declares attributes and abstract signatures; the
    summaries below reflect names and signatures, and the actual semantics
    are defined by concrete implementations.
    """

    app: _AppT
    actives_ready: bool
    standbys_ready: bool

    @abc.abstractmethod
    def __init__(self, app: _AppT, **kwargs: Any) -> None:
        """Declare the constructor signature: an app plus arbitrary
        keyword arguments."""

    @abc.abstractmethod
    def add(self, table: CollectionT) -> CollectionT:
        """Accept a collection and return a collection."""

    @abc.abstractmethod
    def persist_offset_on_commit(self, store: StoreT, tp: TP, offset: int) -> None:
        """Receive a store, a topic partition and an offset."""

    @abc.abstractmethod
    def on_commit(self, offsets: MutableMapping[TP, int]) -> None:
        """Commit hook receiving a mapping of topic partition to offset."""

    @abc.abstractmethod
    async def on_rebalance(
        self,
        assigned: Set[TP],
        revoked: Set[TP],
        newly_assigned: Set[TP],
        generation_id: int = 0,
    ) -> None:
        """Rebalance hook receiving the assigned, revoked and newly
        assigned partition sets (coroutine)."""

    @abc.abstractmethod
    def on_partitions_revoked(self, revoked: Set[TP]) -> None:
        """Hook receiving the set of revoked partitions."""

    @abc.abstractmethod
    def on_rebalance_start(self) -> None:
        """Hook taking no arguments and returning nothing."""

    @abc.abstractmethod
    async def wait_until_tables_registered(self) -> None:
        """Coroutine taking no arguments and returning nothing."""

    @abc.abstractmethod
    async def wait_until_recovery_completed(self) -> bool:
        """Coroutine returning a boolean."""

    @property
    @abc.abstractmethod
    def changelog_topics(self) -> Set[str]:
        """Set of strings (abstract read-only property)."""
class WindowSetT(FastUserDict[KT, VT]):
    """Abstract interface for a window set.

    It is a ``FastUserDict`` parameterized by ``KT``/``VT``.  Only
    attributes and abstract signatures are declared here; semantics are
    defined by concrete implementations.
    """

    key: Any
    table: TableT
    event: Optional[EventT]

    @abc.abstractmethod
    def __init__(
        self,
        key: KT,
        table: TableT,
        wrapper: "WindowWrapperT",
        event: Optional[EventT] = None,
    ) -> None:
        """Declare the constructor signature: key, table, wrapper and an
        optional event."""

    @abc.abstractmethod
    def apply(
        self, op: Callable[[VT, VT], VT], value: VT, event: Optional[EventT] = None
    ) -> "WindowSetT":
        """Take a binary operator over values, a value and an optional
        event; return a ``WindowSetT``."""

    @abc.abstractmethod
    def value(self, event: Optional[EventT] = None) -> VT:
        """Return a value, optionally given an event."""

    @abc.abstractmethod
    def current(self, event: Optional[EventT] = None) -> VT:
        """Return a value, optionally given an event."""

    @abc.abstractmethod
    def now(self) -> VT:
        """Return a value; takes no arguments."""

    @abc.abstractmethod
    def delta(self, d: Seconds, event: Optional[EventT] = None) -> VT:
        """Return a value for the offset ``d``, optionally given an event."""

    # In-place operators: each takes a value and returns a ``WindowSetT``.

    @abc.abstractmethod
    def __iadd__(self, other: VT) -> "WindowSetT": ...

    @abc.abstractmethod
    def __isub__(self, other: VT) -> "WindowSetT": ...

    @abc.abstractmethod
    def __imul__(self, other: VT) -> "WindowSetT": ...

    @abc.abstractmethod
    def __itruediv__(self, other: VT) -> "WindowSetT": ...

    @abc.abstractmethod
    def __ifloordiv__(self, other: VT) -> "WindowSetT": ...

    @abc.abstractmethod
    def __imod__(self, other: VT) -> "WindowSetT": ...

    @abc.abstractmethod
    def __ipow__(self, other: VT) -> "WindowSetT": ...

    @abc.abstractmethod
    def __ilshift__(self, other: VT) -> "WindowSetT": ...

    @abc.abstractmethod
    def __irshift__(self, other: VT) -> "WindowSetT": ...

    @abc.abstractmethod
    def __iand__(self, other: VT) -> "WindowSetT": ...

    @abc.abstractmethod
    def __ixor__(self, other: VT) -> "WindowSetT": ...

    @abc.abstractmethod
    def __ior__(self, other: VT) -> "WindowSetT": ...
class WindowedItemsViewT(ItemsView):
    """Abstract items view over a ``WindowWrapperT``.

    Only abstract signatures are declared; each iteration method yields
    ``(Any, Any)`` tuples.  Semantics are defined by concrete
    implementations.
    """

    @abc.abstractmethod
    def __init__(
        self, mapping: "WindowWrapperT", event: Optional[EventT] = None
    ) -> None:
        """Declare the constructor signature: a window wrapper and an
        optional event."""

    @abc.abstractmethod
    def __iter__(self) -> Iterator[Tuple[Any, Any]]:
        """Return an iterator of two-element tuples."""

    @abc.abstractmethod
    def now(self) -> Iterator[Tuple[Any, Any]]:
        """Return an iterator of two-element tuples; takes no arguments."""

    @abc.abstractmethod
    def current(self, event: Optional[EventT] = None) -> Iterator[Tuple[Any, Any]]:
        """Return an iterator of two-element tuples, optionally given an
        event."""

    @abc.abstractmethod
    def delta(
        self, d: Seconds, event: Optional[EventT] = None
    ) -> Iterator[Tuple[Any, Any]]:
        """Return an iterator of two-element tuples for the offset ``d``,
        optionally given an event."""
class WindowedValuesViewT(ValuesView):
    """Abstract values view over a ``WindowWrapperT``.

    Only abstract signatures are declared; semantics are defined by
    concrete implementations.
    """

    @abc.abstractmethod
    def __init__(
        self, mapping: "WindowWrapperT", event: Optional[EventT] = None
    ) -> None:
        """Declare the constructor signature: a window wrapper and an
        optional event."""

    @abc.abstractmethod
    def __iter__(self) -> Iterator[Any]:
        """Return an iterator of values."""

    @abc.abstractmethod
    def now(self) -> Iterator[Any]:
        """Return an iterator of values; takes no arguments."""

    @abc.abstractmethod
    def current(self, event: Optional[EventT] = None) -> Iterator[Any]:
        """Return an iterator of values, optionally given an event."""

    @abc.abstractmethod
    def delta(self, d: Seconds, event: Optional[EventT] = None) -> Iterator[Any]:
        """Return an iterator of values for the offset ``d``, optionally
        given an event."""
class WindowWrapperT(MutableMapping):
    """Abstract interface for a window wrapper around a table.

    It is a ``MutableMapping`` whose ``__getitem__`` returns a
    ``WindowSetT``.  Only attributes and signatures are declared here;
    semantics are defined by concrete implementations.
    """

    table: TableT

    @abc.abstractmethod
    def __init__(
        self,
        table: TableT,
        *,
        relative_to: RelativeArg = None,
        key_index: bool = False,
        key_index_table: Optional[TableT] = None,
    ) -> None:
        """Declare the constructor signature: a table plus keyword-only
        options."""

    @property
    @abc.abstractmethod
    def name(self) -> str:
        """Name as a string (abstract read-only property)."""

    @abc.abstractmethod
    def clone(self, relative_to: RelativeArg) -> "WindowWrapperT":
        """Return a ``WindowWrapperT`` given a ``relative_to`` argument."""

    @abc.abstractmethod
    def relative_to_now(self) -> "WindowWrapperT":
        """Return a ``WindowWrapperT``; takes no arguments."""

    @abc.abstractmethod
    def relative_to_field(self, field: _FieldDescriptorT) -> "WindowWrapperT":
        """Return a ``WindowWrapperT`` given a field descriptor."""

    @abc.abstractmethod
    def relative_to_stream(self) -> "WindowWrapperT":
        """Return a ``WindowWrapperT``; takes no arguments."""

    @abc.abstractmethod
    def get_timestamp(self, event: Optional[EventT] = None) -> float:
        """Return a float timestamp, optionally given an event."""

    @abc.abstractmethod
    def __getitem__(self, key: Any) -> WindowSetT:
        """Return the ``WindowSetT`` for ``key``."""

    @abc.abstractmethod
    def keys(self) -> KeysView:
        """Return a keys view."""

    @abc.abstractmethod
    def on_set_key(self, key: Any, value: Any) -> None:
        """Hook receiving a key and a value."""

    @abc.abstractmethod
    def on_del_key(self, key: Any) -> None:
        """Hook receiving a key."""

    @abc.abstractmethod
    def as_ansitable(self, **kwargs: Any) -> str:
        """Return a string rendering; keyword arguments are
        implementation-defined."""

    # This property is deliberately not decorated with ``abstractmethod``,
    # unlike the other members of this class.
    @property
    def get_relative_timestamp(self) -> Optional[RelativeHandler]:
        """Optional ``RelativeHandler``; this stub returns ``None``."""

    @get_relative_timestamp.setter
    def get_relative_timestamp(self, relative_to: RelativeArg) -> None: ...