faust.tables.table

Table (key/value changelog stream).

class faust.tables.table.Table(app: AppT, *, name: Optional[str] = None, default: Optional[Callable[[], Any]] = None, store: Optional[Union[str, URL]] = None, schema: Optional[SchemaT] = None, key_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, value_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, partitions: Optional[int] = None, window: Optional[WindowT] = None, changelog_topic: Optional[TopicT] = None, help: Optional[str] = None, on_recover: Optional[Callable[[], Awaitable[None]]] = None, on_changelog_event: Optional[Callable[[EventT], Awaitable[None]]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: Optional[int] = None, extra_topic_configs: Optional[Mapping[str, Any]] = None, recover_callbacks: Optional[Set[Callable[[], Awaitable[None]]]] = None, options: Optional[Mapping[str, Any]] = None, use_partitioner: bool = False, on_window_close: Optional[Callable[[Any, Any], Union[None, Awaitable[None]]]] = None, is_global: bool = False, synchronize_all_active_partitions: bool = False, **kwargs: Any)

Table (non-windowed).

class WindowWrapper(table: TableT, *, relative_to: Optional[Union[_FieldDescriptorT, Callable[[Optional[EventT]], Union[float, datetime]], datetime, float]] = None, key_index: bool = False, key_index_table: Optional[TableT] = None)

Windowed table wrapper.

A windowed table does not return concrete values when keys are accessed. Instead, a WindowSet is returned so that the values can be further reduced to the wanted time period.
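The idea of returning a set of windows rather than a single value can be sketched in plain Python. This is only an illustration, not faust's WindowSet: the class `ToyWindowSet` and its methods `add` and `value_at` are hypothetical names, and the sketch assumes fixed-size, non-overlapping windows.

```python
from typing import Dict, Tuple

WindowRange = Tuple[float, float]


class ToyWindowSet:
    """Hypothetical stand-in for a per-key window set (not faust's WindowSet)."""

    def __init__(self, size: float) -> None:
        self.size = size
        # One counter per (start, end) window range.
        self._values: Dict[WindowRange, int] = {}

    def _range_for(self, timestamp: float) -> WindowRange:
        # Assumption: windows are aligned to multiples of `size`.
        start = (timestamp // self.size) * self.size
        return (start, start + self.size)

    def add(self, timestamp: float, amount: int) -> None:
        window = self._range_for(timestamp)
        self._values[window] = self._values.get(window, 0) + amount

    def value_at(self, timestamp: float) -> int:
        # Reduce the set of windows to the one covering `timestamp`.
        return self._values.get(self._range_for(timestamp), 0)


clicks = ToyWindowSet(size=10.0)
clicks.add(3.0, 1)
clicks.add(7.0, 1)
clicks.add(12.0, 1)
```

Here the same key holds one value per window, so the caller has to say which point in time it is interested in before a concrete value comes back.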

ValueType

alias of WindowSet

as_ansitable(title: str = '{table.name}', **kwargs: Any) -> str

Draw table as a terminal ANSI table.

Return type:

str

clone(relative_to: Optional[Union[_FieldDescriptorT, Callable[[Optional[EventT]], Union[float, datetime]], datetime, float]]) -> WindowWrapperT

Clone this table using a new time-relativity configuration.

Return type:

WindowWrapperT

property get_relative_timestamp: Optional[Callable[[Optional[EventT]], Union[float, datetime]]]

Return the current handler for extracting the event timestamp.

Return type:

Optional[Callable[[Optional[EventT]], Union[float, datetime]]]

get_timestamp(event: Optional[EventT] = None) -> float

Get timestamp from event.

Return type:

float

items(event: Optional[EventT] = None) -> ItemsView

Return table items view: iterate over (key, value) pairs.

Return type:

ItemsView

key_index: bool = False

key_index_table: Optional[TableT] = None

keys() -> KeysView

Return table keys view: iterate over keys found in this table.

Return type:

KeysView

property name: str

Return the name of this table.

Return type:

str

on_del_key(key: Any) -> None

Call when a key is deleted from this table.

Return type:

None

on_recover(fun: Callable[[], Awaitable[None]]) -> Callable[[], Awaitable[None]]

Call after table recovery.

Return type:

Callable[[], Awaitable[None]]
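Because on_recover takes an async callable and returns the same callable, it can be used as a decorator. The following is a minimal sketch of that registration pattern in plain Python. `ToyTable`, its `recover` method and the `_recover_callbacks` set are hypothetical and only mimic the shape of the signature above; they are not faust's implementation.

```python
import asyncio
from typing import Awaitable, Callable, List, Set

RecoverCallback = Callable[[], Awaitable[None]]


class ToyTable:
    """Hypothetical table that only tracks recovery callbacks."""

    def __init__(self) -> None:
        self._recover_callbacks: Set[RecoverCallback] = set()

    def on_recover(self, fun: RecoverCallback) -> RecoverCallback:
        # Register the callback and hand it back unchanged,
        # which is what makes decorator usage possible.
        self._recover_callbacks.add(fun)
        return fun

    async def recover(self) -> None:
        # Assumption: callbacks run once recovery has finished.
        for fun in self._recover_callbacks:
            await fun()


table = ToyTable()
recovered: List[str] = []


@table.on_recover
async def rebuild_cache() -> None:
    recovered.append('cache rebuilt')


asyncio.run(table.recover())
```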

on_set_key(key: Any, value: Any) -> None

Call when the value for a key in this table is set.

Return type:

None

relative_to(ts: Optional[Union[_FieldDescriptorT, Callable[[Optional[EventT]], Union[float, datetime]], datetime, float]]) -> WindowWrapperT

Configure the time-relativity of this windowed table.

Return type:

WindowWrapperT

relative_to_field(field: FieldDescriptorT) -> WindowWrapperT

Configure table to be time-relative to a field in the stream.

This means the window will use the timestamp from the event currently being processed in the stream.

Furthermore, the timestamp is taken from a field in the value of the event rather than from the Kafka message.

For example, using a model field:

import faust

class Account(faust.Record):
    created: float  # timestamp field the window is made relative to

table = app.Table('foo').hopping(
    ...,
).relative_to_field(Account.created)

Return type:

WindowWrapperT

relative_to_now() -> WindowWrapperT

Configure table to be time-relative to the system clock.

Return type:

WindowWrapperT

relative_to_stream() -> WindowWrapperT

Configure table to be time-relative to the stream.

This means the window will use the timestamp from the event currently being processed in the stream.

Return type:

WindowWrapperT
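The three relativity modes (system clock, stream, field) differ only in where the timestamp comes from. A rough sketch of that difference in plain Python follows. `ToyEvent` and the `toy_relative_to_*` helpers are hypothetical names invented for illustration, and the sketch assumes the message timestamp and the decoded value are both available on the event.

```python
import time
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional


@dataclass
class ToyEvent:
    """Hypothetical event: a message timestamp plus a decoded value."""
    message_timestamp: float
    value: Dict[str, Any]


TimestampGetter = Callable[[Optional[ToyEvent]], float]


def toy_relative_to_now() -> TimestampGetter:
    # Ignores the event entirely and reads the system clock.
    return lambda event: time.time()


def toy_relative_to_stream() -> TimestampGetter:
    def getter(event: Optional[ToyEvent]) -> float:
        if event is None:
            raise RuntimeError('no event is currently being processed')
        # Uses the timestamp of the message itself.
        return event.message_timestamp
    return getter


def toy_relative_to_field(field_name: str) -> TimestampGetter:
    def getter(event: Optional[ToyEvent]) -> float:
        if event is None:
            raise RuntimeError('no event is currently being processed')
        # Uses a field inside the value, not the message timestamp.
        return float(event.value[field_name])
    return getter


event = ToyEvent(message_timestamp=1700000100.0,
                 value={'created': 1700000000.0})
stream_ts = toy_relative_to_stream()(event)
field_ts = toy_relative_to_field('created')(event)
now_ts = toy_relative_to_now()(event)
```

For the same event, the stream mode and the field mode can select different windows whenever the field and the message timestamp disagree.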

values(event: Optional[EventT] = None) -> ValuesView

Return table values view: iterate over values in this table.

Return type:

ValuesView

table: TableT

using_window(window: WindowT, *, key_index: bool = False) -> WindowWrapperT

Wrap table using a specific window type.

Return type:

WindowWrapperT

hopping(size: Union[timedelta, float, str], step: Union[timedelta, float, str], expires: Optional[Union[timedelta, float, str]] = None, key_index: bool = False) -> WindowWrapperT

Wrap table in a hopping window.

Return type:

WindowWrapperT
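In a hopping window, windows of length size start every step seconds, so a single timestamp can fall into several overlapping windows. The sketch below computes those windows in plain Python. The function `hopping_windows` is a hypothetical helper, and it assumes half-open [start, end) ranges aligned to multiples of step, which may differ from how faust computes its ranges.

```python
from typing import List, Tuple


def hopping_windows(timestamp: float, size: float,
                    step: float) -> List[Tuple[float, float]]:
    """Return every [start, end) window that contains `timestamp`."""
    # Latest window start that is not after the timestamp.
    start = (timestamp // step) * step
    windows = []
    # Walk backwards one step at a time while the window still
    # covers the timestamp (that is, while timestamp < start + size).
    while start > timestamp - size:
        windows.append((start, start + size))
        start -= step
    return sorted(windows)


overlapping = hopping_windows(12.0, size=10.0, step=5.0)
```

With size=10 and step=5, the timestamp 12 belongs to the windows starting at 5 and at 10, but not to the one starting at 0.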

tumbling(size: Union[timedelta, float, str], expires: Optional[Union[timedelta, float, str]] = None, key_index: bool = False) -> WindowWrapperT

Wrap table in a tumbling window.

Return type:

WindowWrapperT
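A tumbling window assigns each timestamp to exactly one fixed-size window, and expires bounds how long old windows are kept. The following plain-Python sketch shows both ideas. `tumbling_window` and `is_expired` are hypothetical helpers, and the expiry rule used here (a window is dropped once its end lies more than expires seconds behind the latest timestamp) is an assumption, not a statement of faust's exact rule.

```python
from typing import Tuple


def tumbling_window(timestamp: float, size: float) -> Tuple[float, float]:
    """Return the single [start, end) window containing `timestamp`."""
    start = (timestamp // size) * size
    return (start, start + size)


def is_expired(window: Tuple[float, float], latest_timestamp: float,
               expires: float) -> bool:
    # Assumed rule: expired once the window end is more than
    # `expires` seconds older than the latest timestamp seen.
    _, end = window
    return end < latest_timestamp - expires


window = tumbling_window(12.0, size=10.0)
still_fresh = is_expired(window, latest_timestamp=25.0, expires=60.0)
long_gone = is_expired(window, latest_timestamp=100.0, expires=60.0)
```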

on_key_get(key: KT) -> None

Call when the value for a key in this table is retrieved.

Return type:

None

on_key_set(key: KT, value: VT) -> None

Call when the value for a key in this table is set.

Return type:

None

on_key_del(key: KT) -> None

Call when a key in this table is removed.

Return type:

None

on_clear() -> None

Call when the table is cleared.

Return type:

None
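The on_key_get, on_key_set, on_key_del and on_clear callbacks follow a common hook pattern: the mapping operation runs, then the matching hook is notified. A minimal sketch of that pattern with a plain dictionary is shown below. `HookedTable` and its `calls` list are hypothetical and exist only to make the order of hook calls visible; what the real table does inside these hooks is not shown here.

```python
from collections import UserDict
from typing import Any, List, Tuple


class HookedTable(UserDict):
    """Hypothetical mapping that records which hook each operation fires."""

    def __init__(self) -> None:
        self.calls: List[Tuple[Any, ...]] = []
        super().__init__()

    def __getitem__(self, key: Any) -> Any:
        value = super().__getitem__(key)
        self.on_key_get(key)
        return value

    def __setitem__(self, key: Any, value: Any) -> None:
        super().__setitem__(key, value)
        self.on_key_set(key, value)

    def __delitem__(self, key: Any) -> None:
        super().__delitem__(key)
        self.on_key_del(key)

    def clear(self) -> None:
        # Clear the underlying dict directly so that per-key
        # hooks are not fired for every removed item.
        self.data.clear()
        self.on_clear()

    def on_key_get(self, key: Any) -> None:
        self.calls.append(('get', key))

    def on_key_set(self, key: Any, value: Any) -> None:
        self.calls.append(('set', key, value))

    def on_key_del(self, key: Any) -> None:
        self.calls.append(('del', key))

    def on_clear(self) -> None:
        self.calls.append(('clear',))


table = HookedTable()
table['a'] = 1
_ = table['a']
del table['a']
table.clear()
```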

as_ansitable(title: str = '{table.name}', **kwargs: Any) -> str

Draw table as a terminal ANSI table.

Return type:

str

logger: logging.Logger = <Logger faust.tables.table (WARNING)>
log: CompositeLogger
app: _AppT
name: str
default: Any
schema: Optional[_SchemaT]
key_type: Optional[_ModelArg]
value_type: Optional[_ModelArg]
partitions: Optional[int]
window: Optional[WindowT]
help: str
recovery_buffer_size: int
standby_buffer_size: int
options: Optional[Mapping[str, Any]]
last_closed_window: float
use_partitioner: bool
diag: DiagT
async_exit_stack: AsyncExitStack
exit_stack: ExitStack