faust.livecheck

LiveCheck - End-to-end testing of asynchronous systems.

class faust.livecheck.LiveCheck(id: str, *, test_topic_name: Optional[str] = None, bus_topic_name: Optional[str] = None, report_topic_name: Optional[str] = None, bus_concurrency: Optional[int] = None, test_concurrency: Optional[int] = None, send_reports: Optional[bool] = None, **kwargs: Any)[source]

LiveCheck application.

SCAN_CATEGORIES: ClassVar[List[str]] = ['faust.agent', 'faust.command', 'faust.page', 'faust.service', 'faust.task', 'livecheck.case']
class Signal(name: str = '', case: Optional[_Case] = None, index: int = -1)

Signal for test case using Kafka.

Used to wait for something to happen elsewhere.

async send(value: Optional[VT] = None, *, key: Optional[Any] = None, force: bool = False) None

Notify test that this signal is now complete.

Return type:

None

async wait(*, key: Optional[Any] = None, timeout: Optional[Union[timedelta, float, str]] = None) VT

Wait for signal to be completed.

Return type:

~VT

class Case(*, app: _LiveCheck, name: str, probability: Optional[float] = None, warn_stalled_after: Optional[Union[timedelta, float, str]] = None, active: Optional[bool] = None, signals: Optional[Iterable[BaseSignal]] = None, test_expires: Optional[Union[timedelta, float, str]] = None, frequency: Optional[Union[timedelta, float, str]] = None, realtime_logs: Optional[bool] = None, max_history: Optional[int] = None, max_consecutive_failures: Optional[int] = None, url_timeout_total: Optional[float] = None, url_timeout_connect: Optional[float] = None, url_error_retries: Optional[int] = None, url_error_delay_min: Optional[float] = None, url_error_delay_backoff: Optional[float] = None, url_error_delay_max: Optional[float] = None, **kwargs: Any)

LiveCheck test case.

Runner

alias of TestRunner

active: bool = True
consecutive_failures: int = 0
property current_execution: Optional[TestRunner]

Return the currently executing TestRunner in this task.

Return type:

Optional[TestRunner]

property current_test: Optional[TestExecution]

Return the currently active test in this task (if any).

Return type:

Optional[TestExecution]

async execute(test: TestExecution) None

Execute test using TestRunner.

Return type:

None

frequency: Optional[float] = None

How often we execute the test using fake data (define Case.make_fake_request()).

Set to None if production traffic is frequent enough to satisfy warn_stalled_after.

frequency_avg: Optional[float] = None
async get_url(url: Union[str, URL], **kwargs: Any) Optional[bytes]

Perform GET request using HTTP client.

Return type:

Optional[bytes]

property label: str

Return human-readable label for this test case.

Return type:

str

last_fail: Optional[float] = None

Timestamp of when the suite last failed.

last_test_received: Optional[float] = None

The warn_stalled_after timer uses this to keep track of either when a test was last received, or the last time the timer timed out.

latency_avg: Optional[float] = None
logger: logging.Logger = <Logger faust.livecheck.case (WARNING)>
async make_fake_request() None
Return type:

None

max_consecutive_failures: int = 30
max_history: int = 100

Max items to store in latency_history and runtime_history.

maybe_trigger(id: Optional[str] = None, *args: Any, **kwargs: Any) AsyncGenerator[Optional[TestExecution], None]

Schedule test execution, or not, based on probability setting.

Return type:

AsyncGenerator[Optional[TestExecution], None]

async on_suite_fail(exc: SuiteFailed, new_state: State = State.FAIL) None

Call when the suite fails.

Return type:

None

async on_test_error(runner: TestRunner, exc: BaseException) None

Call when a test execution raises an exception.

Return type:

None

async on_test_failed(runner: TestRunner, exc: BaseException) None

Call when invariant in test execution fails.

Return type:

None

async on_test_pass(runner: TestRunner) None

Call when a test execution passes.

Return type:

None

async on_test_skipped(runner: TestRunner) None

Call when a test is skipped.

Return type:

None

async on_test_start(runner: TestRunner) None

Call when a test starts executing.

Return type:

None

