"""Monitor using Statsd."""
import typing
from typing import Any, Dict, Optional, cast
from mode.utils.objects import cached_property
from faust import web
from faust.exceptions import ImproperlyConfigured
from faust.types import (
TP,
AppT,
CollectionT,
EventT,
Message,
PendingMessage,
RecordMetadata,
StreamT,
)
from faust.types.assignor import PartitionAssignorT
from faust.types.transports import ConsumerT, ProducerT
from .monitor import Monitor, TPOffsetMapping
# statsd is an optional dependency: import it if available, otherwise leave a
# None sentinel so __init__ can raise a helpful ImproperlyConfigured error.
try:
    import statsd
except ImportError:  # pragma: no cover
    statsd = None
# Only import the real StatsClient type for static type checkers; at runtime
# provide an empty stub so annotations resolve even without statsd installed.
if typing.TYPE_CHECKING:  # pragma: no cover
    from statsd import StatsClient
else:
    class StatsClient: ...  # noqa

__all__ = ["StatsdMonitor"]
class StatsdMonitor(Monitor):
    """Statsd Faust Sensor.

    This sensor, records statistics to Statsd along with computing metrics
    for the stats server
    """

    #: Hostname of the statsd server.
    host: str
    #: UDP port of the statsd server.
    port: int
    #: Prefix prepended by the statsd client to every metric name.
    prefix: str
    #: Sample rate forwarded to every statsd call.
    rate: float

    def __init__(
        self,
        host: str = "localhost",
        port: int = 8125,
        prefix: str = "faust-app",
        rate: float = 1.0,
        **kwargs: Any,
    ) -> None:
        # Fail fast if the optional statsd dependency is missing, before
        # doing any other setup.
        if statsd is None:
            raise ImproperlyConfigured(
                "StatsdMonitor requires `pip install statsd`.")
        self.host = host
        self.port = port
        self.prefix = prefix
        self.rate = rate
        super().__init__(**kwargs)

    def _new_statsd_client(self) -> StatsClient:
        """Create a new statsd client from the configured host/port/prefix."""
        return statsd.StatsClient(
            host=self.host, port=self.port, prefix=self.prefix)

    def on_message_in(self, tp: TP, offset: int, message: Message) -> None:
        """Call before message is delegated to streams."""
        super().on_message_in(tp, offset, message)
        self.client.incr("messages_received", rate=self.rate)
        self.client.incr("messages_active", rate=self.rate)
        self.client.incr(f"topic.{tp.topic}.messages_received", rate=self.rate)
        # Gauges are absolute values, so no sample rate is applied.
        self.client.gauge(f"read_offset.{tp.topic}.{tp.partition}", offset)

    def on_stream_event_in(
        self, tp: TP, offset: int, stream: StreamT, event: EventT
    ) -> Optional[Dict]:
        """Call when stream starts processing an event."""
        state = super().on_stream_event_in(tp, offset, stream, event)
        self.client.incr("events", rate=self.rate)
        self.client.incr(
            f"stream.{self._stream_label(stream)}.events",
            rate=self.rate,
        )
        self.client.incr("events_active", rate=self.rate)
        return state

    def _stream_label(self, stream: StreamT) -> str:
        """Derive a metric-safe label for *stream* from its short label."""
        # NOTE(review): lstrip("Stream:") strips any of the characters
        # "S", "t", "r", "e", "a", "m", ":" from the left, not the literal
        # prefix "Stream:" — kept as-is since existing metric names may
        # depend on this behavior; confirm before switching to removeprefix.
        return (
            self._normalize(
                stream.shortlabel.lstrip("Stream:"),
            )
            .strip("_")
            .lower()
        )

    def on_stream_event_out(
        self,
        tp: TP,
        offset: int,
        stream: StreamT,
        event: EventT,
        state: Optional[Dict] = None,
    ) -> None:
        """Call when stream is done processing an event."""
        super().on_stream_event_out(tp, offset, stream, event, state)
        self.client.decr("events_active", rate=self.rate)
        # Runtime is only recorded when the sensor tracked the event's start.
        if state is not None:
            self.client.timing(
                "events_runtime",
                self.secs_to_ms(self.events_runtime[-1]),
                rate=self.rate,
            )

    def on_message_out(self, tp: TP, offset: int, message: Message) -> None:
        """Call when message is fully acknowledged and can be committed."""
        super().on_message_out(tp, offset, message)
        self.client.decr("messages_active", rate=self.rate)

    def on_table_get(self, table: CollectionT, key: Any) -> None:
        """Call when value in table is retrieved."""
        super().on_table_get(table, key)
        self.client.incr(f"table.{table.name}.keys_retrieved", rate=self.rate)

    def on_table_set(self, table: CollectionT, key: Any, value: Any) -> None:
        """Call when new value for key in table is set."""
        super().on_table_set(table, key, value)
        self.client.incr(f"table.{table.name}.keys_updated", rate=self.rate)

    def on_table_del(self, table: CollectionT, key: Any) -> None:
        """Call when key in a table is deleted."""
        super().on_table_del(table, key)
        self.client.incr(f"table.{table.name}.keys_deleted", rate=self.rate)

    def on_commit_completed(self, consumer: ConsumerT, state: Any) -> None:
        """Call when consumer commit offset operation completed."""
        super().on_commit_completed(consumer, state)
        # state holds the monotonic timestamp captured at commit start.
        self.client.timing(
            "commit_latency", self.ms_since(cast(float, state)), rate=self.rate
        )

    def on_send_initiated(
        self,
        producer: ProducerT,
        topic: str,
        message: PendingMessage,
        keysize: int,
        valsize: int,
    ) -> Any:
        """Call when message added to producer buffer."""
        self.client.incr(f"topic.{topic}.messages_sent", rate=self.rate)
        return super().on_send_initiated(
            producer, topic, message, keysize, valsize)

    def on_send_completed(
        self, producer: ProducerT, state: Any, metadata: RecordMetadata
    ) -> None:
        """Call when producer finished sending message."""
        super().on_send_completed(producer, state, metadata)
        self.client.incr("messages_sent", rate=self.rate)
        self.client.timing(
            "send_latency", self.ms_since(cast(float, state)), rate=self.rate
        )

    def on_send_error(
        self, producer: ProducerT, exc: BaseException, state: Any
    ) -> None:
        """Call when producer was unable to publish message."""
        super().on_send_error(producer, exc, state)
        self.client.incr("messages_sent_error", rate=self.rate)
        self.client.timing(
            "send_latency_for_error",
            self.ms_since(cast(float, state)),
            rate=self.rate,
        )

    def on_assignment_error(
        self, assignor: PartitionAssignorT, state: Dict, exc: BaseException
    ) -> None:
        """Partition assignor did not complete assignor due to error."""
        super().on_assignment_error(assignor, state, exc)
        self.client.incr("assignments_error", rate=self.rate)
        self.client.timing(
            "assignment_latency",
            self.ms_since(state["time_start"]),
            rate=self.rate,
        )

    def on_assignment_completed(
        self, assignor: PartitionAssignorT, state: Dict
    ) -> None:
        """Partition assignor completed assignment."""
        super().on_assignment_completed(assignor, state)
        self.client.incr("assignments_complete", rate=self.rate)
        self.client.timing(
            "assignment_latency",
            self.ms_since(state["time_start"]),
            rate=self.rate,
        )

    def on_rebalance_start(self, app: AppT) -> Dict:
        """Cluster rebalance in progress."""
        state = super().on_rebalance_start(app)
        self.client.incr("rebalances", rate=self.rate)
        return state

    def on_rebalance_return(self, app: AppT, state: Dict) -> None:
        """Consumer replied assignment is done to broker."""
        super().on_rebalance_return(app, state)
        # Rebalance negotiation is over; recovery phase begins.
        self.client.decr("rebalances", rate=self.rate)
        self.client.incr("rebalances_recovering", rate=self.rate)
        self.client.timing(
            "rebalance_return_latency",
            self.ms_since(state["time_return"]),
            rate=self.rate,
        )

    def on_rebalance_end(self, app: AppT, state: Dict) -> None:
        """Cluster rebalance fully completed (including recovery)."""
        super().on_rebalance_end(app, state)
        self.client.decr("rebalances_recovering", rate=self.rate)
        self.client.timing(
            "rebalance_end_latency",
            self.ms_since(state["time_end"]),
            rate=self.rate,
        )

    def count(self, metric_name: str, count: int = 1) -> None:
        """Count metric by name."""
        super().count(metric_name, count=count)
        self.client.incr(metric_name, count=count, rate=self.rate)

    def on_tp_commit(self, tp_offsets: TPOffsetMapping) -> None:
        """Call when offset in topic partition is committed."""
        super().on_tp_commit(tp_offsets)
        for tp, offset in tp_offsets.items():
            metric_name = f"committed_offset.{tp.topic}.{tp.partition}"
            self.client.gauge(metric_name, offset)

    def track_tp_end_offset(self, tp: TP, offset: int) -> None:
        """Track new topic partition end offset for monitoring lags."""
        super().track_tp_end_offset(tp, offset)
        metric_name = f"end_offset.{tp.topic}.{tp.partition}"
        self.client.gauge(metric_name, offset)

    def on_web_request_end(
        self,
        app: AppT,
        request: web.Request,
        response: Optional[web.Response],
        state: Dict,
        *,
        view: Optional[web.View] = None,
    ) -> None:
        """Web server finished working on request."""
        super().on_web_request_end(app, request, response, state, view=view)
        status_code = int(state["status_code"])
        self.client.incr(f"http_status_code.{status_code}", rate=self.rate)
        self.client.timing(
            "http_response_latency",
            self.ms_since(state["time_end"]),
            rate=self.rate,
        )

    @cached_property
    def client(self) -> StatsClient:
        """Return statsd client."""
        return self._new_statsd_client()