Source code for faust.models.record

"""Record - Dictionary Model."""

from collections import OrderedDict
from datetime import datetime
from decimal import Decimal
from itertools import chain
from typing import (
    Any,
    Callable,
    Dict,
    FrozenSet,
    List,
    Mapping,
    MutableMapping,
    Optional,
    Set,
    Tuple,
    Type,
    cast,
)

from mode.utils.objects import annotations, is_optional, remove_optional
from mode.utils.text import pluralize

from faust.types.models import (
    CoercionMapping,
    FieldDescriptorT,
    FieldMap,
    IsInstanceArgT,
    ModelOptions,
    ModelT,
)
from faust.utils import codegen

from .base import Model
from .fields import FieldDescriptor, field_for_type
from .tags import Tag

__all__ = ["Record"]

DATE_TYPES: IsInstanceArgT = (datetime,)
DECIMAL_TYPES: IsInstanceArgT = (Decimal,)

ALIAS_FIELD_TYPES = {
    dict: Dict,
    tuple: Tuple,
    list: List,
    set: Set,
    frozenset: FrozenSet,
}

E_NON_DEFAULT_FOLLOWS_DEFAULT = """
Non-default {cls_name} field {field_name} cannot
follow default {fields} {default_names}
"""

_ReconFun = Callable[..., Any]


