# Source code for faust.livecheck.case

"""LiveCheck - Test cases."""

import traceback
import typing
from collections import deque
from contextlib import ExitStack, asynccontextmanager
from datetime import datetime, timedelta, timezone
from itertools import count
from random import uniform
from statistics import median
from time import monotonic
from typing import (
    Any,
    AsyncGenerator,
    ClassVar,
    Counter,
    Deque,
    Dict,
    Iterable,
    Optional,
    Type,
    Union,
    cast,
)

from aiohttp import ClientError, ClientTimeout
from mode import Seconds, Service, want_seconds
from mode.utils.times import humanize_seconds
from yarl import URL

from faust.utils import uuid

from .exceptions import ServiceDown, SuiteFailed, SuiteStalled
from .locals import current_execution_stack, current_test_stack
from .models import SignalEvent, State, TestExecution, TestReport
from .runners import TestRunner
from .signals import BaseSignal

if typing.TYPE_CHECKING:
    from .app import LiveCheck as _LiveCheck
else:

    class _LiveCheck: ...  # noqa


__all__ = ["Case"]


class Case(Service):
    """LiveCheck test case."""

    #: Runner class used to execute a single test (see :meth:`execute`).
    Runner: ClassVar[Type[TestRunner]] = TestRunner

    #: The LiveCheck app this case belongs to.
    app: _LiveCheck

    #: Name of the test
    #: If not set this will be generated out of the subclass name.
    name: str

    #: Whether this case is enabled.
    active: bool = True

    #: Current state of this test.
    status: State = State.INIT

    #: How often we execute the test using fake data
    #: (define Case.make_fake_request()).
    #:
    #: Set to None if production traffic is frequent enough to
    #: satisfy :attr:`warn_stalled_after`.
    frequency: Optional[float] = None

    #: Timeout in seconds for when after we warn that nothing is processing.
    warn_stalled_after: float = 1800.0

    #: Probability of test running when live traffic is going through.
    probability: float = 0.5

    #: Number of consecutive failures before the suite is marked failed.
    max_consecutive_failures: int = 30

    #: The warn_stalled_after timer uses this to keep track of
    #: either when a test was last received, or the last time the timer
    #: timed out.
    last_test_received: Optional[float] = None

    #: Timestamp of when the suite last failed.
    last_fail: Optional[float] = None

    #: Max items to store in :attr:`latency_history` and
    #: :attr:`runtime_history`.
    max_history: int = 100

    latency_history: Deque[float]
    frequency_history: Deque[float]
    runtime_history: Deque[float]

    runtime_avg: Optional[float] = None
    latency_avg: Optional[float] = None
    frequency_avg: Optional[float] = None

    #: Mapping of signal name to signal (cloned per-case in __init__).
    signals: Dict[str, BaseSignal]
    _original_signals: Iterable[BaseSignal]
    total_signals: int

    #: How long before a pending test execution expires.
    test_expires: timedelta = timedelta(hours=3)

    realtime_logs = False

    url_timeout_total: Optional[float] = 5 * 60.0
    url_timeout_connect: Optional[float] = None
    url_error_retries: int = 10
    url_error_delay_min: float = 0.5
    url_error_delay_backoff: float = 1.5
    url_error_delay_max: float = 5.0

    #: Minimum seconds between suite state transitions (rate-limits reports).
    state_transition_delay: float = 60.0

    consecutive_failures: int = 0
    total_failures: int = 0

    #: Counter of how many times each State has been entered.
    total_by_state: Counter[State]

    def __init__(
        self,
        *,
        app: _LiveCheck,
        name: str,
        probability: Optional[float] = None,
        warn_stalled_after: Optional[Seconds] = None,
        active: Optional[bool] = None,
        # Fix: annotation was the implicit-Optional ``Iterable[BaseSignal] = None``;
        # PEP 484 requires the Optional to be explicit.
        signals: Optional[Iterable[BaseSignal]] = None,
        test_expires: Optional[Seconds] = None,
        frequency: Optional[Seconds] = None,
        realtime_logs: Optional[bool] = None,
        max_history: Optional[int] = None,
        max_consecutive_failures: Optional[int] = None,
        url_timeout_total: Optional[float] = None,
        url_timeout_connect: Optional[float] = None,
        url_error_retries: Optional[int] = None,
        url_error_delay_min: Optional[float] = None,
        url_error_delay_backoff: Optional[float] = None,
        url_error_delay_max: Optional[float] = None,
        **kwargs: Any,
    ) -> None:
        """Initialize the case, overriding class defaults per argument.

        Every optional argument only overrides the corresponding class
        attribute when it is not :const:`None`, so subclass-level defaults
        are preserved.
        """
        self.app = app
        self.name = name
        if active is not None:
            self.active = active
        if probability is not None:
            self.probability = probability
        if warn_stalled_after is not None:
            self.warn_stalled_after = want_seconds(warn_stalled_after)
        self._original_signals = signals or ()
        # Clone signals so each gets a back-reference to this case.
        self.signals = {
            sig.name: sig.clone(case=self) for sig in self._original_signals
        }
        if test_expires is not None:
            self.test_expires = timedelta(seconds=want_seconds(test_expires))
        if frequency is not None:
            self.frequency = want_seconds(frequency)
        if realtime_logs is not None:
            self.realtime_logs = realtime_logs
        if max_history is not None:
            self.max_history = max_history
        if max_consecutive_failures is not None:
            self.max_consecutive_failures = max_consecutive_failures
        if url_timeout_total is not None:
            self.url_timeout_total = url_timeout_total
        if url_timeout_connect is not None:
            self.url_timeout_connect = url_timeout_connect
        if url_error_retries is not None:
            self.url_error_retries = url_error_retries
        if url_error_delay_min is not None:
            self.url_error_delay_min = url_error_delay_min
        if url_error_delay_backoff is not None:
            self.url_error_delay_backoff = url_error_delay_backoff
        if url_error_delay_max is not None:
            self.url_error_delay_max = url_error_delay_max

        # Bounded histories: deque(maxlen=...) discards the oldest samples.
        self.frequency_history = deque(maxlen=self.max_history)
        self.latency_history = deque(maxlen=self.max_history)
        self.runtime_history = deque(maxlen=self.max_history)

        self.total_by_state = Counter()
        self.total_signals = len(self.signals)
        # update local attribute so that the
        # signal attributes have the correct signal instance.
        self.__dict__.update(self.signals)

        Service.__init__(self, loop=app.loop, **kwargs)

    @Service.timer(10.0)
    async def _sampler(self) -> None:
        # Periodic timer that refreshes the median statistics.
        await self._sample()

    async def _sample(self) -> None:
        # Update median frequency/latency/runtime from bounded histories
        # and log the current statistics.
        if self.frequency_history:
            self.frequency_avg = median(self.frequency_history)
        if self.latency_history:
            self.latency_avg = median(self.latency_history)
        if self.runtime_history:
            self.runtime_avg = median(self.runtime_history)
        self.log.info(
            "Stats: (median) frequency=%r latency=%r runtime=%r",
            self.frequency_avg,
            self.latency_avg,
            self.runtime_avg,
        )
