"""Base interface for Web server and views."""
import abc
import socket
from datetime import datetime
from http import HTTPStatus
from pathlib import Path
from typing import (
Any,
Callable,
ClassVar,
Iterable,
List,
Mapping,
MutableMapping,
Optional,
Tuple,
Type,
Union,
)
from urllib.parse import quote
from mode import Service
from mode.utils.compat import want_str
from mode.utils.imports import SymbolArg, symbol_by_name
from yarl import URL
from faust.types import AppT
from faust.types.web import BlueprintT, ResourceOptions, View
__all__ = [
"DEFAULT_BLUEPRINTS",
"BlueprintManager",
"Request",
"Response",
"Web",
]
_bytes = bytes
_BPArg = SymbolArg[BlueprintT]
_BPList = Iterable[Tuple[str, _BPArg]]
DEFAULT_BLUEPRINTS: _BPList = [
("/router", "faust.web.apps.router:blueprint"),
("/table", "faust.web.apps.tables.blueprint"),
]
PRODUCTION_BLUEPRINTS: _BPList = [
("", "faust.web.apps.production_index:blueprint"),
]
DEBUG_BLUEPRINTS: _BPList = [
("/graph", "faust.web.apps.graph:blueprint"),
("", "faust.web.apps.stats:blueprint"),
]
CONTENT_SEPARATOR: bytes = b"\r\n\r\n"
HEADER_SEPARATOR: bytes = b"\r\n"
HEADER_KEY_VALUE_SEPARATOR: bytes = b": "
[docs]class Response:
"""Web server response and status."""
@property
@abc.abstractmethod
def status(self) -> int:
"""Return the response status code."""
...
@property
@abc.abstractmethod
def body(self) -> _bytes:
"""Return the response body as bytes."""
...
@property
@abc.abstractmethod
def headers(self) -> MutableMapping:
"""Return mapping of response HTTP headers."""
...
@property
@abc.abstractmethod
def content_length(self) -> Optional[int]:
"""Return the size of the response body."""
...
@property
@abc.abstractmethod
def content_type(self) -> str:
"""Return the response content type."""
...
@property
@abc.abstractmethod
def charset(self) -> Optional[str]:
"""Return the response character set."""
...
@property
@abc.abstractmethod
def chunked(self) -> bool:
"""Return :const:`True` if response is chunked."""
...
@property
@abc.abstractmethod
def compression(self) -> bool:
"""Return :const:`True` if the response body is compressed."""
...
@property
@abc.abstractmethod
def keep_alive(self) -> Optional[bool]:
"""Return :const:`True` if HTTP keep-alive enabled."""
...
@property
@abc.abstractmethod
def body_length(self) -> int:
"""Size of HTTP response body."""
...
[docs]class BlueprintManager:
"""Manager of all blueprints."""
applied: bool
_enabled: List[Tuple[str, _BPArg]]
_active: MutableMapping[str, BlueprintT]
def __init__(self, initial: _BPList = None) -> None:
self.applied = False
self._enabled = list(initial) if initial else []
self._active = {}
[docs] def add(self, prefix: str, blueprint: _BPArg) -> None:
"""Register blueprint with this app."""
if self.applied:
raise RuntimeError("Cannot add blueprints after server started")
self._enabled.append((prefix, blueprint))
[docs] def apply(self, web: "Web") -> None:
"""Apply all blueprints."""
if not self.applied:
self.applied = True
for prefix, blueprint in self._enabled:
bp: BlueprintT = symbol_by_name(blueprint)
self._apply_blueprint(web, prefix, bp)
def _apply_blueprint(self, web: "Web", prefix: str, bp: BlueprintT) -> None:
self._active[bp.name] = bp
bp.register(web.app, url_prefix=prefix)
bp.init_webserver(web)
[docs]class Web(Service):
"""Web server and HTTP interface."""
default_blueprints: ClassVar[_BPList] = DEFAULT_BLUEPRINTS # noqa: E704
production_blueprints: ClassVar[_BPList] = PRODUCTION_BLUEPRINTS
debug_blueprints: ClassVar[_BPList] = DEBUG_BLUEPRINTS
app: AppT
driver_version: str
views: MutableMapping[str, View]
reverse_names: MutableMapping[str, str]
blueprints: BlueprintManager
content_separator: ClassVar[bytes] = CONTENT_SEPARATOR
header_separator: ClassVar[bytes] = HEADER_SEPARATOR
header_key_value_separator: ClassVar[bytes] = HEADER_KEY_VALUE_SEPARATOR
def __init__(self, app: AppT, **kwargs: Any) -> None:
self.app = app
self.views = {}
self.reverse_names = {}
blueprints = list(self.default_blueprints)
if self.app.conf.debug:
blueprints.extend(self.debug_blueprints)
else:
blueprints.extend(self.production_blueprints)
self.blueprints = BlueprintManager(blueprints)
Service.__init__(self, loop=app.loop, **kwargs)
[docs] @abc.abstractmethod
def text(
self,
value: str,
*,
content_type: Optional[str] = None,
status: int = 200,
reason: Optional[str] = None,
headers: MutableMapping = None,
) -> Response:
"""Create text response, using "text/plain" content-type."""
...
[docs] @abc.abstractmethod
def html(
self,
value: str,
*,
content_type: Optional[str] = None,
status: int = 200,
reason: Optional[str] = None,
headers: MutableMapping = None,
) -> Response:
"""Create HTML response from string, ``text/html`` content-type."""
...
[docs] @abc.abstractmethod
def json(
self,
value: Any,
*,
content_type: Optional[str] = None,
status: int = 200,
reason: Optional[str] = None,
headers: MutableMapping = None,
) -> Response:
"""Create new JSON response.
Accepts any JSON-serializable value and will automatically
serialize it for you.
The content-type is set to "application/json".
"""
...
[docs] @abc.abstractmethod
def bytes(
self,
value: _bytes,
*,
content_type: Optional[str] = None,
status: int = 200,
reason: Optional[str] = None,
headers: MutableMapping = None,
) -> Response:
"""Create new ``bytes`` response - for binary data."""
...
[docs] @abc.abstractmethod
def bytes_to_response(self, s: _bytes) -> Response:
"""Deserialize HTTP response from byte string."""
...
def _bytes_to_response(self, s: _bytes) -> Tuple[HTTPStatus, Mapping, _bytes]:
status_code, _, payload = s.partition(self.content_separator)
headers, _, body = payload.partition(self.content_separator)
return (
HTTPStatus(int(status_code)),
dict(self._splitheader(h) for h in headers.splitlines()),
body,
)
def _splitheader(self, header: _bytes) -> Tuple[str, str]:
key, value = header.split(self.header_key_value_separator, 1)
return want_str(key.strip()), want_str(value.strip())
[docs] @abc.abstractmethod
def response_to_bytes(self, response: Response) -> _bytes:
"""Serialize HTTP response into byte string."""
...
def _response_to_bytes(self, status: int, headers: Mapping, body: _bytes) -> _bytes:
return self.content_separator.join(
[
str(status).encode(),
self.content_separator.join(
[
self._headers_serialize(headers),
body,
]
),
]
)
def _headers_serialize(self, headers: Mapping) -> _bytes:
return self.header_separator.join(
self.header_key_value_separator.join(
[
k if isinstance(k, _bytes) else k.encode("ascii"),
v if isinstance(v, _bytes) else v.encode("latin-1"),
]
)
for k, v in headers.items()
)
[docs] @abc.abstractmethod
def route(
self,
pattern: str,
handler: Callable,
cors_options: Mapping[str, ResourceOptions] = None,
) -> None:
"""Add route for handler."""
...
[docs] @abc.abstractmethod
def add_static(self, prefix: str, path: Union[Path, str], **kwargs: Any) -> None:
"""Add static route."""
...
[docs] @abc.abstractmethod
async def read_request_content(self, request: "Request") -> _bytes:
"""Read HTTP body as bytes."""
...
[docs] @abc.abstractmethod
async def wsgi(self) -> Any:
"""WSGI entry point."""
...
[docs] def add_view(
self,
view_cls: Type[View],
*,
prefix: str = "",
cors_options: Mapping[str, ResourceOptions] = None,
) -> View:
"""Add route for view."""
view: View = view_cls(self.app, self)
path = prefix.rstrip("/") + "/" + view.view_path.lstrip("/")
self.route(path, view, cors_options)
self.views[path] = view
self.reverse_names[view.view_name] = path
return view
[docs] def url_for(self, view_name: str, **kwargs: Any) -> str:
"""Get URL by view name.
If the provided view name has associated URL parameters,
those need to be passed in as kwargs, or a :exc:`TypeError`
will be raised.
"""
try:
path = self.reverse_names[view_name]
except KeyError:
raise KeyError(f"No view with name {view_name!r} found")
else:
return path.format(
**{k: self._quote_for_url(str(v)) for k, v in kwargs.items()}
)
def _quote_for_url(self, value: str) -> str:
return quote(value, safe="") # disable '/' being safe by default
[docs] def init_server(self) -> None:
"""Initialize and setup web server."""
self.blueprints.apply(self)
self.app.on_webserver_init(self)
@property
def url(self) -> URL:
"""Return the canonical URL to this worker (including port)."""
canon = self.app.conf.canonical_url
if canon.host == socket.gethostname():
return URL(f"http://localhost:{self.app.conf.web_port}/")
return self.app.conf.canonical_url
[docs]class Request(abc.ABC):
"""HTTP Request."""
method: str
headers: Mapping[str, str]
url: URL
rel_url: URL
query_string: str
keep_alive: bool
body_exists: bool
user: Any
if_modified_since: Optional[datetime]
if_unmodified_since: Optional[datetime]
if_range: Optional[datetime]
[docs] @abc.abstractmethod
def can_read_body(self) -> bool:
"""Return :const:`True` if the request has a body."""
...
[docs] @abc.abstractmethod
async def read(self) -> bytes:
"""Read post data as bytes."""
...
[docs] @abc.abstractmethod
async def text(self) -> str:
"""Read post data as text."""
...
[docs] @abc.abstractmethod
async def json(self) -> Any:
"""Read post data and deserialize as JSON."""
...
[docs] @abc.abstractmethod
async def post(self) -> Mapping[str, str]:
"""Read post data."""
...
@property
@abc.abstractmethod
def match_info(self) -> Mapping[str, str]:
"""Return match info from URL route as a mapping."""
...
@property
@abc.abstractmethod
def query(self) -> Mapping[str, str]:
"""Return HTTP query parameters as a mapping."""
...
@property
@abc.abstractmethod
def cookies(self) -> Mapping[str, Any]:
"""Return cookies as a mapping."""
...