Source code for faust.tables.objects

"""Storing objects in tables.

This is also used to store data structures such as sets/lists.

"""

import abc
from typing import (
    Any,
    Callable,
    ClassVar,
    Dict,
    Iterable,
    MutableMapping,
    Optional,
    Set,
    Type,
)

from mode import Service

from faust.stores.base import Store
from faust.streams import current_event
from faust.types import TP, EventT
from faust.types.stores import StoreT
from faust.types.tables import CollectionT

from .table import Table


class ChangeloggedObject:
    """Base for one object kept in a :class:`ChangeloggedObjectManager`.

    Each instance remembers the manager that owns it and the key it is
    stored under.  Subclasses provide the storage/changelog hooks below
    and may override :meth:`__post_init__` for additional setup.
    """

    # NOTE: this class does not use ``abc.ABCMeta``, so the
    # ``abc.abstractmethod`` markers below document intent only; they do
    # not prevent instantiation.

    # The manager that created this object.
    manager: "ChangeloggedObjectManager"

    def __init__(self, manager: "ChangeloggedObjectManager", key: Any) -> None:
        """Bind the object to ``manager`` and ``key``, then run the hook."""
        self.manager, self.key = manager, key
        self.__post_init__()

    def __post_init__(self) -> None:  # pragma: no cover
        """Run extra initialization; called last by ``__init__``.

        The base implementation does nothing.
        """

    @abc.abstractmethod
    def sync_from_storage(self, value: Any) -> None:
        """Update this object from ``value`` as read from storage."""

    @abc.abstractmethod
    def as_stored_value(self) -> Any:
        """Return this object in the form that is written to storage."""

    @abc.abstractmethod
    def apply_changelog_event(self, operation: int, value: Any) -> None:
        """Apply one changelog topic event to the local state.

        ``operation`` is the integer operation code taken from the
        changelog key, and ``value`` is the decoded event value.
        """
