"""LiveCheck Signals - Test communication and synchronization."""
import asyncio
import typing
from time import monotonic
from typing import Any, Dict, Generic, Optional, Tuple, Type, TypeVar, cast
from mode import Seconds, want_seconds
from faust.models import maybe_model
from .exceptions import LiveCheckTestTimeout
from .locals import current_test_stack
from .models import SignalEvent
if typing.TYPE_CHECKING:
from .case import Case as _Case
else:
class _Case: ... # noqa
__all__ = ["BaseSignal", "Signal"]
VT = TypeVar("VT")
[docs]class BaseSignal(Generic[VT]):
"""Generic base class for signals."""
name: str
case: _Case
index: int
def __init__(self, name: str = "", case: _Case = None, index: int = -1) -> None:
self.name = name
self.case = cast(_Case, case)
self.index = index
[docs] async def send(
self, value: VT = None, *, key: Any = None, force: bool = False
) -> None:
"""Notify test that this signal is now complete."""
raise NotImplementedError()
[docs] async def wait(self, *, key: Any = None, timeout: Optional[Seconds] = None) -> VT:
"""Wait for signal to be completed."""
raise NotImplementedError()
[docs] async def resolve(self, key: Any, event: SignalEvent) -> None:
"""Resolve signal with value."""
self._set_current_value(key, event)
self._wakeup_resolvers()
def __set_name__(self, owner: Type, name: str) -> None:
if not self.name:
self.name = name
def _wakeup_resolvers(self) -> None:
self.case.app._can_resolve.set()
async def _wait_for_resolved(self, *, timeout: Optional[float] = None) -> None:
app = self.case.app
app._can_resolve.clear()
await app.wait(app._can_resolve, timeout=timeout)
def _get_current_value(self, key: Any) -> SignalEvent:
return self.case.app._resolved_signals[self._index_key(key)]
def _index_key(self, key: Any) -> Tuple[str, str, Any]:
return self.name, self.case.name, key
def _set_current_value(self, key: Any, event: SignalEvent) -> None:
self.case.app._resolved_signals[self._index_key(key)] = event
[docs] def clone(self, **kwargs: Any) -> "BaseSignal":
"""Clone this signal using keyword arguments."""
return type(self)(**{**self._asdict(), **kwargs})
def _asdict(self, **kwargs: Any) -> Dict:
return {"name": self.name, "case": self.case, "index": self.index}
def __repr__(self) -> str:
return f"<{type(self).__name__}: {self.name}>"
[docs]class Signal(BaseSignal[VT]):
"""Signal for test case using Kafka.
Used to wait for something to happen elsewhere.
"""
# What do we use for this? Kafka? some other mechanism?
# I'm thinking separate Kafka cluster, with a single
# topic for each test app.
[docs] async def send(
self, value: VT = None, *, key: Any = None, force: bool = False
) -> None:
"""Notify test that this signal is now complete."""
current_test = current_test_stack.top
if current_test is None:
if not force:
return
assert key
else:
key = key if key is not None else current_test.id
await self.case.app.bus.send(
key=key,
value=SignalEvent(
signal_name=self.name,
case_name=self.case.name,
key=key,
value=value,
),
)
[docs] async def wait(self, *, key: Any = None, timeout: Optional[Seconds] = None) -> VT:
"""Wait for signal to be completed."""
# wait for key to arrive in consumer
runner = self.case.current_execution
if runner is None:
raise RuntimeError("No test executing.")
test = runner.test
assert test
k: Any = test.id if key is None else key
timeout_s = want_seconds(timeout)
await runner.on_signal_wait(self, timeout=timeout_s)
time_start = monotonic()
event = await self._wait_for_message_by_key(key=k, timeout=timeout_s)
time_end = monotonic()
await runner.on_signal_received(
self,
time_start=time_start,
time_end=time_end,
)
self._verify_event(event, k, self.name, self.case.name)
return cast(VT, maybe_model(event.value))
def _verify_event(self, ev: SignalEvent, key: Any, name: str, case: str) -> None:
assert ev.key == key, f"{ev.key!r} == {key!r}"
assert ev.signal_name == name, f"{ev.signal_name!r} == {name!r}"
assert ev.case_name == case, f"{ev.case_name!r} == {case!r}"
async def _wait_for_message_by_key(
self, key: Any, *, timeout: Optional[float] = None, max_interval: float = 2.0
) -> SignalEvent:
app = self.case.app
time_start = monotonic()
remaining = timeout
# See if the key is already there.
try:
return self._get_current_value(key)
except KeyError:
pass
# If not, wait for it to arrive.
while not app.should_stop:
if remaining is not None:
remaining = remaining - (monotonic() - time_start)
try:
if remaining is not None and remaining <= 0.0:
try:
return self._get_current_value(key)
except KeyError:
raise asyncio.TimeoutError() from None
max_wait = None
if remaining is not None:
max_wait = min(remaining, max_interval)
await self._wait_for_resolved(timeout=max_wait)
except asyncio.TimeoutError:
msg = f"Timed out waiting for signal {self.name} ({timeout})"
raise LiveCheckTestTimeout(msg) from None
if app.should_stop:
break
try:
val = self._get_current_value(key)
return val
except KeyError:
pass
raise asyncio.CancelledError()