async on_test_timeout(runner: TestRunner, exc: BaseException) None

Call when a test execution times out.

Return type:

None

async post_report(report: TestReport) None

Publish test report.

Return type:

None

async post_url(url: Union[str, URL], **kwargs: Any) Optional[bytes]

Perform POST request using HTTP client.

Return type:

Optional[bytes]

probability: float = 0.5

Probability of test running when live traffic is going through.

realtime_logs = False
async resolve_signal(key: str, event: SignalEvent) None

Mark test execution signal as resolved.

Return type:

None

async run(*test_args: Any, **test_kwargs: Any) None

Override this to define your test case.

Return type:

None

runtime_avg: Optional[float] = None
property seconds_since_last_fail: Optional[float]

Return number of seconds since any test failed.

Return type:

Optional[float]

state_transition_delay: float = 60.0
status: State = 'INIT'

Current state of this test.

test_expires: timedelta = datetime.timedelta(seconds=10800)
total_failures: int = 0
async trigger(id: Optional[str] = None, *args: Any, **kwargs: Any) TestExecution

Schedule test execution ASAP.

Return type:

TestExecution

url_error_delay_backoff: float = 1.5
url_error_delay_max: float = 5.0
url_error_delay_min: float = 0.5
url_error_retries: int = 10
async url_request(method: str, url: Union[str, URL], **kwargs: Any) Optional[bytes]

Perform URL request using HTTP client.

Return type:

Optional[bytes]

url_timeout_connect: Optional[float] = None
url_timeout_total: Optional[float] = 300.0
warn_stalled_after: float = 1800.0

Timeout in seconds after which we warn that nothing is processing.

app: _LiveCheck
name: str

Name of the test. If not set, this will be generated from the subclass name.

latency_history: Deque[float]
frequency_history: Deque[float]
runtime_history: Deque[float]
signals: Dict[str, BaseSignal]
total_signals: int
total_by_state: Counter[State]
classmethod for_app(app: AppT, *, prefix: str = 'livecheck-', web_port: int = 9999, test_topic_name: Optional[str] = None, bus_topic_name: Optional[str] = None, report_topic_name: Optional[str] = None, bus_concurrency: Optional[int] = None, test_concurrency: Optional[int] = None, send_reports: Optional[bool] = None, **kwargs: Any) LiveCheck[source]

Create LiveCheck application targeting specific app.

The target app will be used to configure the LiveCheck app.

Return type:

LiveCheck

test_topic_name: str = 'livecheck'
bus_topic_name: str = 'livecheck-bus'
report_topic_name: str = 'livecheck-report'
bus_concurrency: int = 30

Number of concurrent actors processing signal events.

test_concurrency: int = 100

Number of concurrent actors executing test cases.

send_reports: bool = True

Unset this if you don’t want reports to be sent to the report_topic_name topic.

cases: Dict[str, Case]
property current_test: Optional[TestExecution]

Return the current test context (if any).

Return type:

Optional[TestExecution]

on_produce_attach_test_headers(sender: AppT, key: Optional[bytes] = None, value: Optional[bytes] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[List[Tuple[str, bytes]]] = None, signal: Optional[BaseSignalT] = None, **kwargs: Any) None[source]

Attach test headers to Kafka produce requests.

Return type:

None

case(*, name: ~typing.Optional[str] = None, probability: ~typing.Optional[float] = None, warn_stalled_after: ~typing.Union[~datetime.timedelta, float, str] = datetime.timedelta(seconds=1800), active: ~typing.Optional[bool] = None, test_expires: ~typing.Optional[~typing.Union[~datetime.timedelta, float, str]] = None, frequency: ~typing.Optional[~typing.Union[~datetime.timedelta, float, str]] = None, max_history: ~typing.Optional[int] = None, max_consecutive_failures: ~typing.Optional[int] = None, url_timeout_total: ~typing.Optional[float] = None, url_timeout_connect: ~typing.Optional[float] = None, url_error_retries: ~typing.Optional[float] = None, url_error_delay_min: ~typing.Optional[float] = None, url_error_delay_backoff: ~typing.Optional[float] = None, url_error_delay_max: ~typing.Optional[float] = None, base: ~typing.Type[~faust.livecheck.case.Case] = <class 'faust.livecheck.case.Case'>) Callable[[Type], Case][source]

