faust.tables.manager

Tables (changelog stream).

class faust.tables.manager.TableManager(app: AppT, **kwargs: Any)

Manage tables used by a Faust worker.

app: _AppT
data: MutableMapping
actives_ready: bool
standbys_ready: bool
persist_offset_on_commit(store: StoreT, tp: TP, offset: int) → None

Mark the persisted offset for a TP to be saved on commit.

This is used for the “exactly_once” processing guarantee. Instead of writing the persisted offset to RocksDB when the message is sent, the offset is written to disk when it is committed.

Return type:

None

on_commit(offsets: MutableMapping[TP, int]) → None

Call when committing source topic partitions.

Return type:

None

on_commit_tp(tp: TP) → None

Call when committing a source topic partition used by this table.

Return type:

None
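The deferral described for `persist_offset_on_commit`, together with the two commit hooks, can be sketched with plain Python objects. This is a simplified model, not the Faust implementation: `FakeStore`, `PendingOffsets`, the `set_persisted_offset` method on the fake store, and the rule of never replacing a pending offset with a lower one are all assumptions made for illustration.

```python
from typing import Dict, Mapping, NamedTuple, Tuple


class TP(NamedTuple):
    """Topic partition key (stand-in for the TP type in the signatures)."""
    topic: str
    partition: int


class FakeStore:
    """Hypothetical store: records offsets that were 'written to disk'."""

    def __init__(self) -> None:
        self.persisted: Dict[TP, int] = {}

    def set_persisted_offset(self, tp: TP, offset: int) -> None:
        self.persisted[tp] = offset


class PendingOffsets:
    """Hypothetical model of the three commit-related hooks."""

    def __init__(self) -> None:
        self._pending: Dict[TP, Tuple[FakeStore, int]] = {}

    def persist_offset_on_commit(self, store: FakeStore, tp: TP, offset: int) -> None:
        # Only remember the offset; nothing is written to the store yet.
        entry = self._pending.get(tp)
        if entry is not None and offset < entry[1]:
            return  # assumption: keep the highest pending offset
        self._pending[tp] = (store, offset)

    def on_commit(self, offsets: Mapping[TP, int]) -> None:
        # Flush every partition that is part of this commit.
        for tp in offsets:
            self.on_commit_tp(tp)

    def on_commit_tp(self, tp: TP) -> None:
        # Write the remembered offset once the source offset is committed.
        entry = self._pending.get(tp)
        if entry is not None:
            store, offset = entry
            store.set_persisted_offset(tp, offset)


store = FakeStore()
pending = PendingOffsets()
tp = TP("example-changelog", 0)  # hypothetical topic name

pending.persist_offset_on_commit(store, tp, 41)
before_commit = dict(store.persisted)  # snapshot taken before any commit

pending.on_commit({tp: 42})
after_commit = dict(store.persisted)
```

In this model the store is untouched until `on_commit` runs, which is the point of the deferral: the persisted offset on disk never gets ahead of what has actually been committed.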

on_rebalance_start() → None

Call when a new rebalancing operation starts.

Return type:

None

on_actives_ready() → None

Call when actives are fully up-to-date.

Return type:

None

on_standbys_ready() → None

Call when standbys are fully up-to-date and ready for failover.

Return type:

None
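One way to read the three hooks above is as transitions on the `actives_ready` and `standbys_ready` attributes. The sketch below assumes the hooks do little more than toggle those flags; `ReadinessFlags` is a hypothetical class, and the real methods may do additional work.

```python
class ReadinessFlags:
    """Hypothetical model of the readiness attributes and their hooks."""

    def __init__(self) -> None:
        self.actives_ready = False
        self.standbys_ready = False

    def on_rebalance_start(self) -> None:
        # Assumption: a new rebalance invalidates both flags.
        self.actives_ready = False
        self.standbys_ready = False

    def on_actives_ready(self) -> None:
        # Active tables have caught up with their changelogs.
        self.actives_ready = True

    def on_standbys_ready(self) -> None:
        # Standby tables have caught up and can take over on failover.
        self.standbys_ready = True


flags = ReadinessFlags()
flags.on_actives_ready()
flags.on_standbys_ready()
ready_after_recovery = (flags.actives_ready, flags.standbys_ready)

flags.on_rebalance_start()
ready_after_rebalance_start = (flags.actives_ready, flags.standbys_ready)
```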

property changelog_topics: Set[str]

Return the set of known changelog topics.

Return type:

Set[str]

property changelog_queue: ThrowableQueue

Queue used to buffer changelog events.

Return type:

ThrowableQueue

property recovery: Recovery

Recovery service used by this table manager.

Return type:

Recovery

add(table: CollectionT) → CollectionT

Add table to be managed by this table manager.

Return type:

CollectionT
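The relationship between `add`, the `data` mapping, and `changelog_topics` can be illustrated with a small registry. This is a sketch under assumptions, not the library code: `FakeTable`, `TableRegistry`, the `changelog_topic_name` field, the example topic name, and the duplicate-name check are all hypothetical.

```python
from dataclasses import dataclass
from typing import Dict, Set


@dataclass
class FakeTable:
    """Hypothetical stand-in for a table (CollectionT)."""
    name: str
    changelog_topic_name: str


class TableRegistry:
    """Hypothetical model: tables indexed by name and by changelog topic."""

    def __init__(self) -> None:
        self.data: Dict[str, FakeTable] = {}
        self._changelogs: Dict[str, FakeTable] = {}

    def add(self, table: FakeTable) -> FakeTable:
        # Assumption: table names must be unique within one manager.
        if table.name in self.data:
            raise ValueError(f"Table with name {table.name!r} already exists")
        self.data[table.name] = table
        self._changelogs[table.changelog_topic_name] = table
        return table  # returned so the call can be used as an expression

    @property
    def changelog_topics(self) -> Set[str]:
        return set(self._changelogs)


registry = TableRegistry()
counts = registry.add(FakeTable("counts", "myapp-counts-changelog"))

try:
    registry.add(FakeTable("counts", "myapp-counts-changelog"))
    duplicate_rejected = False
except ValueError:
    duplicate_rejected = True
```

Returning the table from `add` matches the `CollectionT` return type in the signature and lets a caller register and bind a table in one statement.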

logger: logging.Logger = <Logger faust.tables.manager (WARNING)>
async on_start() → None

Call when table manager is starting.

Return type:

None

log: CompositeLogger
diag: DiagT
async_exit_stack: AsyncExitStack
exit_stack: ExitStack
async wait_until_tables_registered() → None

Return type:

None

async on_stop() → None

Call when table manager is stopping.

Return type:

None

on_partitions_revoked(revoked: Set[TP]) → None

Call when the cluster is rebalancing and partitions are revoked.

Return type:

None

async on_rebalance(assigned: Set[TP], revoked: Set[TP], newly_assigned: Set[TP], generation_id: int = 0) → None

Call when the cluster is rebalancing.

Return type:

None

async wait_until_recovery_completed() → bool

Return type:

bool