Source code for faust.web.apps.graph

"""Web endpoint showing graph of running :pypi:`mode` services."""

import io

from faust import web

__all__ = ["Graph", "blueprint"]

blueprint = web.Blueprint("graph")


[docs]@blueprint.route("/", name="detail") class Graph(web.View): """ --- description: Render image from graph of running services. tags: - Faust produces: - application/json """
[docs] async def get(self, request: web.Request) -> web.Response: """Draw image of the services running in this worker.""" try: import pydot except ImportError: return self.text("Please install pydot: pip install pydot", status=500) o = io.StringIO() beacon = self.app.beacon.root or self.app.beacon beacon.as_graph().to_dot(o) (graph,) = pydot.graph_from_dot_data(o.getvalue()) return self.bytes(graph.create_png(), content_type="image/png")