Decorate class to be used as a test case.

Return type:

Callable[[Type], Case]

Returns:

faust.livecheck.Case.

add_case(case: Case) Case[source]

Add and register new test case.

Return type:

Case

async post_report(report: TestReport) None[source]

Publish test report to reporting topic.

Return type:

None

logger: logging.Logger = <Logger faust.livecheck.app (WARNING)>
async on_start() None[source]

Call when LiveCheck application starts.

Return type:

None

async on_started() None[source]

Call when LiveCheck application is fully started.

Return type:

None

bus[source]

Topic used for signal communication.

pending_tests[source]

Topic used to keep pending test executions.

reports[source]

Topic used to log test reports.

class faust.livecheck.Case(*, app: _LiveCheck, name: str, probability: Optional[float] = None, warn_stalled_after: Optional[Union[timedelta, float, str]] = None, active: Optional[bool] = None, signals: Optional[Iterable[BaseSignal]] = None, test_expires: Optional[Union[timedelta, float, str]] = None, frequency: Optional[Union[timedelta, float, str]] = None, realtime_logs: Optional[bool] = None, max_history: Optional[int] = None, max_consecutive_failures: Optional[int] = None, url_timeout_total: Optional[float] = None, url_timeout_connect: Optional[float] = None, url_error_retries: Optional[int] = None, url_error_delay_min: Optional[float] = None, url_error_delay_backoff: Optional[float] = None, url_error_delay_max: Optional[float] = None, **kwargs: Any)[source]

LiveCheck test case.

Runner

alias of TestRunner

status: State = 'INIT'

Current state of this test.

last_test_received: Optional[float] = None

The warn_stalled_after timer uses this to keep track of either when a test was last received, or the last time the timer timed out.

last_fail: Optional[float] = None

Timestamp of when the suite last failed.

runtime_avg: Optional[float] = None
latency_avg: Optional[float] = None
frequency_avg: Optional[float] = None
state_transition_delay: float = 60.0
consecutive_failures: int = 0
total_failures: int = 0
app: _LiveCheck
name: str

Name of the test. If not set, this will be generated from the subclass name.

active: bool = True
probability: float = 0.5

Probability of test running when live traffic is going through.

warn_stalled_after: float = 1800.0

Timeout in seconds after which we warn that nothing is processing.

signals: Dict[str, BaseSignal]
test_expires: timedelta = datetime.timedelta(seconds=10800)
frequency: Optional[float] = None

How often we execute the test using fake data (define Case.make_fake_request()).

Set to None if production traffic is frequent enough to satisfy warn_stalled_after.

realtime_logs = False
max_history: int = 100

Max items to store in latency_history and runtime_history.

max_consecutive_failures: int = 30
url_timeout_total: Optional[float] = 300.0
url_timeout_connect: Optional[float] = None
url_error_retries: int = 10
url_error_delay_min: float = 0.5
url_error_delay_backoff: float = 1.5
url_error_delay_max: float = 5.0
frequency_history: Deque[float]
latency_history: Deque[float]
runtime_history: Deque[float]
total_by_state: Counter[State]
total_signals: int
maybe_trigger(id: Optional[str] = None, *args: Any, **kwargs: Any) AsyncGenerator[Optional[TestExecution], None][source]

Schedule test execution, or not, based on probability setting.

Return type:

AsyncGenerator[Optional[TestExecution], None]

async trigger(id: Optional[str] = None, *args: Any, **kwargs: Any) TestExecution[source]

Schedule test execution ASAP.

Return type:

TestExecution

async run(*test_args: Any, **test_kwargs: Any) None[source]

Override this to define your test case.

Return type:

None

logger: logging.Logger = <Logger faust.livecheck.case (WARNING)>
async resolve_signal(key: str, event: SignalEvent) None[source]

Mark test execution signal as resolved.

Return type:

None

log: CompositeLogger
diag: DiagT
async_exit_stack: AsyncExitStack
exit_stack: ExitStack
async execute(test: TestExecution) None[source]

