import abc
import logging
import ssl
import typing
import warnings
from datetime import timedelta, timezone, tzinfo
from pathlib import Path as _Path
from typing import (
Any,
Callable,
ClassVar,
Generic,
Iterable,
List,
Mapping,
Optional,
Tuple,
Type,
TypeVar,
Union,
cast,
)
from mode import Seconds as _Seconds, want_seconds
from mode.utils.imports import SymbolArg, symbol_by_name
from mode.utils.logging import Severity as _Severity
from yarl import URL as _URL
from faust.exceptions import ImproperlyConfigured
from faust.types.auth import CredentialsArg, CredentialsT, to_credentials
from faust.types.codecs import CodecArg, CodecT
from faust.utils import json
from faust.utils.urls import URIListArg, urllist
if typing.TYPE_CHECKING:
from .sections import Section as _Section
from .settings import Settings as _Settings
else:
class _Section: ... # noqa
class _Settings: ... # noqa
__all__ = [
"AutodiscoverArg",
"DictArg",
"URLArg",
"BrokerArg",
"Param",
"Bool",
"Str",
"Severity",
"Int",
"UnsignedInt",
"Version",
"Port",
"Seconds",
"Credentials",
"SSLContext",
"Dict",
"LogHandlers",
"Timezone",
"BrokerList",
"URL",
"Path",
"Codec",
"Enum",
"Symbol",
"to_bool",
]
#: Default transport used when no scheme specified.
DEFAULT_BROKER_SCHEME = "kafka"
T = TypeVar("T")
IT = TypeVar("IT") # Input type.
OT = TypeVar("OT") # Output type.
BOOLEAN_TERMS: Mapping[str, bool] = {
"": False,
"false": False,
"no": False,
"0": False,
"true": True,
"yes": True,
"1": True,
"on": True,
"off": False,
}
AutodiscoverArg = Union[
bool,
Iterable[str],
Callable[[], Iterable[str]],
]
DictArg = Union[str, Mapping[str, T]]
URLArg = Union[str, _URL]
BrokerArg = URIListArg
DEPRECATION_WARNING_TEMPLATE = """
Setting {self.name} is deprecated since Faust version \
{self.version_deprecated}: {self.deprecation_reason}. {alt_removal}
""".strip()
DEPRECATION_REMOVAL_WARNING = """
Further the setting is scheduled to be removed in Faust version \
{self.version_removal}.
""".strip()
[docs]def to_bool(
term: Union[str, bool], *, table: Mapping[str, bool] = BOOLEAN_TERMS
) -> bool:
"""Convert common terms for true/false to bool.
Examples (true/false/yes/no/on/off/1/0).
"""
if table is None:
table = BOOLEAN_TERMS
if isinstance(term, str):
try:
return table[term.lower()]
except KeyError:
raise TypeError("Cannot coerce {0!r} to type bool".format(term))
return term
OutputCallable = Callable[[_Settings, OT], OT]
OnDefaultCallable = Callable[[_Settings], IT]
[docs]class Param(Generic[IT, OT], property):
"""Faust setting desscription.
Describes a Faust setting, how to read it from environment
variables or from a configuration object.
"""
#: Textual description of setting type.
#: This is used by :file:`extra/tools/render_configuration_reference.py`
#: to display the types supported by this setting.
#:
#: Can be a tuple of actual classes, or a tuple of strings.
#: If a tuple of classes say a setting that accepts :class:`str`
#: and :class:`int`:
#:
#: .. sourcecode:: python
#:
#: text_type = (str, int)
#:
#: the generated description will be:
#:
#: .. sourcecode:: restructuredtext
#:
#: :type: :class:`str` / :class:`int`
#:
text_type: ClassVar[Tuple[Any, ...]] = (Any,)
#: Setting name (e.g. ``broker_request_timeout``).
name: str
#: Storage name (e.g. ``_broker_request_timeout``).
#: This is the attribute name where we'll be storing
#: the actual value for the setting.
#: For example ``Settings.broker_request_timeout`` will be a
#: property that calls ``Param.__get__`` on attribute access,
#: and ``.__set__`` when setting attribute value, and those
#: will use the underlying ``Settings._broker_request_timeout``
#: storage attribute to access/store the current value.
storage_name: str
#: Default value for setting.
#:
#: Note that this will not be used if :attr:`default_alias` or
#: :attr:`defaut_template` is set.
default: IT = cast(IT, None)
#: Environment variable name for setting.
#: For :setting:`broker_request_timeout` this would be
#: ``env_name="BROKER_REQUEST_TIMEOUT"``
env_name: Optional[str] = None
#: If setting is not customized this is an optional
#: list of other settings that we should take default value from.
#: For example the :setting:`broker_consumer`/:setting:`broker_producer`
#: settings can configure the broker URL for consumers and producers
#: separately but take their default value from the `broker` setting.
default_alias: Optional[str] = None
#: Default template.
#: If set the default value will be generated from this format string
#: template.
#: For example the :setting:`canonical_url` setting uses
#: ``default_template='http://{conf.web_host}:{conf.web_port}' to
#: generate a default value from the :setting:`web_host` and
#: :setting:`web_port` settings.
default_template: Optional[str] = None
#: Set to true if the value can be :const:`None`.
allow_none: bool = False
# If set to True we don't modify the value
# of the attribute to set a default.
# This is used by e.g. the env_prefix setting
# which has custom constructor code in Settings.on_init
ignore_default: bool = False
#: The configuration section this setting belongs to.
#: E.g. ``sections.Common``.
section: _Section
#: The version that this setting was first introduced.
#: This is used by :file:`extra/tools/render_configuration_reference.py`
#: to generate a version added directive:
#:
#: .. sourcecode:: restructuredtext
#:
#: .. versionadded:: 1.10
version_introduced: Optional[str] = None
#: Set this if the setting is deprecated and should not be used anymore.
#: Deprecated settings are not added to the configuration reference.
#: Note: You must also set a :attr:`deprecation_reason`.
version_deprecated: Optional[str] = None
deprecation_reason: Optional[str] = None
#: Mapping of version changes and reason for changing.
#: This is used by :file:`extra/tools/render_configuration_reference.py`
#: For example if this was enabled by default but then changed
#: to be disabled by default in version 1.30, then you can specify
#: that as ``version_changed={'1.30': 'Disabled by default.'}``
#: and the configuration reference will be rendered with the
#: following version changed directive added:
#:
#: .. sourcecode:: restructuredtext
#:
#: .. versionchanged:: 1.30
#:
#: Disabled by default.
version_changed: Optional[Mapping[str, str]] = None
#: Set this if the setting should be disabled completely,
#: but still be included in the code.
#: This is rare, no setting should be included in the
#: code if it has been removed. Currently this is only
#: used for the example setting that describes how you can add new
#: settings.
version_removed: Optional[str] = None
#: Mapping of related command line options.
#: This should be a mapping from command name to a list of option names.
#:
#: For example the :setting:`canonical_url` setting lists related
#: options as:
#:
#: .. sourcecode:: python
#:
#: related_cli_options={
#: 'faust worker': ['--web-host', '--web-port'],
#: }
#:
#: And this will end up in the configuration reference as:
#:
#: .. sourcecode:: restructuredtext
#:
#: :related-options: :option:`faust worker --web-host`,
#: :option:`faust worker --web-port`
related_cli_options: Mapping[str, List[str]]
#: List of related settings.
#: For example for the :setting:`canonical_url` setting
#: the list of related settings are defined as:
#: ``related_settings=[web_host, web_port]``.
#: The configuration reference will then include that as:
#:
#: .. sourcecode:: restructuredtext
#:
#: :related-settings: :setting:`web_host`, :setting:`web_port`
related_settings: List[Any]
#: Template used to generate a deprecation warning for deprecated settings.
deprecation_warning_template: str = DEPRECATION_WARNING_TEMPLATE
#: Template used to generate an additional removal warning
#: for the deprecation warning.
deprecation_removal_warning: str = DEPRECATION_REMOVAL_WARNING
def __init__(
self,
*,
name: str,
env_name: Optional[str] = None,
default: IT = None,
default_alias: Optional[str] = None,
default_template: Optional[str] = None,
allow_none: Optional[bool] = None,
ignore_default: Optional[bool] = None,
section: _Section = None,
version_introduced: Optional[str] = None,
version_deprecated: Optional[str] = None,
version_removed: Optional[str] = None,
version_changed: Mapping[str, str] = None,
deprecation_reason: Optional[str] = None,
related_cli_options: Mapping[str, List[str]] = None,
related_settings: List[Any] = None,
help: Optional[str] = None,
**kwargs: Any,
) -> None:
assert name
self.name = name
self.storage_name = f"_{name}"
if env_name is not None:
self.env_name = env_name
if default is not None:
self.default = default
if default_alias is not None:
self.default_alias = default_alias
if default_template is not None:
self.default_template = default_template
if allow_none is not None:
self.allow_none = allow_none
if ignore_default is not None:
self.ignore_default = ignore_default
if section is not None:
self.section = section
assert self.section
if version_introduced is not None:
self.version_introduced = version_introduced
if version_deprecated is not None:
self.version_deprecated = version_deprecated
if version_removed is not None:
self.version_removed = version_removed
if version_changed is not None:
self.version_changed = version_changed
if deprecation_reason is not None:
self.deprecation_reason = deprecation_reason
if help is not None:
self.__doc__ = help
self._on_get_value_: Optional[OutputCallable] = None
self._on_set_default_: Optional[OnDefaultCallable] = None
self.options = kwargs
self.related_cli_options = related_cli_options or {}
self.related_settings = related_settings or []
self._init_options(**self.options)
if self.version_deprecated:
assert self.deprecation_reason
def _init_options(self, **kwargs: Any) -> None:
"""Use in subclasses to quickly override ``__init__``."""
...
[docs] def on_get_value(self, fun: OutputCallable) -> OutputCallable:
"""Decorator that adds a callback when this setting is retrieved."""
assert self._on_get_value_ is None
self._on_get_value_ = fun
return fun
[docs] def on_set_default(self, fun: OnDefaultCallable) -> OnDefaultCallable:
"""Decorator that adds a callback when a default value is used."""
assert self._on_set_default_ is None
self._on_set_default_ = fun
return fun
def __get__(self, obj: Any, type: Type = None) -> OT:
if obj is None:
return self # type: ignore
if self.version_deprecated:
# we use UserWarning because DeprecationWarning is silenced
# by default in Python.
warnings.warn(UserWarning(self.build_deprecation_warning()), stacklevel=2)
return self.on_get(obj)
def __set__(self, obj: Any, value: IT) -> None:
self.on_set(obj, self.prepare_set(obj, value))
[docs] def on_get(self, conf: _Settings) -> OT:
"""What happens when the setting is accessed/retrieved."""
value = getattr(conf, self.storage_name)
if value is None and self.default_alias:
retval = getattr(conf, self.default_alias)
else:
retval = self.prepare_get(conf, value)
if self._on_get_value_ is not None:
return self._on_get_value_(conf, retval)
return retval
[docs] def prepare_get(self, conf: _Settings, value: OT) -> OT:
"""Prepare value when accessed/retrieved."""
return value
[docs] def on_set(self, settings: Any, value: OT) -> None:
"""What happens when the setting is stored/set."""
settings.__dict__[self.storage_name] = value
assert getattr(settings, self.storage_name) == value
[docs] def set_class_default(self, cls: Type) -> None:
"""Set class default value for storage attribute."""
setattr(cls, self.storage_name, self.default)
[docs] def on_init_set_value(self, conf: _Settings, provided_value: Optional[IT]) -> None:
"""What happens at ``Settings.__init__`` to store provided value.
Arguments:
conf: Settings object.
provided_value: Provided configuration value passed to
``Settings.__init__`` or :const:`None` if not set.
"""
if provided_value is not None:
self.__set__(conf, provided_value)
[docs] def on_init_set_default(
self, conf: _Settings, provided_value: Optional[IT]
) -> None:
"""What happens at ``Settings.__init__`` to set default value.
Arguments:
conf: Settings object.
provided_value: Provided configuration value passed to
``Settings.__init__`` or :const:`None` if not set.
"""
if provided_value is None:
default_value = self.default
if self._on_set_default_:
default_value = self._on_set_default_(conf)
if default_value is None and self.default_template:
default_value = self.default_template.format(conf=conf)
setattr(
conf, self.storage_name, self.prepare_init_default(conf, default_value)
)
[docs] def build_deprecation_warning(self) -> str:
"""Build deprecation warning for this setting."""
alt_removal = ""
if self.version_removed:
alt_removal = self.deprecation_removal_warning.format(self=self)
return self.deprecation_warning_template.format(
self=self,
alt_removal=alt_removal,
)
[docs] def validate_before(self, value: IT = None) -> None:
"""Validate value before setting is converted to the target type."""
...
[docs] def validate_after(self, value: OT) -> None:
"""Validate value after it has been converted to its target type."""
...
[docs] def prepare_set(self, conf: _Settings, value: IT) -> OT:
"""Prepare value for storage."""
skip_validate = value is None and self.allow_none
if not skip_validate:
self.validate_before(value)
if value is not None:
new_value = self.to_python(conf, value)
else:
new_value = value
if not skip_validate:
self.validate_after(new_value)
return new_value
[docs] def prepare_init_default(self, conf: _Settings, value: IT) -> OT:
"""Prepare default value for storage."""
if value is not None:
return self.to_python(conf, value)
return None
[docs] def to_python(self, conf: _Settings, value: IT) -> OT:
"""Convert value in input type to its output type."""
return cast(OT, value)
@property
def active(self) -> bool:
return not bool(self.version_removed)
@property
def deprecated(self) -> bool:
return bool(self.version_deprecated)
@property
def class_name(self) -> str:
return type(self).__name__
[docs]class Bool(Param[Any, bool]):
"""Boolean setting type."""
text_type = (bool,)
[docs] def to_python(self, conf: _Settings, value: Any) -> bool:
"""Convert given value to :class:`bool`."""
if isinstance(value, str):
return to_bool(value)
return bool(value)
[docs]class Str(Param[str, str]):
"""String setting type."""
text_type = (str,)
[docs]class Severity(Param[_Severity, _Severity]):
"""Logging severity setting type."""
text_type = (str, int)
class Number(Param[IT, OT]):
"""Number setting type (baseclass for int/float)."""
min_value: Optional[int] = None
max_value: Optional[int] = None
number_aliases: Mapping[IT, OT]
def _init_options(
self,
min_value: Optional[int] = None,
max_value: Optional[int] = None,
number_aliases: Mapping[IT, OT] = None,
**kwargs: Any,
) -> None:
if min_value is not None:
self.min_value = min_value
if max_value is not None:
self.max_value = max_value
self.number_aliases = number_aliases or {}
@abc.abstractmethod
def convert(self, conf: _Settings, value: IT) -> OT: ...
def to_python(self, conf: _Settings, value: IT) -> OT:
"""Convert given value to number."""
try:
return self.number_aliases[value]
except KeyError:
return self.convert(conf, value)
def validate_after(self, value: OT) -> None:
"""Validate number value."""
v = cast(int, value)
min_ = self.min_value
max_ = self.max_value
if min_ is not None and v < min_:
raise self._out_of_range(v)
if max_ is not None and v > max_:
raise self._out_of_range(v)
def _out_of_range(self, value: float) -> ImproperlyConfigured:
return ImproperlyConfigured(
f"Value {value} is out of range for {self.class_name} "
f"(min={self.min_value} max={self.max_value})"
)
NumberInputArg = Union[str, int, float]
class _Int(Number[IT, OT]):
text_type = (int,)
def convert(self, conf: _Settings, value: IT) -> OT:
"""Convert given value to int."""
return cast(OT, int(cast(int, value)))
[docs]class Int(_Int[NumberInputArg, int]):
"""Signed integer setting type."""
[docs]class UnsignedInt(_Int[NumberInputArg, int]):
"""Unsigned integer setting type."""
min_value = 0
[docs]class Version(Int):
"""Version setting type.
Versions must be greater than ``1``.
"""
min_value = 1
[docs]class Port(UnsignedInt):
"""Network port setting type.
Ports must be in the range 1-65535.
"""
min_value = 1
max_value = 65535
[docs]class Seconds(Param[_Seconds, float]):
"""Seconds setting type.
Converts from :class:`float`/:class:`~datetime.timedelta` to
:class:`float`.
"""
text_type = (float, timedelta)
[docs] def to_python(self, conf: _Settings, value: _Seconds) -> float:
return want_seconds(value)
[docs]class Credentials(Param[CredentialsArg, Optional[CredentialsT]]):
"""Authentication credentials setting type."""
text_type = (CredentialsT,)
[docs] def to_python(
self, conf: _Settings, value: CredentialsArg
) -> Optional[CredentialsT]:
return to_credentials(value)
[docs]class SSLContext(Param[ssl.SSLContext, Optional[ssl.SSLContext]]):
"""SSL context setting type."""
text_type = (ssl.SSLContext,)
[docs]class Dict(Param[DictArg[T], Mapping[str, T]]):
"""Dictionary setting type."""
text_type = (dict,)
[docs] def to_python(self, conf: _Settings, value: DictArg[T]) -> Mapping[str, T]:
if isinstance(value, str):
return json.loads(value)
elif isinstance(value, Mapping):
return value
return dict(value)
[docs]class LogHandlers(Param[List[logging.Handler], List[logging.Handler]]):
"""Log handler list setting type."""
text_type = (List[logging.Handler],)
[docs] def prepare_init_default(
self, conf: _Settings, value: Any
) -> List[logging.Handler]:
return []
[docs]class Timezone(Param[Union[str, tzinfo], tzinfo]):
"""Timezone setting type."""
text_type = (tzinfo,)
builtin_timezones = {"UTC": timezone.utc}
[docs] def to_python(self, conf: _Settings, value: Union[str, tzinfo]) -> tzinfo:
if isinstance(value, str):
try:
return cast(tzinfo, self.builtin_timezones[value.lower()])
except KeyError:
import pytz
return cast(tzinfo, pytz.timezone(value))
else:
return value
[docs]class BrokerList(Param[BrokerArg, List[_URL]]):
"""Broker URL list setting type."""
text_type = (str, _URL, List[str])
default_scheme = DEFAULT_BROKER_SCHEME
[docs] def to_python(self, conf: _Settings, value: BrokerArg) -> List[_URL]:
return self.broker_list(value)
[docs] def broker_list(self, value: BrokerArg) -> List[_URL]:
return urllist(value, default_scheme=self.default_scheme)
[docs]class URL(Param[URLArg, _URL]):
"""URL setting type."""
text_type = (str, _URL)
[docs] def to_python(self, conf: _Settings, value: URLArg) -> _URL:
return _URL(value)
[docs]class Path(Param[Union[str, _Path], _Path]):
"""Path setting type."""
text_type = (str, _Path)
expanduser: bool = True
[docs] def to_python(self, conf: _Settings, value: Union[str, _Path]) -> _Path:
p = _Path(value)
if self.expanduser:
p = p.expanduser()
return self.prepare_path(conf, p)
[docs] def prepare_path(self, conf: _Settings, path: _Path) -> _Path:
return path
[docs]class Codec(Param[CodecArg, CodecArg]):
"""Serialization codec setting type."""
text_type = (str, CodecT)
[docs]def Enum(typ: T) -> Type[Param[Union[str, T], T]]:
"""Generate new enum setting type."""
class EnumParam(Param[Union[str, T], T]):
text_type = (str,)
def to_python(self, conf: _Settings, value: Union[str, T]) -> T:
return typ(value) # type: ignore
return EnumParam
class _Symbol(Param[IT, OT]):
text_type = (str, Type)
def to_python(self, conf: _Settings, value: IT) -> OT:
return cast(OT, symbol_by_name(value))
[docs]def Symbol(typ: T) -> Type[Param[SymbolArg[T], T]]:
"""Generate new symbol setting type."""
return _Symbol[SymbolArg[T], T]