Source code for faust.tables.recovery

"""Table recovery after rebalancing."""

import asyncio
import statistics
import typing
from asyncio import Event
from collections import defaultdict, deque
from time import monotonic
from typing import (

import opentracing
from aiokafka.errors import IllegalStateError
from mode import Service, get_logger
from import WaitArgT
from mode.utils.times import humanize_seconds, humanize_seconds_ago
from yarl import URL

from faust.exceptions import ConsistencyError
from faust.types import TP, AppT, EventT
from faust.types.tables import CollectionT, TableManagerT
from faust.types.transports import ConsumerT
from faust.utils import terminal
from faust.utils.terminal.tables import TableDataT  # List[List[str]]
from faust.utils.tracing import finish_span, traced_from_parent_span

if typing.TYPE_CHECKING:
    from import App as _App

    from .manager import TableManager as _TableManager

    class _App: ...  # noqa

    class _TableManager: ...  # noqa

The persisted offset for changelog topic partition {0} is higher
than the last offset in that topic (highwater) ({1} > {2}).

Most likely you have removed data from the topics without
removing the RocksDB database file for this partition.
logger = get_logger(__name__)

[docs]class RecoveryStats(NamedTuple): highwater: int offset: int remaining: int
RecoveryStatsMapping = Mapping[TP, RecoveryStats]
[docs]class ServiceStopped(Exception): """The recovery service was stopped."""
[docs]class RebalanceAgain(Exception): """During rebalance, another rebalance happened."""
[docs]class Recovery(Service): """Service responsible for recovering tables from changelog topics.""" app: AppT tables: _TableManager stats_interval: float = 5.0 #: Set of standby topic partitions. standby_tps: Set[TP] #: Set of active topic partitions. active_tps: Set[TP] actives_for_table: MutableMapping[CollectionT, Set[TP]] standbys_for_table: MutableMapping[CollectionT, Set[TP]] #: Mapping from topic partition to table tp_to_table: MutableMapping[TP, CollectionT] #: Active offset by topic partition. active_offsets: Counter[TP] #: Standby offset by topic partition. standby_offsets: Counter[TP] #: Mapping of highwaters by topic partition. highwaters: Counter[TP] #: Active highwaters by topic partition. active_highwaters: Counter[TP] #: Standby highwaters by topic partition. standby_highwaters: Counter[TP] _signal_recovery_start: Optional[Event] = None _signal_recovery_end: Optional[Event] = None completed: Event in_recovery: bool = False standbys_pending: bool = False recovery_delay: float #: Changelog event buffers by table. #: These are filled by background task `_slurp_changelog`, #: and need to be flushed before starting new recovery/stopping. buffers: MutableMapping[CollectionT, List[EventT]] #: Cache of max buffer size by topic partition.. buffer_sizes: MutableMapping[TP, int] #: Time in seconds after we warn that no flush has happened. flush_timeout_secs: float = 120.0 #: Time in seconds after we warn that no events have been received. event_timeout_secs: float = 30.0 #: Time of last event received by active TP _active_events_received_at: MutableMapping[TP, float] #: Time of last event received by standby TP _standby_events_received_at: MutableMapping[TP, float] #: Time of last event received (for any active TP) _last_active_event_processed_at: Optional[float] #: Time of last buffer flush _last_flush_at: Optional[float] = None #: Time when recovery last started _recovery_started_at: Optional[float] = None #: Time when recovery last ended _recovery_ended_at: Optional[float] = None _recovery_span: Optional[opentracing.Span] = None _actives_span: Optional[opentracing.Span] = None _standbys_span: Optional[opentracing.Span] = None #: List of last 100 processing timestamps (monotonic). #: Updated after processing every changelog record, #: used to estimate time remaining. _processing_times: Deque[float] #: Number of entries in _processing_times before #: we can give an estimate of time remaining. num_samples_required_for_estimate = 1000 _generation_id: int = 0 def __init__(self, app: AppT, tables: TableManagerT, **kwargs: Any) -> None: = app self.tables = cast(_TableManager, tables) self.standby_tps = set() self.active_tps = set() self.tp_to_table = {} self.active_offsets = Counter() self.standby_offsets = Counter() self.active_highwaters = Counter() self.standby_highwaters = Counter() self.completed = Event() self.buffers = defaultdict(list) self.buffer_sizes = {} self.recovery_delay = self.actives_for_table = defaultdict(set) self.standbys_for_table = defaultdict(set) self._active_events_received_at = {} self._standby_events_received_at = {} self._processing_times = deque() super().__init__(**kwargs) @property def signal_recovery_start(self) -> Event: """Event used to signal that recovery has started.""" if self._signal_recovery_start is None: self._signal_recovery_start = Event() return self._signal_recovery_start @property def signal_recovery_end(self) -> Event: """Event used to signal that recovery has ended.""" if self._signal_recovery_end is None: self._signal_recovery_end = Event() return self._signal_recovery_end
[docs] async def on_stop(self) -> None: """Call when recovery service stops.""" # Flush buffers when stopping. self.flush_buffers()
[docs] def add_active(self, table: CollectionT, tp: TP) -> None: """Add changelog partition to be used for active recovery.""" self.active_tps.add(tp) self.actives_for_table[table].add(tp) self._add(table, tp, self.active_offsets)
[docs] def add_standby(self, table: CollectionT, tp: TP) -> None: """Add changelog partition to be used for standby recovery.""" self.standby_tps.add(tp) self.standbys_for_table[table].add(tp) self._add(table, tp, self.standby_offsets)
def _add(self, table: CollectionT, tp: TP, offsets: Counter[TP]) -> None: self.tp_to_table[tp] = table persisted_offset = table.persisted_offset(tp) if persisted_offset is not None: offsets[tp] = persisted_offset offsets.setdefault(tp, None) # type: ignore
[docs] def revoke(self, tp: TP) -> None: """Revoke assignment of table changelog partition.""" self.standby_offsets.pop(tp, None) self.standby_highwaters.pop(tp, None) self.active_offsets.pop(tp, None) self.active_highwaters.pop(tp, None)
[docs] def on_partitions_revoked(self, revoked: Set[TP]) -> None: """Call when rebalancing and partitions are revoked.""" T = traced_from_parent_span() T(self.flush_buffers)()
[docs] async def on_rebalance( self, assigned: Set[TP], revoked: Set[TP], newly_assigned: Set[TP], generation_id: int = 0, ) -> None: """Call when cluster is rebalancing.""" # removing all the sleeps so control does not go back to the loop app = f"generation id {generation_id} " f"app consumers id {app.consumer_generation_id}" ) if generation_id != app.consumer_generation_id: logger.warning( f"rebalance again generation id " f"{generation_id} app consumers id {app.consumer_generation_id}" ) return assigned_standbys = app.assignor.assigned_standbys() assigned_actives = app.assignor.assigned_actives() for tp in revoked: self.revoke(tp) self.standby_tps.clear() self.active_tps.clear() self.actives_for_table.clear() self.standbys_for_table.clear() for tp in assigned_standbys: table = self.tables._changelogs.get(tp.topic) if table is not None: self.add_standby(table, tp) for tp in assigned_actives: table = self.tables._changelogs.get(tp.topic) if table is not None: self.add_active(table, tp) active_offsets = { tp: offset for tp, offset in self.active_offsets.items() if tp in self.active_tps } self.active_offsets.clear() self.active_offsets.update(active_offsets) rebalancing_span = cast(_App, if app.tracer and rebalancing_span: self._recovery_span = app.tracer.get_tracer("_faust").start_span( "recovery", child_of=rebalancing_span, ) app._span_add_default_tags(self._recovery_span) self.signal_recovery_start.set() self._generation_id = generation_id
async def _resume_streams(self, generation_id: int = 0) -> None: app = consumer = app.consumer await app.on_rebalance_complete.send() assignment = consumer.assignment() if != generation_id: self.log.warning("Recovery rebalancing again") return if assignment:"Resume stream partitions") consumer.resume_partitions( {tp for tp in assignment if not self._is_changelog_tp(tp)} )"Seek stream partitions to committed offsets.") await self._wait( consumer.perform_seek(), ) else:"Resuming streams with empty assignment") self.completed.set() # Resume partitions and start fetching."Resuming flow...") app.flow_control.resume() consumer.resume_flow() # finally make sure the fetcher is running. await cast(_App, app)._fetcher.maybe_start() self.tables.on_actives_ready() self.tables.on_standbys_ready() app.on_rebalance_end()"Worker ready") @Service.task async def _restart_recovery(self) -> None: consumer = active_tps = self.active_tps standby_tps = self.standby_tps standby_offsets = self.standby_offsets standby_highwaters = self.standby_highwaters assigned_active_tps = self.active_tps assigned_standby_tps = self.standby_tps active_offsets = self.active_offsets standby_offsets = self.standby_offsets active_highwaters = self.active_highwaters while not self.should_stop:"WAITING FOR NEXT RECOVERY TO START") if await self.wait_for_stopped(self.signal_recovery_start): self.signal_recovery_start.clear() break # service was stopped self.signal_recovery_start.clear() generation_id = self._generation_id span: Any = None spans: list = [] tracer: Optional[opentracing.Tracer] = None if tracer ="_faust") if tracer is not None and self._recovery_span: span = tracer.start_span( "recovery-thread", child_of=self._recovery_span ) spans.extend([span, self._recovery_span]) T = traced_from_parent_span(span) try: await self._wait(T(asyncio.sleep)(self.recovery_delay)) if not self.tables or == URL("aerospike:"): # If there are no tables -- simply resume streams await T(self._resume_streams)(generation_id=generation_id) for _span in spans: finish_span(_span) continue self._set_recovery_started() self.standbys_pending = True # Must flush any buffers before starting rebalance. T(self.flush_buffers)() producer = cast(_App, if producer is not None: await self._wait( T(producer.flush)(),, )"Build highwaters for active partitions") await self._wait( T(self._build_highwaters)( consumer, assigned_active_tps, active_highwaters, "active" ),, )"Build offsets for active partitions") await self._wait( T(self._build_offsets)( consumer, assigned_active_tps, active_offsets, "active" ),, ) if for tp in assigned_active_tps: if ( active_offsets[tp] and active_highwaters[tp] and active_offsets[tp] > active_highwaters[tp] ): raise ConsistencyError( E_PERSISTED_OFFSET.format( tp, active_offsets[tp], active_highwaters[tp], ), )"Build offsets for standby partitions") await self._wait( T(self._build_offsets)( consumer, assigned_standby_tps, standby_offsets, "standby" ),, )"Seek offsets for active partitions") await self._wait( T(self._seek_offsets)( consumer, assigned_active_tps, active_offsets, "active" ),, ) if self.signal_recovery_start.is_set():"Restarting Recovery") continue if self.need_recovery(): self._set_recovery_started() self.standbys_pending = True"Restoring state from changelog topics...") T(consumer.resume_partitions)(active_tps) # Resume partitions and start fetching."Resuming flow...") T( T(consumer.resume_flow)() await T(cast(_App, # Wait for actives to be up to date. # This signal will be set by _slurp_changelogs if tracer is not None and span: self._actives_span = tracer.start_span( "recovery-actives", child_of=span, tags={"Active-Stats": self.active_stats()}, ) try: await self._wait(self.signal_recovery_end.wait()) except Exception as exc: finish_span(self._actives_span, error=exc) else: finish_span(self._actives_span) finally: self._actives_span = None # recovery done."Done reading from changelog topics") T(consumer.pause_partitions)(active_tps) else:"Resuming flow...") T( T(consumer.resume_flow)() self._set_recovery_ended() # The changelog partitions only in the active_tps set need to be resumed active_only_partitions = active_tps - standby_tps if active_only_partitions: # Support for the specific scenario where recovery_buffer=1 tps_resuming = [ tp for tp in active_only_partitions if self.tp_to_table[tp].recovery_buffer_size == 1 ] if tps_resuming: T(consumer.resume_partitions)(tps_resuming) T( T(consumer.resume_flow)()"Recovery complete") if span: span.set_tag("Recovery-Completed", True) if standby_tps:"Starting standby partitions...")"Seek standby offsets") await self._wait( T(self._seek_offsets)( consumer, standby_tps, standby_offsets, "standby" ),, )"Build standby highwaters") await self._wait( T(self._build_highwaters)( consumer, standby_tps, standby_highwaters, "standby", ),, ) if for tp in standby_tps: if ( standby_offsets[tp] and standby_highwaters[tp] and standby_offsets[tp] > standby_highwaters[tp] ): raise ConsistencyError( E_PERSISTED_OFFSET.format( tp, standby_offsets[tp], standby_highwaters[tp], ), ) if tracer is not None and span: self._standbys_span = tracer.start_span( "recovery-standbys", child_of=span, tags={"Standby-Stats": self.standby_stats()}, )"Resume standby partitions") T(consumer.resume_partitions)(standby_tps) T( T(consumer.resume_flow)() # Pause all our topic partitions, # to make sure we don't fetch any more records from them. await self._wait(T(self.on_recovery_completed)(generation_id)) except RebalanceAgain as exc:"RAISED REBALANCE AGAIN") for _span in spans: finish_span(_span, error=exc) continue # another rebalance started except IllegalStateError as exc:"RAISED REBALANCE AGAIN") for _span in spans: finish_span(_span, error=exc) continue # another rebalance started except ServiceStopped as exc:"RAISED SERVICE STOPPED") for _span in spans: finish_span(_span, error=exc) break # service was stopped except Exception as exc: for _span in spans: finish_span(_span, error=exc) raise else: for _span in spans: finish_span(_span) # restart - wait for next rebalance. def _set_recovery_started(self) -> None: self.in_recovery = True = True self._recovery_ended = None self._recovery_started_at = monotonic() self._active_events_received_at.clear() self._standby_events_received_at.clear() self._processing_times.clear() self._last_active_event_processed_at = None def _set_recovery_ended(self) -> None: self.in_recovery = False = False self._recovery_ended_at = monotonic() self._active_events_received_at.clear() self._standby_events_received_at.clear() self._processing_times.clear() self._last_active_event_processed_at = None
[docs] def active_remaining_seconds(self, remaining: float) -> str: s = self._estimated_active_remaining_secs(remaining) return humanize_seconds(s, now="none") if s else "???"
def _estimated_active_remaining_secs(self, remaining: float) -> Optional[float]: processing_times = self._processing_times if len(processing_times) >= self.num_samples_required_for_estimate: mean_time = statistics.mean(processing_times) return (mean_time * remaining) * 1.10 # add 10% else: return None async def _wait(self, coro: WaitArgT, timeout: Optional[int] = None) -> None: signal = self.signal_recovery_start wait_result = await self.wait_first(coro, signal, timeout=timeout) if wait_result.stopped: # service was stopped. raise ServiceStopped() elif self.signal_recovery_start in wait_result.done: # another rebalance started raise RebalanceAgain() return None
[docs] async def on_recovery_completed(self, generation_id: int = 0) -> None: """Call when active table recovery is completed.""" consumer ="Restore complete!") await self._set_recovery_ended() # This needs to happen if all goes well callback_coros = [ asyncio.ensure_future( table.on_recovery_completed( self.actives_for_table[table], self.standbys_for_table[table], ) ) for table in self.tables.values() ] if callback_coros: await asyncio.wait(callback_coros) assignment = consumer.assignment() if != generation_id: self.log.warning( f"Recovery rebalancing again app id " f"{} param {generation_id}" ) return consumer.resume_partitions( {tp for tp in assignment if not self._is_changelog_tp(tp)} ) if assignment:"Seek stream partitions to committed offsets.") await self._wait( consumer.perform_seek(), ) self.completed.set()"Resume stream partitions") consumer.resume_flow() # finally make sure the fetcher is running. await cast(_App, self.tables.on_actives_ready() if not self.tables.on_standbys_ready()"Worker ready")
async def _build_highwaters( self, consumer: ConsumerT, tps: Set[TP], destination: Counter[TP], title: str ) -> None: # -- Build highwater highwaters = await consumer.highwaters(*tps) highwaters = { # FIXME the -1 here is because of the way we commit offsets tp: value - 1 if value is not None else -1 for tp, value in highwaters.items() } "Highwater for %s changelog partitions:\n%s", title, self._highwater_logtable(highwaters, title=title), ) destination.clear() destination.update(highwaters) def _highwater_logtable(self, highwaters: Mapping[TP, int], *, title: str) -> str: table_data = [ [k.topic, str(k.partition), str(v)] for k, v in sorted(highwaters.items()) ] return terminal.logtable( list(self._consolidate_table_keys(table_data)), title=f"Highwater - {title.capitalize()}", headers=["topic", "partition", "highwater"], ) def _consolidate_table_keys(self, data: TableDataT) -> Iterator[List[str]]: """Format terminal log table to reduce noise from duplicate keys. We log tables where the first row is the name of the topic, and it gets noisy when that name is repeated over and over. This function replaces repeating topic names with the ditto mark. Note: Data must be sorted. """ prev_key: Optional[str] = None for key, *rest in data: if prev_key is not None and prev_key == key: yield ["〃", *rest] # ditto else: yield [key, *rest] prev_key = key async def _build_offsets( self, consumer: ConsumerT, tps: Set[TP], destination: Counter[TP], title: str ) -> None: # -- Update offsets # Offsets may have been compacted, need to get to the recent ones earliest = await consumer.earliest_offsets(*tps) # FIXME To be consistent with the offset -1 logic earliest = { tp: offset - 1 if offset is not None else None for tp, offset in earliest.items() } for tp in tps: last_value = destination[tp] new_value = earliest.get(tp, None) if last_value is None and new_value is None: destination[tp] = -1 elif last_value is None: destination[tp] = new_value elif new_value is None: destination[tp] = last_value else: destination[tp] = max(last_value, new_value) if destination: "%s offsets at start of reading:\n%s", title, self._start_offsets_logtable(destination, title=title), ) def _start_offsets_logtable(self, offsets: Mapping[TP, int], *, title: str) -> str: table_data = [ [k.topic, str(k.partition), str(v)] for k, v in sorted(offsets.items()) ] return terminal.logtable( list(self._consolidate_table_keys(table_data)), title=f"Reading Starts At - {title.capitalize()}", headers=["topic", "partition", "offset"], ) async def _seek_offsets( self, consumer: ConsumerT, tps: Set[TP], offsets: Counter[TP], title: str ) -> None: # Seek to new offsets new_offsets = {} for tp in tps: offset = offsets[tp] if offset == -1: offset = 0 new_offsets[tp] = offset # FIXME Remove check when fixed offset-1 discrepancy await consumer.seek_wait(new_offsets) @Service.task async def _slurp_changelogs(self) -> None: changelog_queue = self.tables.changelog_queue tp_to_table = self.tp_to_table active_tps = self.active_tps standby_tps = self.standby_tps active_offsets = self.active_offsets standby_offsets = self.standby_offsets active_events_received_at = self._active_events_received_at standby_events_received_at = self._standby_events_received_at buffers = self.buffers buffer_sizes = self.buffer_sizes processing_times = self._processing_times async def _maybe_signal_recovery_end(timeout=False, timeout_count=0) -> None: # lets wait at least 2 consecutive cycles for the queue to be # empty to avoid race conditions between # the aiokafka consumer position and draining of the queue if timeout and and timeout_count > 1: await detect_aborted_tx() if self.in_recovery and not self.need_recovery(): # apply anything stuck in the buffers self.flush_buffers() self._set_recovery_ended() if self._actives_span is not None: self._actives_span.set_tag("Actives-Ready", True) logger.debug("Setting recovery end") self.signal_recovery_end.set() async def detect_aborted_tx(): highwaters = self.active_highwaters offsets = self.active_offsets for tp, highwater in highwaters.items(): if ( highwater is not None and offsets[tp] is not None and offsets[tp] < highwater ): if await >= highwater:"Aborted tx until highwater for {tp}") offsets[tp] = highwater timeout_count = 0 while not self.should_stop: try: self.signal_recovery_end.clear() try: event: EventT = await asyncio.wait_for( changelog_queue.get(), timeout=5.0 ) except asyncio.TimeoutError: timeout_count += 1 if self.should_stop: return await _maybe_signal_recovery_end( timeout=True, timeout_count=timeout_count ) continue now = monotonic() timeout_count = 0 message = event.message tp = offset = message.offset logger.debug(f"Recovery message topic {tp} offset {offset}") offsets: Counter[TP] bufsize = buffer_sizes.get(tp) is_active = False if tp in active_tps: is_active = True table = tp_to_table[tp] offsets = active_offsets if bufsize is None: bufsize = buffer_sizes[tp] = table.recovery_buffer_size active_events_received_at[tp] = now elif tp in standby_tps: table = tp_to_table[tp] offsets = standby_offsets if bufsize is None: bufsize = buffer_sizes[tp] = table.standby_buffer_size standby_events_received_at[tp] = now else: logger.warning(f"recovery unknown topic {tp} offset {offset}") seen_offset = offsets.get(tp, None) logger.debug( f"seen offset for {tp} is {seen_offset} message offset {offset}" ) if seen_offset is None or offset > seen_offset: offsets[tp] = offset buf = buffers[table] buf.append(event) await table.on_changelog_event(event) if len(buf) >= bufsize: table.apply_changelog_batch(buf) buf.clear() self._last_flush_at = now now_after = monotonic() if is_active: last_processed_at = self._last_active_event_processed_at if last_processed_at is not None: processing_times.append(now_after - last_processed_at) max_samples = self.num_samples_required_for_estimate if len(processing_times) > max_samples: processing_times.popleft() self._last_active_event_processed_at = now_after await _maybe_signal_recovery_end() standby_ready = ( self._standbys_span is None and self.tables.standbys_ready ) if not standby_ready and not self.standby_remaining_total(): logger.debug("Completed standby partition fetch") if self._standbys_span: finish_span(self._standbys_span) self._standbys_span = None self.tables.on_standbys_ready() except Exception as ex: logger.warning(f"Error in recovery {ex}")
[docs] def flush_buffers(self) -> None: """Flush changelog buffers.""" for table, buffer in self.buffers.items(): table.apply_changelog_batch(buffer) buffer.clear() self._last_flush_at = monotonic()
[docs] def need_recovery(self) -> bool: """Return :const:`True` if recovery is required.""" return any(v > 0 for v in self.active_remaining().values())
[docs] def active_remaining(self) -> Counter[TP]: """Return counter of remaining changes by active partition.""" highwaters = self.active_highwaters offsets = self.active_offsets return Counter( { tp: highwater - offsets[tp] for tp, highwater in highwaters.items() if highwater is not None and offsets[tp] is not None } )
[docs] def standby_remaining(self) -> Counter[TP]: """Return counter of remaining changes by standby partition.""" highwaters = self.standby_highwaters offsets = self.standby_offsets return Counter( { tp: highwater - offsets[tp] for tp, highwater in highwaters.items() if highwater >= 0 and offsets[tp] >= 0 } )
[docs] def active_remaining_total(self) -> int: """Return number of changes remaining for actives to be up-to-date.""" return sum(self.active_remaining().values())
[docs] def standby_remaining_total(self) -> int: """Return number of changes remaining for standbys to be up-to-date.""" return sum(self.standby_remaining().values())
[docs] def active_stats(self) -> RecoveryStatsMapping: """Return current active recovery statistics.""" offsets = self.active_offsets return { tp: RecoveryStats(highwater, offsets[tp], highwater - offsets[tp]) for tp, highwater in self.active_highwaters.items() if offsets[tp] is not None and highwater - offsets[tp] != 0 }
[docs] def standby_stats(self) -> RecoveryStatsMapping: """Return current standby recovery statistics.""" offsets = self.standby_offsets return { tp: RecoveryStats(highwater, offsets[tp], highwater - offsets[tp]) for tp, highwater in self.standby_highwaters.items() if offsets[tp] is not None and highwater - offsets[tp] != 0 }
def _stats_to_logtable(self, title: str, stats: RecoveryStatsMapping) -> str: table_data = [ list( map( str, [ tp.topic, tp.partition, s.highwater, s.offset, s.remaining, ], ) ) for tp, s in sorted(stats.items()) ] return terminal.logtable( list(self._consolidate_table_keys(table_data)), title=title, headers=[ "topic", "partition", "need offset", "have offset", "remaining", ], ) @Service.task async def _publish_stats(self) -> None: """Emit stats (remaining to fetch) while in active recovery.""" interval = self.stats_interval await self.sleep(interval) async for sleep_time in self.itertimer(interval, name="Recovery.stats"): if self.in_recovery: now = monotonic() stats = self.active_stats() num_samples = len(self._processing_times) if stats and num_samples >= self.num_samples_required_for_estimate: remaining_total = self.active_remaining_total() "Still fetching changelog topics for recovery, " "estimated time remaining %s " "(total remaining=%r):\n%s", self.active_remaining_seconds(remaining_total), remaining_total, self._stats_to_logtable("Remaining for active recovery", stats), ) elif stats: await self._verify_remaining(now, stats) else: recovery_started_at = self._recovery_started_at if recovery_started_at is None: self.log.error( "POSSIBLE INTERNAL ERROR: " "Recovery marked as started but missing " "self._recovery_started_at timestamp." ) else: secs_since_started = now - recovery_started_at if secs_since_started >= 30.0: # This shouldn't happen, but we want to # log an error in case it does. self.log.error( "POSSIBLE INTERNAL ERROR: " "Recovery has no remaining offsets to fetch, " "but we have spent %s waiting for the worker " "to transition out of recovery state...", humanize_seconds(secs_since_started), ) async def _verify_remaining(self, now: float, stats: RecoveryStatsMapping) -> None: consumer = active_events_received_at = self._active_events_received_at recovery_started_at = self._recovery_started_at if recovery_started_at is None: return # we already log about this in _publish_stats secs_since_started = now - recovery_started_at last_flush_at = self._last_flush_at if last_flush_at is None: if secs_since_started >= self.flush_timeout_secs: self.log.warning( "Recovery has not flushed buffers since " "recovery startted (started %s). " "Current total buffer size: %r", humanize_seconds_ago(secs_since_started), self._current_total_buffer_size(), ) else: secs_since_last_flush = now - last_flush_at if secs_since_last_flush >= self.flush_timeout_secs: self.log.warning( "Recovery has not flushed buffers in the last %r " "seconds (last flush was %s). " "Current total buffer size: %r", self.flush_timeout_secs, humanize_seconds_ago(secs_since_last_flush), self._current_total_buffer_size(), ) for tp in stats: await self.sleep(0) if self.should_stop: break if not self.in_recovery: break consumer.verify_recovery_event_path(now, tp) secs_since_started = now - recovery_started_at last_event_received = active_events_received_at.get(tp) if last_event_received is None: if secs_since_started >= self.event_timeout_secs: self.log.warning( "No event received for active tp %r since recovery " "start (started %s)", tp, humanize_seconds_ago(secs_since_started), ) continue secs_since_received = now - last_event_received if secs_since_received >= self.event_timeout_secs: self.log.warning( "No event received for active tp %r in the last %r " "seconds (last event received %s)", tp, self.event_timeout_secs, humanize_seconds_ago(secs_since_received), ) def _current_total_buffer_size(self) -> int: return sum(len(buf) for buf in self.buffers.values()) def _is_changelog_tp(self, tp: TP) -> bool: return tp.topic in self.tables.changelog_topics