faust.tables
¶
Tables: Distributed object K/V-store.
- class faust.tables.Collection(app: AppT, *, name: Optional[str] = None, default: Optional[Callable[[], Any]] = None, store: Optional[Union[str, URL]] = None, schema: Optional[SchemaT] = None, key_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, value_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, partitions: Optional[int] = None, window: Optional[WindowT] = None, changelog_topic: Optional[TopicT] = None, help: Optional[str] = None, on_recover: Optional[Callable[[], Awaitable[None]]] = None, on_changelog_event: Optional[Callable[[EventT], Awaitable[None]]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: Optional[int] = None, extra_topic_configs: Optional[Mapping[str, Any]] = None, recover_callbacks: Optional[Set[Callable[[], Awaitable[None]]]] = None, options: Optional[Mapping[str, Any]] = None, use_partitioner: bool = False, on_window_close: Optional[Callable[[Any, Any], Union[None, Awaitable[None]]]] = None, is_global: bool = False, synchronize_all_active_partitions: bool = False, **kwargs: Any)[source]¶
Base class for changelog-backed data structures stored in Kafka.
- on_recover(fun: Callable[[], Awaitable[None]]) Callable[[], Awaitable[None]] [source]¶
Add function as callback to be called on table recovery.
- Return type:
_CallableGenericAlias
[_GenericAlias
[None
]]
- persisted_offset(tp: TP) Optional[int] [source]¶
Return the last persisted offset for topic partition.
- Return type:
_UnionGenericAlias
[int
,None
]
- async need_active_standby_for(tp: TP) bool [source]¶
Return
False
if we have access to partition data.- Return type:
- send_changelog(partition: Optional[int], key: Any, value: Any, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None) FutureMessage [source]¶
Send modification event to changelog topic.
- Return type:
- partition_for_key(key: Any) Optional[int] [source]¶
Return partition number for table key.
Always returns
None
whenuse_partitioner
is enabled.
- join(*fields: FieldDescriptorT) StreamT [source]¶
Right join of this table and another stream/table.
- Return type:
- left_join(*fields: FieldDescriptorT) StreamT [source]¶
Left join of this table and another stream/table.
- Return type:
- inner_join(*fields: FieldDescriptorT) StreamT [source]¶
Inner join of this table and another stream/table.
- Return type:
- outer_join(*fields: FieldDescriptorT) StreamT [source]¶
Outer join of this table and another stream/table.
- Return type:
- combine(*nodes: JoinableT, **kwargs: Any) StreamT [source]¶
Combine tables and streams.
- Return type:
- contribute_to_stream(active: StreamT) None [source]¶
Contribute table to stream join.
- Return type:
None
- async remove_from_stream(stream: StreamT) None [source]¶
Remove table from stream join after stream stopped.
- Return type:
None
- async on_rebalance(assigned: Set[TP], revoked: Set[TP], newly_assigned: Set[TP], generation_id: int = 0) None [source]¶
Call when cluster is rebalancing.
- Return type:
None
- async on_recovery_completed(active_tps: Set[TP], standby_tps: Set[TP]) None [source]¶
Call when recovery has completed after rebalancing.
- Return type:
None
- async call_recover_callbacks() None [source]¶
Call any configured recovery callbacks after rebalancing.
- Return type:
None
- async on_changelog_event(event: EventT) None [source]¶
Call when a new changelog event is received.
- Return type:
None
- logger: logging.Logger = <Logger faust.tables.base (WARNING)>¶
- class faust.tables.CollectionT(app: _AppT, *, name: Optional[str] = None, default: Optional[Callable[[], Any]] = None, store: Optional[Union[str, URL]] = None, schema: Optional[_SchemaT] = None, key_type: Optional[_ModelArg] = None, value_type: Optional[_ModelArg] = None, partitions: Optional[int] = None, window: Optional[WindowT] = None, changelog_topic: Optional[TopicT] = None, help: Optional[str] = None, on_recover: Optional[Callable[[], Awaitable[None]]] = None, on_changelog_event: Optional[Callable[[EventT], Awaitable[None]]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: Optional[int] = None, extra_topic_configs: Optional[Mapping[str, Any]] = None, options: Optional[Mapping[str, Any]] = None, use_partitioner: bool = False, on_window_close: Optional[Callable[[Any, Any], Union[None, Awaitable[None]]]] = None, **kwargs: Any)[source]¶
- app: _AppT¶
- abstract persisted_offset(tp: TP) Optional[int] [source]¶
- Return type:
_UnionGenericAlias
[int
,None
]
- abstract send_changelog(partition: Optional[int], key: Any, value: Any, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None) FutureMessage [source]¶
- Return type:
- abstract partition_for_key(key: Any) Optional[int] [source]¶
- Return type:
_UnionGenericAlias
[int
,None
]
- abstract async on_rebalance(assigned: Set[TP], revoked: Set[TP], newly_assigned: Set[TP], generation_id: int = 0) None [source]¶
- Return type:
None
- abstract on_recover(fun: Callable[[], Awaitable[None]]) Callable[[], Awaitable[None]] [source]¶
- Return type:
_CallableGenericAlias
[_GenericAlias
[None
]]
- abstract async on_recovery_completed(active_tps: Set[TP], standby_tps: Set[TP]) None [source]¶
- Return type:
None
- abstract using_window(window: WindowT, *, key_index: bool = False) WindowWrapperT [source]¶
- Return type:
- abstract hopping(size: Union[timedelta, float, str], step: Union[timedelta, float, str], expires: Optional[Union[timedelta, float, str]] = None, key_index: bool = False) WindowWrapperT [source]¶
- Return type:
- class faust.tables.GlobalTable(app: AppT, *, name: Optional[str] = None, default: Optional[Callable[[], Any]] = None, store: Optional[Union[str, URL]] = None, schema: Optional[SchemaT] = None, key_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, value_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, partitions: Optional[int] = None, window: Optional[WindowT] = None, changelog_topic: Optional[TopicT] = None, help: Optional[str] = None, on_recover: Optional[Callable[[], Awaitable[None]]] = None, on_changelog_event: Optional[Callable[[EventT], Awaitable[None]]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: Optional[int] = None, extra_topic_configs: Optional[Mapping[str, Any]] = None, recover_callbacks: Optional[Set[Callable[[], Awaitable[None]]]] = None, options: Optional[Mapping[str, Any]] = None, use_partitioner: bool = False, on_window_close: Optional[Callable[[Any, Any], Union[None, Awaitable[None]]]] = None, is_global: bool = False, synchronize_all_active_partitions: bool = False, **kwargs: Any)[source]¶
Warning
Using a GlobalTable with multiple app instances may cause an app to be stuck in an infinite recovery loop. The current fix for this is to run the table with the following options:
app.GlobalTable(..., partitions=1, recovery_buffer_size=1)
- logger: logging.Logger = <Logger faust.tables.globaltable (WARNING)>¶
- log: CompositeLogger¶
- app: _AppT¶
- default: Any¶
- schema: Optional[_SchemaT]¶
- key_type: Optional[_ModelArg]¶
- value_type: Optional[_ModelArg]¶
- diag: DiagT¶
- async_exit_stack: AsyncExitStack¶
- exit_stack: ExitStack¶
- class faust.tables.GlobalTableT(app: _AppT, *, name: Optional[str] = None, default: Optional[Callable[[], Any]] = None, store: Optional[Union[str, URL]] = None, schema: Optional[_SchemaT] = None, key_type: Optional[_ModelArg] = None, value_type: Optional[_ModelArg] = None, partitions: Optional[int] = None, window: Optional[WindowT] = None, changelog_topic: Optional[TopicT] = None, help: Optional[str] = None, on_recover: Optional[Callable[[], Awaitable[None]]] = None, on_changelog_event: Optional[Callable[[EventT], Awaitable[None]]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: Optional[int] = None, extra_topic_configs: Optional[Mapping[str, Any]] = None, options: Optional[Mapping[str, Any]] = None, use_partitioner: bool = False, on_window_close: Optional[Callable[[Any, Any], Union[None, Awaitable[None]]]] = None, **kwargs: Any)[source]¶
- class faust.tables.TableManager(app: AppT, **kwargs: Any)[source]¶
Manage tables used by Faust worker.
- persist_offset_on_commit(store: StoreT, tp: TP, offset: int) None [source]¶
Mark the persisted offset for a TP to be saved on commit.
This is used for “exactly_once” processing guarantee. Instead of writing the persisted offset to RocksDB when the message is sent, we write it to disk when the offset is committed.
- Return type:
None
- on_commit(offsets: MutableMapping[TP, int]) None [source]¶
Call when committing source topic partitions.
- Return type:
None
- on_commit_tp(tp: TP) None [source]¶
Call when committing source topic partition used by this table.
- Return type:
None
- on_standbys_ready() None [source]¶
Call when standbys are fully up-to-date and ready for failover.
- Return type:
None
- property changelog_topics: Set[str]¶
Return the set of known changelog topics. :rtype:
_GenericAlias
[str
]
- property changelog_queue: ThrowableQueue¶
Queue used to buffer changelog events. :rtype:
ThrowableQueue
- add(table: CollectionT) CollectionT [source]¶
Add table to be managed by this table manager.
- Return type:
- logger: logging.Logger = <Logger faust.tables.manager (WARNING)>¶
- on_partitions_revoked(revoked: Set[TP]) None [source]¶
Call when cluster is rebalancing and partitions revoked.
- Return type:
None
- class faust.tables.TableManagerT(app: _AppT, **kwargs: Any)[source]¶
- app: _AppT¶
- abstract add(table: CollectionT) CollectionT [source]¶
- Return type:
- abstract persist_offset_on_commit(store: StoreT, tp: TP, offset: int) None [source]¶
- Return type:
None
- class faust.tables.Table(app: AppT, *, name: Optional[str] = None, default: Optional[Callable[[], Any]] = None, store: Optional[Union[str, URL]] = None, schema: Optional[SchemaT] = None, key_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, value_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, partitions: Optional[int] = None, window: Optional[WindowT] = None, changelog_topic: Optional[TopicT] = None, help: Optional[str] = None, on_recover: Optional[Callable[[], Awaitable[None]]] = None, on_changelog_event: Optional[Callable[[EventT], Awaitable[None]]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: Optional[int] = None, extra_topic_configs: Optional[Mapping[str, Any]] = None, recover_callbacks: Optional[Set[Callable[[], Awaitable[None]]]] = None, options: Optional[Mapping[str, Any]] = None, use_partitioner: bool = False, on_window_close: Optional[Callable[[Any, Any], Union[None, Awaitable[None]]]] = None, is_global: bool = False, synchronize_all_active_partitions: bool = False, **kwargs: Any)[source]¶
Table (non-windowed).
- class WindowWrapper(table: TableT, *, relative_to: Optional[Union[_FieldDescriptorT, Callable[[Optional[EventT]], Union[float, datetime]], datetime, float]] = None, key_index: bool = False, key_index_table: Optional[TableT] = None)¶
Windowed table wrapper.
A windowed table does not return concrete values when keys are accessed, instead
WindowSet
is returned so that the values can be further reduced to the wanted time period.- as_ansitable(title: str = '{table.name}', **kwargs: Any) str ¶
Draw table as a terminal ANSI table.
- Return type:
- clone(relative_to: Optional[Union[_FieldDescriptorT, Callable[[Optional[EventT]], Union[float, datetime]], datetime, float]]) WindowWrapperT ¶
Clone this table using a new time-relativity configuration.
- Return type:
- property get_relative_timestamp: Optional[Callable[[Optional[EventT]], Union[float, datetime]]]¶
Return the current handler for extracting event timestamp. :rtype:
_UnionGenericAlias
[_CallableGenericAlias
[_UnionGenericAlias
[EventT
,None
],_UnionGenericAlias
[float
,datetime
]],None
]
- items(event: Optional[EventT] = None) ItemsView ¶
Return table items view: iterate over
(key, value)
pairs.- Return type:
_SpecialGenericAlias
- keys() KeysView ¶
Return table keys view: iterate over keys found in this table.
- Return type:
_SpecialGenericAlias
- on_recover(fun: Callable[[], Awaitable[None]]) Callable[[], Awaitable[None]] ¶
Call after table recovery.
- Return type:
_CallableGenericAlias
[_GenericAlias
[None
]]
- on_set_key(key: Any, value: Any) None ¶
Call when the value for a key in this table is set.
- Return type:
None
- relative_to(ts: Optional[Union[_FieldDescriptorT, Callable[[Optional[EventT]], Union[float, datetime]], datetime, float]]) WindowWrapperT ¶
Configure the time-relativity of this windowed table.
- Return type:
- relative_to_field(field: FieldDescriptorT) WindowWrapperT ¶
Configure table to be time-relative to a field in the stream.
This means the window will use the timestamp from the event currently being processed in the stream.
Further it will not use the timestamp of the Kafka message, but a field in the value of the event.
For example a model field:
class Account(faust.Record): created: float table = app.Table('foo').hopping( ..., ).relative_to_field(Account.created)
- Return type:
- relative_to_now() WindowWrapperT ¶
Configure table to be time-relative to the system clock.
- Return type:
- relative_to_stream() WindowWrapperT ¶
Configure table to be time-relative to the stream.
This means the window will use the timestamp from the event currently being processed in the stream.
- Return type:
- values(event: Optional[EventT] = None) ValuesView ¶
Return table values view: iterate over values in this table.
- Return type:
_SpecialGenericAlias
- using_window(window: WindowT, *, key_index: bool = False) WindowWrapperT [source]¶
Wrap table using a specific window type.
- Return type:
- hopping(size: Union[timedelta, float, str], step: Union[timedelta, float, str], expires: Optional[Union[timedelta, float, str]] = None, key_index: bool = False) WindowWrapperT [source]¶
Wrap table in a hopping window.
- Return type:
- tumbling(size: Union[timedelta, float, str], expires: Optional[Union[timedelta, float, str]] = None, key_index: bool = False) WindowWrapperT [source]¶
Wrap table in a tumbling window.
- Return type:
- on_key_get(key: KT) None [source]¶
Call when the value for a key in this table is retrieved.
- Return type:
None
- on_key_set(key: KT, value: VT) None [source]¶
Call when the value for a key in this table is set.
- Return type:
None
- as_ansitable(title: str = '{table.name}', **kwargs: Any) str [source]¶
Draw table as a terminal ANSI table.
- Return type:
- logger: logging.Logger = <Logger faust.tables.table (WARNING)>¶
- log: CompositeLogger¶
- app: _AppT¶
- default: Any¶
- schema: Optional[_SchemaT]¶
- key_type: Optional[_ModelArg]¶
- value_type: Optional[_ModelArg]¶
- diag: DiagT¶
- async_exit_stack: AsyncExitStack¶
- exit_stack: ExitStack¶
- class faust.tables.TableT(app: _AppT, *, name: Optional[str] = None, default: Optional[Callable[[], Any]] = None, store: Optional[Union[str, URL]] = None, schema: Optional[_SchemaT] = None, key_type: Optional[_ModelArg] = None, value_type: Optional[_ModelArg] = None, partitions: Optional[int] = None, window: Optional[WindowT] = None, changelog_topic: Optional[TopicT] = None, help: Optional[str] = None, on_recover: Optional[Callable[[], Awaitable[None]]] = None, on_changelog_event: Optional[Callable[[EventT], Awaitable[None]]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: Optional[int] = None, extra_topic_configs: Optional[Mapping[str, Any]] = None, options: Optional[Mapping[str, Any]] = None, use_partitioner: bool = False, on_window_close: Optional[Callable[[Any, Any], Union[None, Awaitable[None]]]] = None, **kwargs: Any)[source]¶