"""Base class for table storage drivers."""
import abc
from collections.abc import ItemsView, KeysView, ValuesView
from typing import (
Any,
Callable,
Iterable,
Iterator,
Mapping,
Optional,
Set,
Tuple,
Union,
cast,
)
from mode import Service
from yarl import URL
from faust.types import TP, AppT, CodecArg, CollectionT, EventT, ModelArg, StoreT
from faust.types.stores import KT, VT
__all__ = ["Store", "SerializedStore"]
[docs]class Store(StoreT[KT, VT], Service):
"""Base class for table storage drivers."""
def __init__(
self,
url: Union[str, URL],
app: AppT,
table: CollectionT,
*,
table_name: str = "",
key_type: ModelArg = None,
value_type: ModelArg = None,
key_serializer: CodecArg = None,
value_serializer: CodecArg = None,
options: Optional[Mapping[str, Any]] = None,
**kwargs: Any,
) -> None:
Service.__init__(self, **kwargs)
self.url = URL(url)
self.app = app
self.table = table
self.table_name = table_name or self.table.name
self.key_type = key_type
self.value_type = value_type
self.key_serializer = key_serializer
self.value_serializer = value_serializer
self.options = options
def __hash__(self) -> int:
return object.__hash__(self)
[docs] def persisted_offset(self, tp: TP) -> Optional[int]:
"""Return the persisted offset for this topic and partition."""
raise NotImplementedError("In-memory store only, does not persist.")
[docs] def set_persisted_offset(self, tp: TP, offset: int) -> None:
"""Set the persisted offset for this topic and partition."""
...
[docs] async def need_active_standby_for(self, tp: TP) -> bool:
"""Return :const:`True` if we have a copy of standby from elsewhere."""
return True
[docs] async def on_rebalance(
self,
assigned: Set[TP],
revoked: Set[TP],
newly_assigned: Set[TP],
generation_id: int = 0,
) -> None:
"""Handle rebalancing of the cluster."""
...
[docs] async def on_recovery_completed(
self, active_tps: Set[TP], standby_tps: Set[TP]
) -> None:
"""Signal that table recovery completed."""
...
def _encode_key(self, key: KT) -> bytes:
key_bytes = self.app.serializers.dumps_key(
self.key_type, key, serializer=self.key_serializer
)
if key_bytes is None:
raise TypeError("Table key cannot be None")
return key_bytes
def _encode_value(self, value: VT) -> Optional[bytes]:
return self.app.serializers.dumps_value(
self.value_type, value, serializer=self.value_serializer
)
def _decode_key(self, key: Optional[bytes]) -> KT:
return cast(
KT,
self.app.serializers.loads_key(
self.key_type, key, serializer=self.key_serializer
),
)
def _decode_value(self, value: Optional[bytes]) -> VT:
return self.app.serializers.loads_value(
self.value_type, value, serializer=self.value_serializer
)
def _repr_info(self) -> str:
return f"table_name={self.table_name} url={self.url}"
@property
def label(self) -> str:
"""Return short description of this store."""
return f"{type(self).__name__}: {self.url}"
class _SerializedStoreKeysView(KeysView):
def __init__(self, store: "SerializedStore") -> None:
self._mapping = store
def __iter__(self) -> Iterator:
yield from self._mapping._keys_decoded()
class _SerializedStoreValuesView(ValuesView):
def __init__(self, store: "SerializedStore") -> None:
self._mapping = store
def __iter__(self) -> Iterator:
yield from self._mapping._values_decoded()
class _SerializedStoreItemsView(ItemsView):
def __init__(self, store: "SerializedStore") -> None:
self._mapping = store
def __iter__(self) -> Iterator[Tuple[Any, Any]]:
yield from self._mapping._items_decoded()
[docs]class SerializedStore(Store[KT, VT]):
"""Base class for table storage drivers requiring serialization."""
@abc.abstractmethod
def _get(self, key: bytes) -> Optional[bytes]: # pragma: no cover
...
@abc.abstractmethod
def _set(self, key: bytes, value: Optional[bytes]) -> None: # pragma: no cover
...
@abc.abstractmethod
def _del(self, key: bytes) -> None: # pragma: no cover
...
@abc.abstractmethod
def _iterkeys(self) -> Iterator[bytes]: # pragma: no cover
...
@abc.abstractmethod
def _itervalues(self) -> Iterator[bytes]: # pragma: no cover
...
@abc.abstractmethod
def _iteritems(self) -> Iterator[Tuple[bytes, bytes]]: # pragma: no cover
...
@abc.abstractmethod
def _size(self) -> int: # pragma: no cover
...
@abc.abstractmethod
def _contains(self, key: bytes) -> bool: # pragma: no cover
...
@abc.abstractmethod
def _clear(self) -> None: # pragma: no cover
...
[docs] def apply_changelog_batch(
self,
batch: Iterable[EventT],
to_key: Callable[[Any], KT],
to_value: Callable[[Any], VT],
) -> None:
"""Apply batch of events from changelog topic to this store."""
for event in batch:
key = event.message.key
if key is None:
raise TypeError(f"Changelog entry is missing key: {event.message}")
value = event.message.value
if value is None:
self._del(key)
else:
# keys/values are already JSON serialized in the message
self._set(key, value)
def __getitem__(self, key: KT) -> VT:
value = self._get(self._encode_key(key))
if value is None:
raise KeyError(key)
return self._decode_value(value)
def __setitem__(self, key: KT, value: VT) -> None:
return self._set(self._encode_key(key), self._encode_value(value))
def __delitem__(self, key: KT) -> None:
return self._del(self._encode_key(key))
def __iter__(self) -> Iterator[KT]:
yield from self._keys_decoded()
def __len__(self) -> int:
return self._size()
def __contains__(self, key: object) -> bool:
return self._contains(self._encode_key(cast(KT, key)))
[docs] def keys(self) -> KeysView:
"""Return view of keys in the K/V store."""
return _SerializedStoreKeysView(self)
def _keys_decoded(self) -> Iterator[KT]:
for key in self._iterkeys():
yield self._decode_key(key)
[docs] def values(self) -> ValuesView:
"""Return view of values in the K/V store."""
return _SerializedStoreValuesView(self)
def _values_decoded(self) -> Iterator[VT]:
for value in self._itervalues():
yield self._decode_value(value)
[docs] def items(self) -> ItemsView:
"""Return view of items in the K/V store as (key, value) pairs."""
return _SerializedStoreItemsView(self)
def _items_decoded(self) -> Iterator[Tuple[KT, VT]]:
for key, value in self._iteritems():
yield self._decode_key(key), self._decode_value(value)
[docs] def clear(self) -> None:
"""Clear all data from this K/V store."""
self._clear()