[docs] @asynccontextmanager async def maybe_trigger( self, id: Optional[str] = None, *args: Any, **kwargs: Any ) -> AsyncGenerator[Optional[TestExecution], None]: """Schedule test execution, or not, based on probability setting.""" execution: Optional[TestExecution] = None with ExitStack() as exit_stack: if uniform(0, 1) < self.probability: # nosec B311 execution = await self.trigger(id, *args, **kwargs) exit_stack.enter_context(current_test_stack.push(execution)) yield execution
[docs] async def trigger( self, id: Optional[str] = None, *args: Any, **kwargs: Any ) -> TestExecution: """Schedule test execution ASAP.""" id = id or uuid() execution = TestExecution( id=id, case_name=self.name, timestamp=self._now(), test_args=args, test_kwargs=kwargs, expires=self._now() + self.test_expires, ) await self.app.pending_tests.send(key=id, value=execution) return execution
def _now(self) -> datetime: return datetime.utcnow().astimezone(timezone.utc)
[docs] async def run(self, *test_args: Any, **test_kwargs: Any) -> None: """Override this to define your test case.""" raise NotImplementedError("Case class must implement run")
[docs] async def resolve_signal(self, key: str, event: SignalEvent) -> None: """Mark test execution signal as resolved.""" await self.signals[event.signal_name].resolve(key, event)
[docs] async def execute(self, test: TestExecution) -> None: """Execute test using :class:`TestRunner`.""" t_start = monotonic() runner = self.Runner(self, test, started=t_start) with current_execution_stack.push(runner): # resolve_models await runner.execute()
[docs] async def on_test_start(self, runner: TestRunner) -> None: """Call when a test starts executing.""" started = runner.started t_prev, self.last_test_received = self.last_test_received, started if t_prev: time_since = started - t_prev wanted_frequency = self.frequency if wanted_frequency: latency = time_since - wanted_frequency self.latency_history.append(latency) self.frequency_history.append(time_since)
[docs] async def on_test_skipped(self, runner: TestRunner) -> None: """Call when a test is skipped.""" # wait until we have fast forwarded before raising errors # XXX should we use seek, or warn somehow if this # takes too long? self.last_test_received = monotonic()
[docs] async def on_test_failed(self, runner: TestRunner, exc: BaseException) -> None: """Call when invariant in test execution fails.""" await self._set_test_error_state(State.FAIL)
[docs] async def on_test_error(self, runner: TestRunner, exc: BaseException) -> None: """Call when a test execution raises an exception.""" await self._set_test_error_state(State.ERROR)
[docs] async def on_test_timeout(self, runner: TestRunner, exc: BaseException) -> None: """Call when a test execution times out.""" await self._set_test_error_state(State.TIMEOUT)
async def _set_test_error_state(self, state: State) -> None: self.status = state self.consecutive_failures += 1 self.total_failures += 1 self.total_by_state[state] += 1 if self.consecutive_failures >= self.max_consecutive_failures: try: raise SuiteFailed( "Failed after {0!r} (max={1!r})".format( self.consecutive_failures, self.max_consecutive_failures ) ) except SuiteFailed as exc: await self.on_suite_fail(exc) def _set_pass_state(self, state: State) -> None: assert state.is_ok() self.status = state self.consecutive_failures = 0 self.total_by_state[state] += 1
[docs] async def on_test_pass(self, runner: TestRunner) -> None: """Call when a test execution passes.""" test = runner.test runtime: float = runner.runtime or 0.0 self.runtime_history.append(runtime) ts = test.timestamp.timestamp() last_fail = self.last_fail if last_fail is None or ts > last_fail: self._maybe_recover_from_failed_state()
[docs] async def post_report(self, report: TestReport) -> None: """Publish test report.""" await self.app.post_report(report)
@Service.task async def _send_frequency(self) -> None: freq = self.frequency if freq: async for sleep_time in self.itertimer(freq, name=f"{self.name}_send"): if self.app.is_leader(): await self.make_fake_request()
[docs] async def make_fake_request(self) -> None: ...
@Service.task async def _check_frequency(self) -> None: timeout = self.warn_stalled_after await self.sleep(timeout) self.last_test_received = None time_start = monotonic() last_warning: Optional[float] = None async for sleep_time in self.itertimer(timeout, name=f"{self.name}._wempty"): try: now = monotonic() can_warn = now - last_warning if last_warning else True if can_warn: if self.last_test_received is not None: secs_since = now - self.last_test_received else: secs_since = now - time_start if secs_since > self.warn_stalled_after: human_secs = humanize_seconds(secs_since) # we reset the timer to avoid logging every second. last_warning = now raise SuiteStalled( f"Test stalled! Last received {human_secs} ago " f"(warn_stalled_after={timeout})." ) else: self._maybe_recover_from_failed_state() except SuiteStalled as exc: # we don't want to propagate this here, keep running... await self.on_suite_fail(exc, State.STALL)
[docs] async def on_suite_fail( self, exc: SuiteFailed, new_state: State = State.FAIL ) -> None: """Call when the suite fails.""" assert isinstance(exc, SuiteFailed) delay = self.state_transition_delay if self.status.is_ok() or self._failed_longer_than(delay): self.status = new_state self.last_fail = monotonic() self.log.exception(str(exc)) await self.post_report( TestReport( case_name=self.name, state=new_state, test=None, runtime=None, signal_latency={}, error=str(exc), traceback="\n".join(traceback.format_tb(exc.__traceback__)), ) ) else: self.status = new_state self.last_fail = monotonic()
def _maybe_recover_from_failed_state(self) -> None: if self.status != State.DO_NOT_SHARE: if self._failed_longer_than(self.state_transition_delay): self._set_pass_state(State.DO_NOT_SHARE) def _failed_longer_than(self, secs: float) -> bool: secs_since_fail = self.seconds_since_last_fail if secs_since_fail is None: return True else: return secs_since_fail > secs @property def seconds_since_last_fail(self) -> Optional[float]: """Return number of seconds since any test failed.""" last_fail = self.last_fail return monotonic() - last_fail if last_fail else None
[docs] async def get_url(self, url: Union[str, URL], **kwargs: Any) -> Optional[bytes]: """Perform GET request using HTTP client.""" return await self.url_request("get", url, **kwargs)
[docs] async def post_url(self, url: Union[str, URL], **kwargs: Any) -> Optional[bytes]: """Perform POST request using HTTP client.""" return await self.url_request("post", url, **kwargs)
[docs] async def url_request( self, method: str, url: Union[str, URL], **kwargs: Any ) -> Optional[bytes]: """Perform URL request using HTTP client.""" timeout = ClientTimeout( # mypy thinks this must be float, but it can be None. total=cast(float, self.url_timeout_total), connect=cast(float, self.url_timeout_connect), ) error_delay = self.url_error_delay_min try: for i in count(): try: async with self.app.http_client.request( method, url, timeout=timeout, **kwargs ) as response: response.raise_for_status() payload = await response.read() self._maybe_recover_from_failed_state() return payload except ClientError as exc: if i >= self.url_error_retries: raise ServiceDown(f"Cannot send fake test request: {exc!r}") retry_in = humanize_seconds(error_delay, microseconds=True) self.log.warning( "URL %r raised: %r (Will retry in %s)", url, exc, retry_in ) error_delay = min( error_delay * self.url_error_delay_backoff, self.url_error_delay_max, ) await self.sleep(error_delay) else: # pragma: no cover pass except ServiceDown as exc: # we don't want to propagate this here, keep running... await self.on_suite_fail(exc) return None
@property def current_test(self) -> Optional[TestExecution]: """Return the currently active test in this task (if any).""" return current_test_stack.top @property def current_execution(self) -> Optional[TestRunner]: """Return the currently executing :class:`TestRunner` in this task.""" return current_execution_stack.top @property def label(self) -> str: """Return human-readable label for this test case.""" return f"{type(self).__name__}: {self.name}"