[docs]class Record(Model, abstract=True): # type: ignore """Describes a model type that is a record (Mapping). Examples: >>> class LogEvent(Record, serializer='json'): ... severity: str ... message: str ... timestamp: float ... optional_field: str = 'default value' >>> event = LogEvent( ... severity='error', ... message='Broken pact', ... timestamp=666.0, ... ) >>> event.severity 'error' >>> serialized = event.dumps() '{"severity": "error", "message": "Broken pact", "timestamp": 666.0}' >>> restored = LogEvent.loads(serialized) <LogEvent: severity='error', message='Broken pact', timestamp=666.0> >>> # You can also subclass a Record to create a new record >>> # with additional fields >>> class RemoteLogEvent(LogEvent): ... url: str >>> # You can also refer to record fields and pass them around: >>> LogEvent.severity >>> <FieldDescriptor: LogEvent.severity (str)> """ def __init_subclass__( cls, serializer: Optional[str] = None, namespace: Optional[str] = None, include_metadata: Optional[bool] = None, isodates: Optional[bool] = None, abstract: bool = False, allow_blessed_key: Optional[bool] = None, decimals: Optional[bool] = None, coerce: Optional[bool] = None, coercions: CoercionMapping = None, polymorphic_fields: Optional[bool] = None, validation: Optional[bool] = None, date_parser: Callable[[Any], datetime] = None, lazy_creation: bool = False, **kwargs: Any, ) -> None: # XXX mypy 0.750 requires this to be defined on the class, # and do not recognize the parent class signature. super().__init_subclass__( serializer=serializer, namespace=namespace, include_metadata=include_metadata, isodates=isodates, abstract=abstract, allow_blessed_key=allow_blessed_key, decimals=decimals, coerce=coerce, coercions=coercions, polymorphic_fields=polymorphic_fields, validation=validation, date_parser=date_parser, lazy_creation=lazy_creation, **kwargs, ) @classmethod def _contribute_to_options(cls, options: ModelOptions) -> None: # Find attributes and their types, and create indexes for these. # This only happens once when the class is created, so Faust # models are fast at runtime. fields, defaults = annotations( cls, stop=Record, skip_classvar=True, alias_types=ALIAS_FIELD_TYPES, localns={cls.__name__: cls}, ) options.fields = cast(Mapping, fields) options.fieldset = frozenset(fields) options.fieldpos = {i: k for i, k in enumerate(fields.keys())} # noqa: C416 # extract all default values, but only for actual fields. options.defaults = { k: v.default if isinstance(v, FieldDescriptor) else v for k, v in defaults.items() if k in fields and not (isinstance(v, FieldDescriptor) and v.required) } # Raise error if non-defaults are mixed in with defaults # like namedtuple/dataclasses do. local_defaults = [] for attr_name in cls.__annotations__: if attr_name in cls.__dict__: default_value = cls.__dict__[attr_name] if isinstance(default_value, FieldDescriptorT): if not default_value.required: local_defaults.append(attr_name) else: local_defaults.append(attr_name) else: if local_defaults: raise TypeError( E_NON_DEFAULT_FOLLOWS_DEFAULT.format( cls_name=cls.__name__, field_name=attr_name, fields=pluralize(len(local_defaults), "field"), default_names=", ".join(local_defaults), ) ) for field, typ in fields.items(): if is_optional(typ): # Optional[X] also needs to be added to defaults mapping. options.defaults.setdefault(field, None) # Create frozenset index of default fields. options.optionalset = frozenset(options.defaults) @classmethod def _contribute_methods(cls) -> None: if not getattr(cls.asdict, "faust_generated", False): raise RuntimeError("Not allowed to override Record.asdict()") cls.asdict = cls._BUILD_asdict() # type: ignore cls.asdict.faust_generated = True # type: ignore cls._input_translate_fields = cls._BUILD_input_translate_fields() @classmethod def _contribute_field_descriptors( cls, target: Type, options: ModelOptions, parent: Optional[FieldDescriptorT] = None, ) -> FieldMap: fields = options.fields defaults = options.defaults date_parser = options.date_parser coerce = options.coerce index = {} secret_fields = set() sensitive_fields = set() personal_fields = set() tagged_fields = set() def add_to_tagged_indices(field: str, tag: Type[Tag]) -> None: if tag.is_secret: options.has_secret_fields = True secret_fields.add(field) if tag.is_sensitive: options.has_sensitive_fields = True sensitive_fields.add(field) if tag.is_personal: options.has_personal_fields = True personal_fields.add(field) options.has_tagged_fields = True tagged_fields.add(field) def add_related_to_tagged_indices( field: str, related_model: Type = None ) -> None: if related_model is None: return try: related_options = related_model._options except AttributeError: return if related_options.has_secret_fields: options.has_secret_fields = True secret_fields.add(field) if related_options.has_sensitive_fields: options.has_sensitive_fields = True sensitive_fields.add(field) if related_options.has_personal_fields: options.has_personal_fields = True personal_fields.add(field) if related_options.has_tagged_fields: options.has_tagged_fields = True tagged_fields.add(field) for field, typ in fields.items(): try: default, needed = defaults[field], False except KeyError: default, needed = None, True descr = getattr(target, field, None) if is_optional(typ): target_type = remove_optional(typ) else: target_type = typ if descr is None or not isinstance(descr, FieldDescriptorT): DescriptorType, tag = field_for_type(target_type) if tag: add_to_tagged_indices(field, tag) descr = DescriptorType( field=field, type=typ, model=cls, required=needed, default=default, parent=parent, coerce=coerce, model_coercions=options.coercions, date_parser=date_parser, tag=tag, ) else: descr = descr.clone( field=field, type=typ, model=cls, required=needed, default=default, parent=parent, coerce=coerce, model_coercions=options.coercions, ) descr.on_model_attached() for related_model in descr.related_models: add_related_to_tagged_indices(field, related_model) setattr(target, field, descr) index[field] = descr options.secret_fields = frozenset(secret_fields) options.sensitive_fields = frozenset(sensitive_fields) options.personal_fields = frozenset(personal_fields) options.tagged_fields = frozenset(tagged_fields) return index
[docs] @classmethod def from_data( cls, data: Mapping, *, preferred_type: Type[ModelT] = None ) -> "Record": """Create model object from Python dictionary.""" # check for blessed key to see if another model should be used. if hasattr(data, "__is_model__"): return cast(Record, data) else: self_cls = cls._maybe_namespace(data, preferred_type=preferred_type) cls._input_translate_fields(data) return (self_cls or cls)(**data, __strict__=False)
def __init__( self, *args: Any, __strict__: bool = True, __faust: Any = None, **kwargs: Any ) -> None: # pragma: no cover ... # overridden by _BUILD_init @classmethod def _BUILD_input_translate_fields(cls) -> Callable[[MutableMapping], None]: translate = [ f"data[{field!r}] = data.pop({d.input_name!r}, None)" for field, d in cls._options.descriptors.items() if d.field != d.input_name ] return cast( Callable, classmethod( codegen.Function( "_input_translate_fields", ["cls", "data"], translate if translate else ["pass"], globals=globals(), locals=locals(), ) ), ) @classmethod def _BUILD_init(cls) -> Callable[[], None]: # generate init function that set field values from arguments, # and that load data from serialized form. # # # The general template that we will be generating is # # def __outer__(Model): # create __init__ closure # __defaults__ = Model._options.defaults # __descr__ = Model._options.descriptors # {% for field in fields_with_defaults %} # _default_{{ field }}_ = __defaults__["{{ field }}"] # {% endfor %} # {% for field in fields_with_init %} # _init_{{ field }}_ = __descr__["{{ field }}"].to_python # # def __init__(self, {{ sig }}, *, __strict__=True, **kwargs): # self.__evaluated_fields__ = set() # if __strict__: # creating model from Python # {% for field in fields %} # self.{{ field }} = {{ field }} # {% endfor %} # if kwargs: # # raise error for additional arguments # else: # {% for field in fields %} # {% if OPTIONAL_FIELD(field) %} # if {{ field }} is not None: # self.{{ field }} = _init_{{ field }}({{ field }}) # else: # self.{{ field }} = _default_{{ field }} # {% else %} # self.{{ field }} = _init_{{ field }}({{ field }} # # any additional kwargs are added as fields # # when loading from serialized data. # self.__dict__.update(kwargs) # self.__post_init__() # return __init__ # options = cls._options field_positions = options.fieldpos optional = options.optionalset needs_validation = options.validation descriptors = options.descriptors has_post_init = hasattr(cls, "__post_init__") closures: Dict[str, str] = { "__defaults__": "Model._options.defaults", "__descr__": "Model._options.descriptors", } kwonlyargs = ["*", "__strict__=True", "__faust=None", "**kwargs"] # these are sets, but we care about order as we will # be generating signature arguments in the correct order. # # The order is decided by the order of fields in the class): # # class Foo(Record): # c: int # a: int # # becomes: # # def __init__(self, c, a): # self.c = c # self.a = a optional_fields: Dict[str, bool] = OrderedDict() required_fields: Dict[str, bool] = OrderedDict() def generate_setter(field: str, getval: str) -> str: """Generate code that sets attribute for field in class. Arguments: field: Name of field. getval: Source code that initializes value for field, can be the field name itself for no initialization or for example: ``f"self._prepare_value({field})"``. out: Destination list where new source code lines are added. """ if field in optional: optional_fields[field] = True default_var = f"_default_{field}_" closures[default_var] = f'__defaults__["{field}"]' return ( f" self.{field} = {getval} " f"if {field} is not None else {default_var}" ) else: required_fields[field] = True return f" self.{field} = {getval}" def generate_prepare_value(field: str) -> str: descriptor = descriptors[field] if descriptor.lazy_coercion: return field # no initialization else: # call descriptor.to_python init_field_var = f"_init_{field}_" closures[init_field_var] = f'__descr__["{field}"].to_python' return f"{init_field_var}({field})" preamble = [ "self.__evaluated_fields__ = set()", ] data_setters = ["if __strict__:"] + [ generate_setter(field, field) for field in field_positions.values() ] data_rest = [ " if kwargs:", " from mode.utils.text import pluralize", ' message = "{} got unexpected {}: {}".format(', " self.__class__.__name__,", ' pluralize(kwargs.__len__(), "argument"),', ' ", ".join(map(str, sorted(kwargs))))', " raise TypeError(message)", ] init_setters = ["else:"] if field_positions: init_setters.extend( generate_setter(field, generate_prepare_value(field)) for field in field_positions.values() ) init_setters.append(" self.__dict__.update(kwargs)") postamble = [] if has_post_init: postamble.append("self.__post_init__()") if needs_validation: postamble.append("self.validate_or_raise()") signature = list( chain( ["self"], [f"{field}" for field in required_fields], [f"{field}=None" for field in optional_fields], kwonlyargs, ) ) sourcecode = codegen.build_closure_source( name="__init__", args=signature, body=list( chain( preamble, data_setters, data_rest, init_setters, postamble, ) ), closures=closures, outer_args=["Model"], ) # TIP final sourcecode also available # as .__sourcecode__ on returned method # (print(Model.__init__.__sourcecode__) return codegen.build_closure( "__outer__", sourcecode, cls, globals={}, locals={}, ) @classmethod def _BUILD_hash(cls) -> Callable[[], None]: return codegen.HashMethod( list(cls._options.fields), globals=globals(), locals=locals() ) @classmethod def _BUILD_eq(cls) -> Callable[[], None]: return codegen.EqMethod( list(cls._options.fields), globals=globals(), locals=locals() ) @classmethod def _BUILD_ne(cls) -> Callable[[], None]: return codegen.NeMethod( list(cls._options.fields), globals=globals(), locals=locals() ) @classmethod def _BUILD_gt(cls) -> Callable[[], None]: return codegen.GtMethod( list(cls._options.fields), globals=globals(), locals=locals() ) @classmethod def _BUILD_ge(cls) -> Callable[[], None]: return codegen.GeMethod( list(cls._options.fields), globals=globals(), locals=locals() ) @classmethod def _BUILD_lt(cls) -> Callable[[], None]: return codegen.LtMethod( list(cls._options.fields), globals=globals(), locals=locals() ) @classmethod def _BUILD_le(cls) -> Callable[[], None]: return codegen.LeMethod( list(cls._options.fields), globals=globals(), locals=locals() ) @classmethod def _BUILD_asdict(cls) -> Callable[..., Dict[str, Any]]: preamble = [ "return self._prepare_dict({", ] fields = [ f" {d.output_name!r}: {cls._BUILD_asdict_field(name, d)}," for name, d in cls._options.descriptors.items() if not d.exclude ] postamble = [ "})", ] return codegen.Method( "_asdict", [], preamble + fields + postamble, globals=globals(), locals=locals(), ) def _prepare_dict(self, payload: Dict[str, Any]) -> Dict[str, Any]: return payload @classmethod def _BUILD_asdict_field(cls, name: str, field: FieldDescriptorT) -> str: return f"self.{name}" def _derive(self, *objects: ModelT, **fields: Any) -> ModelT: data = self.asdict() for obj in objects: data.update(cast(Record, obj).asdict()) return type(self)(**{**data, **fields})
[docs] def to_representation(self) -> Mapping[str, Any]: """Convert model to its Python generic counterpart. Records will be converted to dictionary. """ # Convert known fields to mapping of ``{field: value}``. payload = self.asdict() options = self._options if options.include_metadata: payload[self._blessed_key] = {"ns": options.namespace} return payload
[docs] def asdict(self) -> Dict[str, Any]: # pragma: no cover """Convert record to Python dictionary.""" ... # generated by _BUILD_asdict
# Used to disallow overriding this method asdict.faust_generated = True # type: ignore def _humanize(self) -> str: # we try to preserve the order of fields specified in the class, # so doing {**self._options.defaults, **self.__dict__} does not work. attrs, defaults = self.__dict__, self._options.defaults.items() fields = { **{k: v for k, v in attrs.items() if not k.startswith("__")}, **{k: v for k, v in defaults if k not in attrs}, } return _kvrepr(fields) def __json__(self) -> Any: return self.to_representation() def __eq__(self, other: Any) -> bool: # pragma: no cover # implemented by BUILD_eq return NotImplemented def __ne__(self, other: Any) -> bool: # pragma: no cover # implemented by BUILD_ne return NotImplemented def __lt__(self, other: "Record") -> bool: # pragma: no cover # implemented by BUILD_lt return NotImplemented def __le__(self, other: "Record") -> bool: # pragma: no cover # implemented by BUILD_le return NotImplemented def __gt__(self, other: "Record") -> bool: # pragma: no cover # implemented by BUILD_gt return NotImplemented def __ge__(self, other: "Record") -> bool: # pragma: no cover # implemented by BUILD_ge return NotImplemented
def _kvrepr(d: Mapping[str, Any], *, sep: str = ", ") -> str: """Represent dict as `k='v'` pairs separated by comma.""" return sep.join(f"{k}={v!r}" for k, v in d.items())