class ChangeloggedObjectManager(Store):
    """Store that hands out :class:`ChangeloggedObject` instances by key.

    Objects are created on first access and cached in :attr:`data`.
    Persistence is delegated to a second store, created lazily and
    exposed through :attr:`storage`.
    """

    # Concrete object class instantiated for every new key; the class body
    # only declares it, so subclasses are expected to assign it.
    ValueType: ClassVar[Type[ChangeloggedObject]]

    # Table this manager belongs to.
    table: Table
    # In-memory cache mapping key -> ChangeloggedObject.
    data: MutableMapping

    # Backing store; stays ``None`` until ``storage`` is first read.
    _storage: Optional[StoreT] = None

    def __init__(self, table: Table, **kwargs: Any) -> None:
        """Attach the manager to ``table`` and start with an empty cache.

        Remaining keyword arguments are forwarded to ``Service.__init__``.
        """
        self.table = table
        self.table_name = self.table.name
        self.data = {}
        # Service.__init__ is invoked directly instead of super().__init__.
        # NOTE(review): presumably to bypass Store's own initializer; confirm.
        Service.__init__(self, loop=table.loop, **kwargs)

    def send_changelog_event(self, key: Any, operation: int, value: Any) -> None:
        """Publish a change for ``key`` and write its new stored value.

        The event goes to the table's changelog with ``(operation, key)``
        as the changelog key; afterwards the object's current stored
        representation is written to :attr:`storage`.
        """
        source_event = current_event()
        self.table._send_changelog(source_event, (operation, key), value)
        stored = self[key].as_stored_value()
        self.storage[key] = stored

    def __getitem__(self, key: Any) -> ChangeloggedObject:
        """Return the object for ``key``, creating and caching it if new."""
        if key not in self.data:
            self.data[key] = self.ValueType(self, key)
        return self.data[key]

    def __setitem__(self, key: Any, value: Any) -> None:
        """Reject direct assignment; always raises ``NotImplementedError``."""
        raise NotImplementedError(f"{self._table_type_name}: cannot set key")

    def __delitem__(self, key: Any) -> None:
        """Reject direct deletion; always raises ``NotImplementedError``."""
        raise NotImplementedError(f"{self._table_type_name}: cannot del key")

    @property
    def _table_type_name(self) -> str:
        """Class name of the owning table, used in error messages."""
        return type(self.table).__name__

    async def on_start(self) -> None:
        """Call when the changelogged object manager starts.

        Registers the backing store as a runtime dependency of this service.
        """
        backing_store = self.storage
        await self.add_runtime_dependency(backing_store)

    def persisted_offset(self, tp: TP) -> Optional[int]:
        """Return the last persisted changelog offset for ``tp``."""
        backing_store = self.storage
        return backing_store.persisted_offset(tp)

    def set_persisted_offset(self, tp: TP, offset: int) -> None:
        """Record ``offset`` as the last persisted changelog offset for ``tp``."""
        backing_store = self.storage
        backing_store.set_persisted_offset(tp, offset)

    async def on_rebalance(
        self,
        table: CollectionT,
        assigned: Set[TP],
        revoked: Set[TP],
        newly_assigned: Set[TP],
    ) -> None:
        """Call when cluster is rebalancing; forwarded to the backing store."""
        backing_store = self.storage
        await backing_store.on_rebalance(table, assigned, revoked, newly_assigned)

    async def on_recovery_completed(
        self, active_tps: Set[TP], standby_tps: Set[TP]
    ) -> None:
        """Call when table recovery is completed after rebalancing.

        Reloads every cached object from the backing store; the partition
        sets are not used here.
        """
        self.sync_from_storage()

    def sync_from_storage(self) -> None:
        """Push every ``(key, value)`` pair in storage into its object."""
        for stored_key, stored_value in self.storage.items():
            self[stored_key].sync_from_storage(stored_value)

    def reset_state(self) -> None:
        """Reset table local state by delegating to the backing store."""
        backing_store = self.storage
        backing_store.reset_state()

    @property
    def storage(self) -> StoreT:
        """Return the backing store, creating it on first access.

        The store URL is the table's own ``_store`` setting when that is
        truthy, otherwise the app-level ``conf.store``.
        """
        store = self._storage
        if store is None:
            url = self.table._store or self.table.app.conf.store
            store = self._storage = self.table._new_store_by_url(url)
        return store

    def apply_changelog_batch(
        self,
        batch: Iterable[EventT],
        to_key: Callable[[Any], Any],
        to_value: Callable[[Any], Any],
    ) -> None:
        """Apply a batch of changelog events to local state.

        Each event key is an ``(operation, key)`` pair.  ``to_key`` and
        ``to_value`` decode the raw key and value before the event is
        applied to the matching object.  Once the whole batch has been
        applied, the highest offset seen per topic partition is persisted.

        Raises:
            RuntimeError: If an event in the batch has a ``None`` key.
        """
        highest_offsets: Dict[TP, int] = {}
        for event in batch:
            message = event.message
            tp, offset = message.tp, message.offset
            # Track the largest offset seen so far for each partition.
            if tp in highest_offsets:
                highest_offsets[tp] = max(offset, highest_offsets[tp])
            else:
                highest_offsets[tp] = offset

            if event.key is None:
                raise RuntimeError("Changelog key cannot be None")
            operation, raw_key = event.key
            # Decode key and value before touching the cache, so a decode
            # failure does not create an object for the key.
            key = to_key(raw_key)
            value: Any = to_value(event.value)
            self[key].apply_changelog_event(operation, value)

        for tp, offset in highest_offsets.items():
            self.set_persisted_offset(tp, offset)

    async def backup_partition(
        self, tp, flush: bool = True, purge: bool = False, keep: int = 1
    ) -> None:
        """Not supported by this store; always raises ``NotImplementedError``."""
        raise NotImplementedError

    def restore_backup(
        self,
        tp,
        latest: bool = True,
        backup_id: int = 0,
    ) -> None:
        """Not supported by this store; always raises ``NotImplementedError``."""
        raise NotImplementedError