Execute test using TestRunner.

Return type:

None

async on_test_start(runner: TestRunner) None[source]

Call when a test starts executing.

Return type:

None

async on_test_skipped(runner: TestRunner) None[source]

Call when a test is skipped.

Return type:

None

async on_test_failed(runner: TestRunner, exc: BaseException) None[source]

Call when invariant in test execution fails.

Return type:

None

async on_test_error(runner: TestRunner, exc: BaseException) None[source]

Call when a test execution raises an exception.

Return type:

None

async on_test_timeout(runner: TestRunner, exc: BaseException) None[source]

Call when a test execution times out.

Return type:

None

async on_test_pass(runner: TestRunner) None[source]

Call when a test execution passes.

Return type:

None

async post_report(report: TestReport) None[source]

Publish test report.

Return type:

None

async make_fake_request() None[source]
Return type:

None

async on_suite_fail(exc: SuiteFailed, new_state: State = State.FAIL) None[source]

Call when the suite fails.

Return type:

None

property seconds_since_last_fail: Optional[float]

Return number of seconds since any test failed.

Return type:

Optional[float]

async get_url(url: Union[str, URL], **kwargs: Any) Optional[bytes][source]

Perform GET request using HTTP client.

Return type:

Optional[bytes]

async post_url(url: Union[str, URL], **kwargs: Any) Optional[bytes][source]

Perform POST request using HTTP client.

Return type:

Optional[bytes]

async url_request(method: str, url: Union[str, URL], **kwargs: Any) Optional[bytes][source]

Perform URL request using HTTP client.

Return type:

Optional[bytes]

property current_test: Optional[TestExecution]

Return the currently active test in this task (if any).

Return type:

Optional[TestExecution]

property current_execution: Optional[TestRunner]

Return the currently executing TestRunner in this task.

Return type:

Optional[TestRunner]

property label: str

Return human-readable label for this test case.

Return type:

str

class faust.livecheck.Signal(name: str = '', case: Optional[_Case] = None, index: int = -1)[source]

Signal for test case using Kafka.

Used to wait for something to happen elsewhere.

async send(value: Optional[VT] = None, *, key: Optional[Any] = None, force: bool = False) None[source]

Notify test that this signal is now complete.

Return type:

None

async wait(*, key: Optional[Any] = None, timeout: Optional[Union[timedelta, float, str]] = None) VT[source]

Wait for signal to be completed.

Return type:

~VT

class faust.livecheck.TestRunner(case: _Case, test: TestExecution, started: float)[source]

Execute and keep track of test instance.

state: State = 'INIT'
report: Optional[TestReport] = None
error: Optional[BaseException] = None
case: _Case
test: TestExecution
started: float
ended: Optional[float]
runtime: Optional[float]
logs: List[Tuple[str, Tuple]]
signal_latency: Dict[str, float]
async execute() None[source]

Execute this test.

Return type:

None

async skip(reason: str) NoReturn[source]

Skip this test execution.

Return type:

NoReturn

async on_skipped(exc: LiveCheckTestSkipped) None[source]

Call when a test execution was skipped.

Return type:

None

async on_start() None[source]

Call when a test starts executing.

Return type:

None

async on_signal_wait(signal: BaseSignal, timeout: float) None[source]

Call when the test is waiting for a signal.

Return type:

None

async on_signal_received(signal: BaseSignal, time_start: float, time_end: float) None[source]

Call when a signal related to this test is received.

Return type:

None

async on_failed(exc: BaseException) None[source]

Call when an invariant in the test has failed.

Return type:

None

async on_error(exc: BaseException) None[source]

Call when test execution raises error.

Return type:

None

async on_timeout(exc: BaseException) None[source]

Call when test execution times out.

Return type:

None

async on_pass() None[source]

Call when test execution returns successfully.

Return type:

None

log_info(msg: str, *args: Any) None[source]

Log information related to the current execution.

Return type:

None

end() None[source]

End test execution.

Return type:

None

faust.livecheck.current_test() Optional[TestExecution][source]

Return information about the current test (if any).

Return type:

Optional[TestExecution]