Source code for faust.web.apps.router
"""HTTP endpoint showing partition routing destinations."""
from faust import web
from faust.web.exceptions import ServiceUnavailable
# Names exported by this module: the four view classes plus the blueprint.
__all__ = ["TableList", "TopicList", "TableDetail", "TableKeyDetail", "blueprint"]
# Blueprint named "router"; the views below register on it via ``blueprint.route``.
blueprint = web.Blueprint("router")
# NOTE(review): the class docstring below looks like a YAML spec block that
# documentation tooling may parse -- confirm before rewording it.
@blueprint.route("/", name="list")
class TableList(web.View):
    """
    ---
    description: List routes for all tables.
    tags:
    - Faust
    produces:
    - application/json
    """

    async def get(self, request: web.Request) -> web.Response:
        """Respond with the router's metadata for all tables, encoded as JSON."""
        return self.json(self.app.router.tables_metadata())
# NOTE(review): this route reuses name="list", which the "/" route in this
# module also uses. If route names must be unique per blueprint (e.g. for
# reverse URL lookup), one of them shadows the other -- confirm and rename.
# The name is left unchanged here to avoid altering the registered route.
@blueprint.route("/topics", name="list")
class TopicList(web.View):
    """List routes for all external topics."""

    async def get(self, request: web.Request) -> web.Response:
        """Return JSON response with list of all external topic routes."""
        return self.json(self.app.router.external_topics_metadata())
# NOTE(review): the class docstring below looks like a YAML spec block that
# documentation tooling may parse -- confirm before rewording it.
@blueprint.route("/{name}/", name="detail")
class TableDetail(web.View):
    """
    ---
    description: List route for a specific table.
    tags:
    - Faust
    parameters:
    - in: path
      name: name
      type: string
      required: true
    produces:
    - application/json
    """

    async def get(self, request: web.Request, name: str) -> web.Response:
        """Respond with the router's metadata for the table ``name``, as JSON.

        Arguments:
            name: Table name taken from the URL path.
        """
        return self.json(self.app.router.table_metadata(name))
# NOTE(review): the class docstring below looks like a YAML spec block that
# documentation tooling may parse -- confirm before rewording it.
@blueprint.route("/{name}/{key}/", name="key-detail")
class TableKeyDetail(web.View):
    """
    ---
    description: List information about key.
    tags:
    - Faust
    parameters:
    - in: path
      name: name
      type: string
      required: true
    - in: path
      name: key
      type: string
      required: true
    produces:
    - application/json
    """

    async def get(self, request: web.Request, name: str, key: str) -> web.Response:
        """Return JSON response after looking up the route of a table key.

        The JSON payload is the string form of whatever
        ``router.key_store(name, key)`` returns.

        Arguments:
            name: Name of table.
            key: Key to look up node for.

        Raises:
            faust.web.exceptions.ServiceUnavailable: if the lookup raises
                ``KeyError`` (broker metadata has not yet been received).
        """
        router = self.app.router
        try:
            dest_url = router.key_store(name, key)
        except KeyError as exc:
            # Chain explicitly so the failed lookup is kept as ``__cause__``
            # instead of appearing as an error raised while handling another.
            raise ServiceUnavailable() from exc
        return self.json(str(dest_url))