"""Table (key/value changelog stream)."""
from typing import Any, ClassVar, Optional, Type
from mode import Seconds
from faust import windows
from faust.types.tables import KT, VT, TableT, WindowWrapperT
from faust.types.windows import WindowT
from faust.utils.terminal.tables import dict_as_ansitable
from . import wrappers
from .base import Collection
__all__ = ["Table"]
[docs]class Table(TableT[KT, VT], Collection):
"""Table (non-windowed)."""
WindowWrapper: ClassVar[Type[WindowWrapperT]] = wrappers.WindowWrapper
[docs] def using_window(
self, window: WindowT, *, key_index: bool = False
) -> WindowWrapperT:
"""Wrap table using a specific window type."""
self.window = window
self._changelog_compacting = True
self._changelog_deleting = True
self._changelog_topic = None # will reset on next property access
return self.WindowWrapper(self, key_index=key_index)
[docs] def hopping(
self,
size: Seconds,
step: Seconds,
expires: Optional[Seconds] = None,
key_index: bool = False,
) -> WindowWrapperT:
"""Wrap table in a hopping window."""
return self.using_window(
windows.HoppingWindow(size, step, expires),
key_index=key_index,
)
[docs] def tumbling(
self, size: Seconds, expires: Optional[Seconds] = None, key_index: bool = False
) -> WindowWrapperT:
"""Wrap table in a tumbling window."""
return self.using_window(
windows.TumblingWindow(size, expires),
key_index=key_index,
)
def __missing__(self, key: KT) -> VT:
if self.default is not None:
return self.default()
raise KeyError(key)
def _has_key(self, key: KT) -> bool:
return key in self
def _get_key(self, key: KT) -> VT:
return self[key]
def _set_key(self, key: KT, value: VT) -> None:
self[key] = value
def _del_key(self, key: KT) -> None:
del self[key]
[docs] def on_key_get(self, key: KT) -> None:
"""Call when the value for a key in this table is retrieved."""
self._sensor_on_get(self, key)
[docs] def on_key_set(self, key: KT, value: VT) -> None:
"""Call when the value for a key in this table is set."""
fut = self.send_changelog(self.partition_for_key(key), key, value)
# partition may be None, in which case the finalized partition
# is in fut.partition
partition = fut.message.partition
assert partition is not None
self._maybe_set_key_ttl(key, partition)
self._sensor_on_set(self, key, value)
[docs] def on_key_del(self, key: KT) -> None:
"""Call when a key in this table is removed."""
fut = self.send_changelog(
self.partition_for_key(key), key, value=None, value_serializer="raw"
)
partition = fut.message.partition
assert partition is not None
self._maybe_del_key_ttl(key, partition)
self._sensor_on_del(self, key)
[docs] def on_clear(self) -> None:
"""Call when the table is cleared."""
for key in self:
self.on_key_del(key)
[docs] def as_ansitable(self, title: str = "{table.name}", **kwargs: Any) -> str:
"""Draw table as a terminal ANSI table."""
return dict_as_ansitable(self, title=title.format(table=self), **kwargs)