faust.tables.recovery

Table recovery after rebalancing.

class faust.tables.recovery.RecoveryStats(highwater, offset, remaining)
highwater: int

Alias for field number 0

offset: int

Alias for field number 1

remaining: int

Alias for field number 2
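
The three fields can be read by position or by name. The sketch below uses a local stand-in named tuple instead of importing faust, and it assumes that `remaining` is the difference between `highwater` and `offset`; that relationship is an assumption, not something stated on this page.

```python
from typing import NamedTuple


class RecoveryStats(NamedTuple):
    """Local stand-in that mirrors the documented field order."""

    highwater: int
    offset: int
    remaining: int


# Assumption: remaining is the distance between highwater and offset.
highwater, offset = 1500, 1200
stats = RecoveryStats(highwater, offset, highwater - offset)

# Each field is reachable by its field number or by its name.
assert stats[0] == stats.highwater
assert stats[2] == stats.remaining
```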

exception faust.tables.recovery.ServiceStopped

The recovery service was stopped.

exception faust.tables.recovery.RebalanceAgain

During rebalance, another rebalance happened.

class faust.tables.recovery.Recovery(app: AppT, tables: TableManagerT, **kwargs: Any)

Service responsible for recovering tables from changelog topics.

stats_interval: float = 5.0
highwaters: Counter[TP]

Mapping of highwaters by topic partition.

in_recovery: bool = False
standbys_pending: bool = False
flush_timeout_secs: float = 120.0

Time in seconds after which we warn that no flush has happened.

event_timeout_secs: float = 30.0

Time in seconds after which we warn that no events have been received.

num_samples_required_for_estimate = 1000

Number of entries in _processing_times before we can give an estimate of time remaining.
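
One way such a threshold could gate an estimate is sketched below. The helper `estimate_remaining_secs` is hypothetical and not part of faust; it assumes each sample is the time in seconds taken to apply one changelog event, and that the estimate is the mean sample multiplied by the number of remaining events.

```python
import statistics
from typing import List, Optional

# Mirrors the documented default of num_samples_required_for_estimate.
NUM_SAMPLES_REQUIRED = 1000


def estimate_remaining_secs(
    processing_times: List[float],
    remaining: int,
    required: int = NUM_SAMPLES_REQUIRED,
) -> Optional[float]:
    """Hypothetical helper: return None until enough samples exist."""
    if len(processing_times) < required:
        return None
    # Assumption: mean seconds per event times events still to apply.
    return statistics.mean(processing_times) * remaining
```

With too few samples the helper declines to guess, which avoids reporting a noisy estimate early in recovery.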

app: AppT
tables: _TableManager
standby_tps: Set[TP]

Set of standby topic partitions.

active_tps: Set[TP]

Set of active topic partitions.

tp_to_table: MutableMapping[TP, CollectionT]

Mapping from topic partition to table.

active_offsets: Counter[TP]

Active offset by topic partition.

standby_offsets: Counter[TP]

Standby offset by topic partition.

active_highwaters: Counter[TP]

Active highwaters by topic partition.

standby_highwaters: Counter[TP]

Standby highwaters by topic partition.

completed: Event
buffers: MutableMapping[CollectionT, List[EventT]]

Changelog event buffers by table. These are filled by the background task _slurp_changelog and must be flushed before starting a new recovery or stopping.
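
The buffer-then-flush pattern described here can be illustrated with plain dictionaries. Everything in this sketch is a stand-in: tables are identified by name, events are `(key, value)` pairs, and `buffer_event` is a hypothetical helper. It does not reproduce how faust applies changelog events.

```python
from collections import defaultdict
from typing import DefaultDict, Dict, List, Tuple

# Stand-ins: one in-memory table and one event buffer per table name.
tables: Dict[str, Dict[str, int]] = {"orders": {}}
buffers: DefaultDict[str, List[Tuple[str, int]]] = defaultdict(list)


def buffer_event(table_name: str, key: str, value: int) -> None:
    """Hypothetical helper: queue a changelog event for later application."""
    buffers[table_name].append((key, value))


def flush_buffers() -> None:
    """Apply every buffered event in order, then empty each buffer."""
    for table_name, events in buffers.items():
        tables[table_name].update(events)  # later events overwrite earlier ones
        events.clear()


buffer_event("orders", "a", 1)
buffer_event("orders", "a", 2)
flush_buffers()
```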

buffer_sizes: MutableMapping[TP, int]

Cache of max buffer size by topic partition.

recovery_delay: float
actives_for_table: MutableMapping[CollectionT, Set[TP]]
standbys_for_table: MutableMapping[CollectionT, Set[TP]]
property signal_recovery_start: Event

Event used to signal that recovery has started.

property signal_recovery_end: Event

Event used to signal that recovery has ended.
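
A start/end event pair of this kind is typically set by the recovering task and awaited by anything that depends on the tables. The sketch below uses plain `asyncio.Event` objects as stand-ins; the coroutine names are hypothetical and it does not touch the faust `Recovery` service.

```python
import asyncio
from typing import List


async def main() -> List[str]:
    # Stand-ins for signal_recovery_start and signal_recovery_end.
    recovery_start = asyncio.Event()
    recovery_end = asyncio.Event()
    log: List[str] = []

    async def recover() -> None:
        recovery_start.set()
        log.append("recovering")
        await asyncio.sleep(0)  # yield control, as real I/O would
        recovery_end.set()

    async def wait_for_tables() -> None:
        # Blocks until the recovering task signals completion.
        await recovery_end.wait()
        log.append("tables ready")

    await asyncio.gather(wait_for_tables(), recover())
    return log


log = asyncio.run(main())
```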

async on_stop() -> None

Call when recovery service stops.

Return type:

None

add_active(table: CollectionT, tp: TP) -> None

Add changelog partition to be used for active recovery.

Return type:

None

add_standby(table: CollectionT, tp: TP) -> None

Add changelog partition to be used for standby recovery.

Return type:

None

revoke(tp: TP) -> None

Revoke assignment of table changelog partition.

Return type:

None

on_partitions_revoked(revoked: Set[TP]) -> None

Call when rebalancing and partitions are revoked.

Return type:

None

async on_rebalance(assigned: Set[TP], revoked: Set[TP], newly_assigned: Set[TP], generation_id: int = 0) -> None

Call when cluster is rebalancing.

Return type:

None

active_remaining_seconds(remaining: float) -> str

Return type:

str

async on_recovery_completed(generation_id: int = 0) -> None

Call when active table recovery is completed.

Return type:

None

logger: logging.Logger = <Logger faust.tables.recovery (WARNING)>
flush_buffers() -> None

Flush changelog buffers.

Return type:

None

need_recovery() -> bool

Return True if recovery is required.

Return type:

bool

active_remaining() -> Counter[TP]

Return counter of remaining changes by active partition.

Return type:

Counter[TP]

standby_remaining() -> Counter[TP]

Return counter of remaining changes by standby partition.

Return type:

Counter[TP]

active_remaining_total() -> int

Return number of changes remaining for actives to be up-to-date.

Return type:

int
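
The per-partition counter and its total can be pictured with the sketch below. It uses a local `TP` named tuple and module-level counters as stand-ins for the service attributes, and it assumes that the remaining count for a partition is its highwater minus its current offset; the helper names are hypothetical.

```python
from collections import Counter
from typing import NamedTuple


class TP(NamedTuple):
    """Local stand-in for a topic partition."""

    topic: str
    partition: int


tp0 = TP("orders-changelog", 0)
tp1 = TP("orders-changelog", 1)

# Stand-ins for active_highwaters and active_offsets.
active_highwaters = Counter({tp0: 120, tp1: 80})
active_offsets = Counter({tp0: 100, tp1: 80})


def remaining(highwaters: Counter, offsets: Counter) -> Counter:
    """Assumption: remaining changes are highwater minus offset per partition."""
    return Counter({tp: hw - offsets[tp] for tp, hw in highwaters.items()})


def remaining_total(highwaters: Counter, offsets: Counter) -> int:
    """Sum the per-partition remaining counts."""
    return sum(remaining(highwaters, offsets).values())


per_partition = remaining(active_highwaters, active_offsets)
total = remaining_total(active_highwaters, active_offsets)
```

The same shape would apply to the standby counters, with the standby highwaters and offsets substituted.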

standby_remaining_total() -> int

Return number of changes remaining for standbys to be up-to-date.

Return type:

int

active_stats() -> Mapping[TP, RecoveryStats]

Return current active recovery statistics.

Return type:

Mapping[TP, RecoveryStats]

standby_stats() -> Mapping[TP, RecoveryStats]

Return current standby recovery statistics.

Return type:

Mapping[TP, RecoveryStats]