faust.tables.wrappers

Wrappers for windowed tables.

class faust.tables.wrappers.WindowSet(key: KT, table: TableT, wrapper: WindowWrapperT, event: Optional[EventT] = None)[source]

Represents the windows available for a table key.

Table[k] returns WindowSet since k can exist in multiple windows, and to retrieve an actual item we need a timestamp.
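
To see why one key can live in several windows at once, consider the following minimal sketch. It is a toy model and not Faust's implementation: the helper `hopping_windows`, the plain-dict `store`, and the half-open `[start, end)` window convention are all assumptions introduced here for illustration.

```python
from typing import Dict, List, Tuple

Window = Tuple[float, float]


def hopping_windows(timestamp: float, size: float, step: float) -> List[Window]:
    """Return every [start, end) hopping window that contains ``timestamp``.

    Hypothetical helper; assumes ``step <= size``.
    """
    windows: List[Window] = []
    # Earliest step-aligned start whose window still covers the timestamp.
    start = (timestamp // step) * step - size + step
    while start <= timestamp:
        windows.append((start, start + size))
        start += step
    return windows


# Toy windowed store: one entry per (key, window) pair.
store: Dict[Tuple[str, Window], int] = {}
for window in hopping_windows(timestamp=27, size=10, step=5):
    store[("page-a", window)] = 1

# The same key now exists in two overlapping windows, so a timestamp
# is needed to decide which entry a lookup should return.
windows_for_key = sorted(w for (k, w) in store if k == "page-a")
```

With a window size of 10 and a step of 5, the timestamp 27 is covered by both the 20 to 30 window and the 25 to 35 window, which is why a lookup by key alone is ambiguous.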

To use the timestamp of the current event (when executing in a stream processor), call .current():

Table[k].current()

Similarly, the most recent value can be accessed using .now():

Table[k].now()

To access the value at a delta from the time of the current event, use .delta():

Table[k].delta(timedelta(hours=3))

or at a delta from the time of another event:

Table[k].delta(timedelta(hours=3), other_event)

apply(op: Callable[[VT, VT], VT], value: VT, event: Optional[EventT] = None) WindowSetT[KT, VT][source]

Apply operation to all affected windows.
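
As a rough illustration of what "all affected windows" means, the sketch below combines a value into every window entry that an event touches. It is a toy model under stated assumptions: `apply_to_windows`, the dict-based `store`, and the explicit list of windows passed in are hypothetical and do not mirror Faust internals.

```python
import operator
from typing import Callable, Dict, Iterable, Tuple

Window = Tuple[float, float]
Store = Dict[Tuple[str, Window], int]


def apply_to_windows(
    store: Store,
    key: str,
    windows: Iterable[Window],
    op: Callable[[int, int], int],
    value: int,
    default: int = 0,
) -> Store:
    """Combine ``value`` into the entry of ``key`` in each affected window."""
    for window in windows:
        current = store.get((key, window), default)
        store[(key, window)] = op(current, value)
    return store


store: Store = {}
# The first event falls into two overlapping windows.
apply_to_windows(store, "page-a", [(20, 30), (25, 35)], operator.add, 1)
# A later event shares only the (25, 35) window with the first one.
apply_to_windows(store, "page-a", [(25, 35), (30, 40)], operator.add, 1)
```

After both calls, the shared window holds a count of 2 while the two outer windows each hold 1.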

Return type:

WindowSetT

value(event: Optional[EventT] = None) VT[source]

Return current value.

The selected window depends on the current time-relativity setting used (relative_to_now(), relative_to_stream(), relative_to_field(), etc.).

Return type:

~VT

now() VT[source]

Return current value, using the current system time.

Return type:

~VT

current(event: Optional[EventT] = None) VT[source]

Return current value, using stream time-relativity.

Return type:

~VT

delta(d: Union[timedelta, float, str], event: Optional[EventT] = None) VT[source]

Return value as it was ±n seconds ago.

Return type:

~VT
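
The idea behind a delta lookup can be sketched as follows: shift the reference time by the delta, then read the window that contains the shifted timestamp. This is only an illustration under assumptions. The helpers `to_seconds`, `tumbling_window`, and `value_at_delta` are hypothetical, tumbling windows are used for simplicity, the delta is subtracted from the event time, and the `str` form of `d` is not handled.

```python
from datetime import timedelta
from typing import Dict, Tuple, Union

Window = Tuple[float, float]


def to_seconds(d: Union[timedelta, float]) -> float:
    """Normalize a timedelta or a number of seconds to float seconds."""
    return d.total_seconds() if isinstance(d, timedelta) else float(d)


def tumbling_window(timestamp: float, size: float) -> Window:
    """Return the [start, end) tumbling window containing ``timestamp``."""
    start = (timestamp // size) * size
    return (start, start + size)


def value_at_delta(
    store: Dict[Tuple[str, Window], int],
    key: str,
    event_time: float,
    d: Union[timedelta, float],
    size: float,
    default: int = 0,
) -> int:
    """Look up ``key`` in the window that was active ``d`` before ``event_time``."""
    window = tumbling_window(event_time - to_seconds(d), size)
    return store.get((key, window), default)


HOUR = 3600.0
store = {
    ("page-a", (0.0, HOUR)): 7,            # first hourly window
    ("page-a", (3 * HOUR, 4 * HOUR)): 2,   # fourth hourly window
}
# Event at 3h30m: three hours earlier falls in the first hourly window.
three_hours_ago = value_at_delta(store, "page-a", 3.5 * HOUR, timedelta(hours=3), HOUR)
# A zero delta reads the window that contains the event itself.
current = value_at_delta(store, "page-a", 3.5 * HOUR, 0, HOUR)
```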

keys()[source]

Return a set-like object providing a view on the window set's keys.

Return type:

_SpecialForm

items()[source]

Return a set-like object providing a view on the window set's items.

Return type:

_SpecialForm

values()[source]

Return an object providing a view on the window set's values.

Return type:

_SpecialForm

class faust.tables.wrappers.WindowWrapper(table: TableT, *, relative_to: Optional[Union[_FieldDescriptorT, Callable[[Optional[EventT]], Union[float, datetime]], datetime, float]] = None, key_index: bool = False, key_index_table: Optional[TableT] = None)[source]

Windowed table wrapper.

A windowed table does not return concrete values when keys are accessed; instead, a WindowSet is returned so that the values can be further reduced to the wanted time period.

ValueType

alias of WindowSet

table: TableT
key_index: bool = False
key_index_table: Optional[TableT] = None
clone(relative_to: Optional[Union[_FieldDescriptorT, Callable[[Optional[EventT]], Union[float, datetime]], datetime, float]]) WindowWrapperT[source]

Clone this table using a new time-relativity configuration.

Return type:

WindowWrapperT

property name: str

Return the name of this table.

Return type:

str

relative_to(ts: Optional[Union[_FieldDescriptorT, Callable[[Optional[EventT]], Union[float, datetime]], datetime, float]]) WindowWrapperT[source]

Configure the time-relativity of this windowed table.

Return type:

WindowWrapperT
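
The signature above accepts a field descriptor, a callable, a datetime, or a float. One way to picture how such a setting could be turned into a single timestamp is sketched below. This is a hedged illustration only: `ToyEvent`, `resolve_timestamp`, `stream_time`, and `field_time` are hypothetical names, field descriptors are approximated by a dictionary lookup, and the real resolution logic in Faust may differ.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Callable, Dict, Optional, Union


@dataclass
class ToyEvent:
    """Hypothetical stand-in for a stream event."""

    message_timestamp: float   # timestamp attached to the message itself
    value: Dict[str, Any]      # decoded payload


TimestampFn = Callable[[Optional[ToyEvent]], Union[float, datetime]]
RelativeTo = Union[TimestampFn, datetime, float]


def resolve_timestamp(relative_to: RelativeTo, event: Optional[ToyEvent] = None) -> float:
    """Reduce a time-relativity setting to a float timestamp in seconds."""
    if callable(relative_to):
        relative_to = relative_to(event)
    if isinstance(relative_to, datetime):
        return relative_to.timestamp()
    return float(relative_to)


def stream_time(event: Optional[ToyEvent]) -> float:
    """Use the timestamp of the message currently being processed."""
    assert event is not None, "stream time needs a current event"
    return event.message_timestamp


def field_time(name: str) -> TimestampFn:
    """Use a field inside the event value instead of the message timestamp."""
    def extract(event: Optional[ToyEvent]) -> float:
        assert event is not None, "field time needs a current event"
        return event.value[name]
    return extract


event = ToyEvent(message_timestamp=100.0, value={"created": 42.0})
from_stream = resolve_timestamp(stream_time, event)
from_field = resolve_timestamp(field_time("created"), event)
from_datetime = resolve_timestamp(datetime(1970, 1, 1, 0, 1, tzinfo=timezone.utc))
```

The same event yields different reference times depending on the strategy, which in turn selects a different window.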

relative_to_now() WindowWrapperT[source]

Configure table to be time-relative to the system clock.

Return type:

WindowWrapperT

relative_to_field(field: FieldDescriptorT) WindowWrapperT[source]

Configure table to be time-relative to a field in the stream.

This means the window will use the timestamp from the event currently being processed in the stream.

Furthermore, it will not use the timestamp of the Kafka message, but a field in the value of the event.

For example a model field:

class Account(faust.Record):
    created: float

table = app.Table('foo').hopping(
    ...,
).relative_to_field(Account.created)

Return type:

WindowWrapperT

relative_to_stream() WindowWrapperT[source]

Configure table to be time-relative to the stream.

This means the window will use the timestamp from the event currently being processed in the stream.

Return type:

WindowWrapperT

get_timestamp(event: Optional[EventT] = None) float[source]

Get timestamp from event.

Return type:

float

on_recover(fun: Callable[[], Awaitable[None]]) Callable[[], Awaitable[None]][source]

Call after table recovery.

Return type:

Callable[[], Awaitable[None]]
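
Because the method takes an async callable and returns a callable of the same type, the signature suggests it can be used as a registration decorator. The sketch below models that pattern with a hypothetical `ToyTable` class. The callback list, the `finish_recovery` method, and the order in which callbacks run are assumptions of this example, not documented Faust behavior.

```python
import asyncio
from typing import Awaitable, Callable, List

RecoverCallback = Callable[[], Awaitable[None]]


class ToyTable:
    """Hypothetical table that remembers callbacks to run after recovery."""

    def __init__(self) -> None:
        self._recover_callbacks: List[RecoverCallback] = []

    def on_recover(self, fun: RecoverCallback) -> RecoverCallback:
        # Register the callback and hand it back unchanged, so the
        # method also works as a decorator.
        self._recover_callbacks.append(fun)
        return fun

    async def finish_recovery(self) -> None:
        # Await each registered callback in registration order.
        for callback in self._recover_callbacks:
            await callback()


table = ToyTable()
calls: List[str] = []


@table.on_recover
async def rebuild_cache() -> None:
    calls.append("rebuild_cache")


asyncio.run(table.finish_recovery())
```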

on_set_key(key: Any, value: Any) None[source]

Call when the value for a key in this table is set.

Return type:

None

on_del_key(key: Any) None[source]

Call when a key is deleted from this table.

Return type:

None

keys() KeysView[source]

Return table keys view: iterate over keys found in this table.

Return type:

KeysView

values(event: Optional[EventT] = None) ValuesView[source]

Return table values view: iterate over values in this table.

Return type:

ValuesView

items(event: Optional[EventT] = None) ItemsView[source]

Return table items view: iterate over (key, value) pairs.

Return type:

ItemsView

as_ansitable(title: str = '{table.name}', **kwargs: Any) str[source]

Draw table as a terminal ANSI table.

Return type:

str

property get_relative_timestamp: Optional[Callable[[Optional[EventT]], Union[float, datetime]]]

Return the current handler for extracting event timestamp.

Return type:

Optional[Callable[[Optional[EventT]], Union[float, datetime]]]

class faust.tables.wrappers.WindowedItemsView(mapping: WindowWrapperT, event: Optional[EventT] = None)[source]

The object returned by windowed_table.items().

now() Iterator[Tuple[Any, Any]][source]

Return all items present in window closest to system time.

Return type:

Iterator[Tuple[Any, Any]]

current(event: Optional[EventT] = None) Iterator[Tuple[Any, Any]][source]

Return all items present in window closest to stream time.

Return type:

Iterator[Tuple[Any, Any]]

delta(d: Union[timedelta, float, str], event: Optional[EventT] = None) Iterator[Tuple[Any, Any]][source]

Return all items present in window ±n seconds ago.

Return type:

Iterator[Tuple[Any, Any]]
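
The windowed views select one window by time and then iterate over the entries stored in it. A minimal sketch of that idea for items follows. The function `items_in_window`, the dict-based `store`, and the use of tumbling windows that contain (rather than are merely closest to) the timestamp are assumptions made for illustration.

```python
from typing import Any, Dict, Iterator, Tuple

Window = Tuple[float, float]


def items_in_window(
    store: Dict[Tuple[Any, Window], Any],
    timestamp: float,
    size: float,
) -> Iterator[Tuple[Any, Any]]:
    """Yield (key, value) pairs stored in the window containing ``timestamp``."""
    start = (timestamp // size) * size
    window = (start, start + size)
    for (key, key_window), value in store.items():
        if key_window == window:
            yield key, value


store = {
    ("a", (0, 10)): 1,
    ("b", (0, 10)): 2,
    ("a", (10, 20)): 5,
}
first_window = dict(items_in_window(store, timestamp=3, size=10))
second_window = dict(items_in_window(store, timestamp=12, size=10))
```

The keys and values views follow the same selection step and differ only in which part of each pair they yield.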

class faust.tables.wrappers.WindowedKeysView(mapping: WindowWrapperT, event: Optional[EventT] = None)[source]

The object returned by windowed_table.keys().

now() Iterator[Any][source]

Return all keys present in window closest to system time.

Return type:

Iterator[Any]

current(event: Optional[EventT] = None) Iterator[Any][source]

Return all keys present in window closest to stream time.

Return type:

Iterator[Any]

delta(d: Union[timedelta, float, str], event: Optional[EventT] = None) Iterator[Any][source]

Return all keys present in window ±n seconds ago.

Return type:

Iterator[Any]

class faust.tables.wrappers.WindowedValuesView(mapping: WindowWrapperT, event: Optional[EventT] = None)[source]

The object returned by windowed_table.values().

now() Iterator[Any][source]

Return all values present in window closest to system time.

Return type:

Iterator[Any]

current(event: Optional[EventT] = None) Iterator[Any][source]

Return all values present in window closest to stream time.

Return type:

Iterator[Any]

delta(d: Union[timedelta, float, str], event: Optional[EventT] = None) Iterator[Any][source]

Return all values present in window ±n seconds ago.

Return type:

Iterator[Any]