Source code for faust.web.apps.stats

"""HTTP endpoint showing statistics from the Faust monitor."""

from collections import defaultdict
from typing import List, MutableMapping, Set

from faust import web
from faust.types.tuples import TP

# Names exported by this module.
__all__ = ["Assignment", "Stats", "blueprint"]

# Maps a topic name to a list of partition numbers for that topic.
TPMap = MutableMapping[str, List[int]]


# Blueprint on which the views defined in this module register their routes.
blueprint = web.Blueprint("monitor")


@blueprint.route("/", name="index")
class Stats(web.View):
    """
    ---
    description: Monitor statistics.
    tags:
    - Faust
    produces:
    - application/json
    """

    # NOTE(review): the class docstring above is a YAML-style endpoint
    # description -- presumably read by API documentation tooling, so its
    # content is kept as-is; confirm before rewording it.

    async def get(self, request: web.Request) -> web.Response:
        """Reply with a JSON object holding one entry per app sensor.

        Every sensor in ``self.app.sensors`` is serialized through its
        ``asdict()`` method and stored under the key ``Sensor<position>``,
        where ``<position>`` is the sensor's index in that collection.
        """
        payload = {}
        for position, sensor in enumerate(self.app.sensors):
            payload[f"Sensor{position}"] = sensor.asdict()
        return self.json(payload)
@blueprint.route("/assignment/", name="assignment")
class Assignment(web.View):
    """
    ---
    description: Cluster assignment information.
    tags:
    - Faust
    produces:
    - application/json
    """

    # NOTE(review): the class docstring above is a YAML-style endpoint
    # description -- presumably read by API documentation tooling, so its
    # content is kept as-is; confirm before rewording it.

    @classmethod
    def _topic_grouped(cls, assignment: Set[TP]) -> TPMap:
        """Group the partitions of *assignment* by topic name.

        The topic-partitions are visited in sorted order, so topics appear
        in the result in that order and each topic's partition list is
        filled in that order as well. A plain ``dict`` is returned.
        """
        grouped: TPMap = {}
        for topic_partition in sorted(assignment):
            partitions = grouped.setdefault(topic_partition.topic, [])
            partitions.append(topic_partition.partition)
        return grouped

    async def get(self, request: web.Request) -> web.Response:
        """Reply with the assignor's current assignment as JSON.

        The response object has two keys, ``actives`` and ``standbys``,
        each mapping topic names to lists of partition numbers.
        """
        assignor = self.app.assignor
        # Actives are collected before standbys, matching the order in
        # which the response keys are emitted.
        actives = self._topic_grouped(assignor.assigned_actives())
        standbys = self._topic_grouped(assignor.assigned_standbys())
        return self.json({"actives": actives, "standbys": standbys})