Source code for faust.stores.memory

"""In-memory table storage."""

from typing import Any, Callable, Iterable, MutableMapping, Optional, Set, Tuple, Union

from faust.types import TP, EventT
from faust.types.stores import KT, VT

from . import base


[docs]class Store(base.Store, base.StoreT[KT, VT]): """Table storage using an in-memory dictionary.""" def __post_init__(self) -> None: self.data: MutableMapping = {} self._key_partition: MutableMapping[KT, int] = {} def _clear(self) -> None: self.data.clear()
[docs] def apply_changelog_batch( self, batch: Iterable[EventT], to_key: Callable[[Any], Any], to_value: Callable[[Any], Any], ) -> None: """Apply batch of changelog events to in-memory table.""" # default store does not do serialization, so we need # to convert these raw json serialized keys to proper structures # (E.g. regenerate tuples in WindowedKeys etc). to_delete: Set[Any] = set() delete_key = self.data.pop self.data.update( self._create_batch_iterator(to_delete.add, to_key, to_value, batch) ) for key in to_delete: # If the key was assigned a value again, it will not be deleted. if not self.data[key]: delete_key(key, None) self._key_partition.pop(key, None)
def _create_batch_iterator( self, mark_as_delete: Callable[[Any], None], to_key: Callable[[Any], Any], to_value: Callable[[Any], Any], batch: Iterable[EventT], ) -> Iterable[Tuple[Any, Any]]: for event in batch: key = to_key(event.key) # to delete keys in the table we set the raw value to None if event.message.value is None: mark_as_delete(key) self._key_partition[key] = event.message.partition yield key, to_value(event.value)
[docs] async def on_recovery_completed( self, active_tps: Set[TP], standby_tps: Set[TP] ) -> None: partitions = {partition for _, partition in active_tps} delete_keys = [] for k, p in self._key_partition.items(): if p not in partitions: delete_keys.append(k) for k in delete_keys: self.data.pop(k, None) self._key_partition.pop(k, None)
[docs] def persisted_offset(self, tp: TP) -> Optional[int]: """Return the persisted offset. This always returns :const:`None` when using the in-memory store. """ return None
[docs] def reset_state(self) -> None: """Remove local file system state. This does nothing when using the in-memory store. """ ...
[docs] async def backup_partition( self, tp: Union[TP, int], flush: bool = True, purge: bool = False, keep: int = 1 ) -> None: """Backup partition from this store. This does nothing when using the in-memory store. """ ...
[docs] def restore_backup( self, tp: Union[TP, int], latest: bool = True, backup_id: int = 0 ) -> None: """Restore partition backup from this store. This does nothing when using the in-memory store. """ ...