Source code for faust.models.base

"""Model descriptions.

The model describes the components of a data structure, kind of like a struct
in C, but there's no limitation of what type of data structure the model is,
or what it's used for.

A record (faust.models.record) is a model type that serializes into
dictionaries, so the model describes the fields and their types:

.. sourcecode:: pycon

    >>> class Point(Record):
    ...    x: int
    ...    y: int

    >>> p = Point(10, 3)
    >>> assert p.x == 10
    >>> assert p.y == 3
    >>> p
    <Point: x=10, y=3>
    >>> payload = p.dumps(serializer='json')
    '{"x": 10, "y": 3, "__faust": {"ns": "__main__.Point"}}'
    >>> p2 = Record.loads(payload)
    >>> p2
    <Point: x=10, y=3>

Models are mainly used for describing the data in messages: both keys and
values can be described as models.
"""

import abc
import warnings
from datetime import datetime
from functools import partial
from typing import (
    Any,
    Callable,
    ClassVar,
    Iterable,
    List,
    MutableMapping,
    Optional,
    Tuple,
    Type,
)

from mode.utils.objects import canoname

from faust.exceptions import ValidationError
from faust.serializers.codecs import CodecArg, dumps, loads
from faust.types.models import (
    CoercionMapping,
    FieldDescriptorT,
    FieldMap,
    ModelOptions,
    ModelT,
)

#: Public names exported by this module.
__all__ = ["Model", "maybe_model", "registry"]

# NOTES:
# - Records are described in the same notation as named tuples in Python 3.6.
#   To accomplish this ``__init_subclass__`` defined in :pep:`487` is used.
#
#   When accessed on the Record class, the attributes are actually field
#   descriptors that return information about the field:
#       >>> Point.x
#       <FieldDescriptor: Point.x: int>
#
#   This field descriptor holds information about the name of the field, the
#   value type of the field, and also what Record subclass it belongs to.
#
# - Sometimes field descriptions are passed around as arguments to functions.
#
# - A stream of deposits may be joined with a stream of orders if
#   both have an ``account`` field.  Field descriptors are used to
#   specify the field.
#
# - order_instance.account is data
#   (it holds the string account for this particular order).
#
# - order_instance.__class__.account is the field descriptor for the field,
#   it's not data but metadata that enables introspection, and it can be
#   passed around to describe a field we want to extract or similar.
#
# - FieldDescriptors are Python descriptors: in Python, object attributes
#   can override what happens when they are read, set, or deleted:
#
#       class MyDescriptor:
#
#           def __get__(self, instance, cls):
#               if instance is None:
#                   print('ACCESS ON CLASS ATTRIBUTE')
#                   return self
#               print('ACCESS ON INSTANCE')
#               return 42
#
#       class Example:
#           foo = MyDescriptor()
#
#   The above descriptor overrides __get__, which is called when the attribute
#   is accessed (a descriptor may also override __set__ and __delete__).
#
#
#   You can see the difference in what happens when you access the attribute
#   on the class, vs. the instance:

#       >>> Example.foo
#       ACCESS ON CLASS ATTRIBUTE
#       <__main__.MyDescriptor at 0x1049caac8>
#
#       >>> x = Example()
#       >>> x.foo
#       ACCESS ON INSTANCE
#       42

#: Error message template used when an abstract model is instantiated.
#: The ``{name}`` placeholder is filled in with the model class name.
E_ABSTRACT_INSTANCE = """
Cannot instantiate abstract model.

If this model is used as the field of another model,
and you meant to define a polymorphic relationship: make sure
your abstract model class has the `polymorphic_fields` option enabled:

    class {name}(faust.Record, abstract=True, polymorphic_fields=True):
        ...
"""

#: Global map of namespace -> Model, used to find model classes by name.
#: Every single model defined is added here automatically when a model
#: class is defined.
registry: MutableMapping[str, Type[ModelT]] = {}


def maybe_model(arg: Any) -> Any:
    """Convert argument to model if possible.

    If ``arg`` carries a ``"__faust"`` entry whose ``"ns"`` value names a
    model present in :data:`registry`, the argument is converted with that
    model's ``from_data``.  Anything else is returned unchanged.
    """
    try:
        # Non-subscriptable arguments raise TypeError, missing keys and
        # unknown namespaces raise KeyError: both mean "not a model".
        namespace = arg["__faust"]["ns"]
        model_cls = registry[namespace]
    except (KeyError, TypeError):
        return arg
    return model_cls.from_data(arg)
class Model(ModelT):
    """Meta description model for serialization.

    Subclasses are configured through class keyword arguments (see
    :meth:`__init_subclass__`).  Every concrete subclass is added to
    :data:`registry` under its namespace, and gets generated
    ``__init__``/``__hash__``/comparison methods unless it defines
    them itself.
    """

    #: Set to True if this is an abstract base class.
    __is_abstract__: ClassVar[bool] = True

    #: Cached result of :meth:`validate` (``None`` until first computed).
    __validation_errors__ = None

    #: Finalizers deferred by ``lazy_creation=True``;
    #: executed by :meth:`make_final`.
    _pending_finalizers: ClassVar[Optional[List[Callable]]] = None

    #: Serialized data may contain a "blessed key" that mandates
    #: how the data should be deserialized.  This probably only
    #: applies to records, but we need to support it at Model level.
    #: The blessed key has a dictionary value with a ``ns`` key:
    #:   data = {.., '__faust': {'ns': 'examples.simple.Withdrawal'}}
    #: When ``Model._maybe_reconstruct`` sees this key it will look
    #: up that namespace in the :data:`registry`, and if it exists
    #: select it as the target model to use for deserialization.
    #:
    #: Is this similar to how unsafe deserialization in pickle/yaml/etc.
    #: works?  No! pickle/pyyaml allow for arbitrary types to be
    #: deserialized (and worse in pickle's case), whereas the blessed
    #: key can only deserialize to a hardcoded list of types that are
    #: already under the remote control of messages anyway.
    #: For example it's not possible to perform remote code execution
    #: by providing a blessed key namespace of "os.system", simply
    #: because os.system is not in the registry of allowed types.
    _blessed_key = "__faust"

    @classmethod
    def _maybe_namespace(
        cls,
        data: Any,
        *,
        preferred_type: Optional[Type[ModelT]] = None,
        fast_types: Tuple[Type, ...] = (bytes, str),
        isinstance: Callable = isinstance,
    ) -> Optional[Type[ModelT]]:
        """Return the model class named by the blessed key in ``data``.

        Returns ``None`` when ``data`` has no blessed key, or when the
        named model may not override ``preferred_type``.

        Raises:
            KeyError: If the namespace is not in the registry and no
                concrete ``preferred_type`` was given.
        """
        # The serialized data may contain a ``__faust`` blessed key
        # holding the name of the model it should be deserialized as.
        # So even if value_type=MyModel, the data may mandate that it
        # should be deserialized using "foo.bar.baz" instead.

        # This is how we deal with Kafka's lack of message headers,
        # as needed by the RPC mechanism, without wrapping all data.
        if data is None or isinstance(data, fast_types):
            return None
        try:
            ns = data[cls._blessed_key]["ns"]
        except (KeyError, TypeError):
            # No blessed key present, or data is not a mapping.
            return None

        # we only allow blessed keys when type=None, or type=Model
        type_is_abstract = (
            preferred_type is None
            or preferred_type is ModelT
            or preferred_type is Model
        )
        try:
            model = registry[ns]
        except KeyError:
            if type_is_abstract:
                # Nothing else to fall back on: propagate the KeyError.
                raise
            return None
        if (
            type_is_abstract
            or model._options.allow_blessed_key
            or model._options.polymorphic_fields
        ):
            return model
        return None

    @classmethod
    def _maybe_reconstruct(cls, data: Any) -> Any:
        """Convert ``data`` to the model named by its blessed key, if any.

        Returns ``data`` unchanged when no model is selected.
        """
        model = cls._maybe_namespace(data)
        return model.from_data(data) if model else data

    @classmethod
    def _from_data_field(cls, data: Any) -> Optional["Model"]:
        """Build the value of a model-typed field from ``data``.

        ``None`` is passed through.  Abstract models defer to the blessed
        key in the data; concrete models use ``from_data`` with this class
        as the preferred type.
        """
        if data is None:
            return None
        if cls.__is_abstract__:
            return cls._maybe_reconstruct(data)
        return cls.from_data(data, preferred_type=cls)

    @classmethod
    def loads(
        cls,
        s: bytes,
        *,
        default_serializer: CodecArg = None,  # XXX use serializer
        serializer: CodecArg = None,
    ) -> ModelT:
        """Deserialize model object from bytes.

        Arguments:
            s: The serialized payload.

        Keyword Arguments:
            default_serializer (CodecArg): Deprecated alias for
                ``serializer``; passing it emits a
                :class:`DeprecationWarning`.
            serializer (CodecArg): Default serializer to use
                if no custom serializer was set for this model subclass.
        """
        if default_serializer is not None:
            warnings.warn(
                DeprecationWarning("default_serializer deprecated, use: serializer"),
                stacklevel=2,
            )
        # The serializer configured on the model takes precedence.
        ser = cls._options.serializer or serializer or default_serializer
        data = loads(ser, s)
        return cls.from_data(data)

    def __init_subclass__(
        cls,
        serializer: Optional[str] = None,
        namespace: Optional[str] = None,
        include_metadata: Optional[bool] = None,
        isodates: Optional[bool] = None,
        abstract: bool = False,
        allow_blessed_key: Optional[bool] = None,
        decimals: Optional[bool] = None,
        coerce: Optional[bool] = None,
        coercions: Optional[CoercionMapping] = None,
        polymorphic_fields: Optional[bool] = None,
        validation: Optional[bool] = None,
        date_parser: Optional[Callable[[Any], datetime]] = None,
        lazy_creation: bool = False,
        **kwargs: Any,
    ) -> None:
        """Configure a new model subclass from its class keyword arguments.

        With ``lazy_creation=True`` the actual initialization is deferred
        until :meth:`make_final` is called; otherwise it runs immediately.
        Remaining ``kwargs`` are forwarded to the parent implementation.
        """
        # Python 3.6 added the new __init_subclass__ function that
        # makes it possible to initialize subclasses without using
        # metaclasses (:pep:`487`).
        super().__init_subclass__(**kwargs)

        # mypy does not recognize `__init_subclass__` as a classmethod
        # and thinks we're mutating a ClassVar when setting:
        #   cls.__is_abstract__ = False
        # To fix this we simply delegate to a _init_subclass classmethod.
        finalizer = partial(
            cls._init_subclass,
            serializer,
            namespace,
            include_metadata,
            isodates,
            abstract,
            allow_blessed_key,
            decimals,
            coerce,
            coercions,
            polymorphic_fields,
            validation,
            date_parser,
        )
        if lazy_creation:
            cls._pending_finalizers = [finalizer]
        else:
            cls._pending_finalizers = None
            finalizer()

    @classmethod
    def make_final(cls) -> None:
        """Run initialization deferred by ``lazy_creation=True``.

        The pending list is cleared before the finalizers run, so calling
        this again is a no-op.
        """
        pending, cls._pending_finalizers = cls._pending_finalizers, None
        if pending:
            for finalizer in pending:
                finalizer()

    @classmethod
    def _init_subclass(
        cls,
        serializer: Optional[str] = None,
        namespace: Optional[str] = None,
        include_metadata: Optional[bool] = None,
        isodates: Optional[bool] = None,
        abstract: bool = False,
        allow_blessed_key: Optional[bool] = None,
        decimals: Optional[bool] = None,
        coerce: Optional[bool] = None,
        coercions: Optional[CoercionMapping] = None,
        polymorphic_fields: Optional[bool] = None,
        validation: Optional[bool] = None,
        date_parser: Optional[Callable[[Any], datetime]] = None,
    ) -> None:
        """Build the options for this subclass and finish setting it up.

        Options start from the parent's ``_options`` (or fresh
        ``ModelOptions``), are updated from an inner ``Options`` class if
        one exists, and finally from the keyword arguments that are not
        ``None``.  Abstract classes stop there; concrete classes also get
        field descriptors, generated methods and a registry entry.
        """
        # Can set serializer/namespace/etc. using:
        #    class X(Record, serializer='json', namespace='com.vandelay.X'):
        #        ...
        try:
            custom_options = cls.Options
        except AttributeError:
            custom_options = None
        else:
            delattr(cls, "Options")
        options = getattr(cls, "_options", None)
        if options is None:
            options = ModelOptions()
            options.coercions = {}
            options.defaults = {}
        else:
            options = options.clone_defaults()
        if custom_options:
            options.__dict__.update(custom_options.__dict__)
        if coerce is not None:
            options.coerce = coerce
        if coercions is not None:
            options.coercions.update(coercions)
        if serializer is not None:
            options.serializer = serializer
        if include_metadata is not None:
            options.include_metadata = include_metadata
        if isodates is not None:
            options.isodates = isodates
        if decimals is not None:
            options.decimals = decimals
        if allow_blessed_key is not None:
            options.allow_blessed_key = allow_blessed_key
        if polymorphic_fields is not None:
            options.polymorphic_fields = polymorphic_fields
        if validation is not None:
            options.validation = validation
            if validation:
                # Validation implies coerce.  Explicitly *disabling*
                # validation must not switch coercion on behind the
                # user's back (e.g. ``coerce=False, validation=False``).
                options.coerce = True
        if date_parser is not None:
            options.date_parser = date_parser

        options.namespace = namespace or canoname(cls)

        if abstract:
            # Custom base classes can set this to skip class initialization.
            cls.__is_abstract__ = True
            cls._options = options
            cls.__init__ = cls.__abstract_init__  # type: ignore
            return
        cls.__is_abstract__ = False

        # Add introspection capabilities
        cls._contribute_to_options(options)
        # Add FieldDescriptors for every field.
        options.descriptors = cls._contribute_field_descriptors(cls, options)

        # Store options on new subclass.
        cls._options = options

        cls._contribute_methods()

        # Register in the global registry, so we can look up
        # models by namespace.
        registry[options.namespace] = cls

        codegens = [
            ("__init__", cls._BUILD_init, "_model_init"),
            ("__hash__", cls._BUILD_hash, "_model_hash"),
            ("__eq__", cls._BUILD_eq, "_model_eq"),
            ("__ne__", cls._BUILD_ne, "_model_ne"),
            ("__gt__", cls._BUILD_gt, "_model_gt"),
            ("__ge__", cls._BUILD_ge, "_model_ge"),
            ("__lt__", cls._BUILD_lt, "_model_lt"),
            ("__le__", cls._BUILD_le, "_model_le"),
        ]

        for meth_name, meth_gen, attr_name in codegens:
            # self._model_init = cls._BUILD_init()
            # if '__init__' not in cls.__dict__:
            #     cls.__init__ = self._model_init
            meth = meth_gen()
            setattr(cls, attr_name, meth)
            # Only install the generated method when the class body did
            # not define its own.
            if meth_name not in cls.__dict__:
                setattr(cls, meth_name, meth)

    def __abstract_init__(self, *args: Any, **kwargs: Any) -> None:
        """Replacement ``__init__`` installed on abstract models.

        Accepts and ignores any arguments so that instantiating an
        abstract model always fails with the explanatory message, never
        with an unrelated ``TypeError`` about the argument count.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError(
            E_ABSTRACT_INSTANCE.format(
                name=type(self).__name__,
            )
        )

    @classmethod
    @abc.abstractmethod
    def _contribute_to_options(cls, options: ModelOptions) -> None:  # pragma: no cover
        """Add introspection information to ``options`` (subclass hook)."""
        ...

    @classmethod
    def _contribute_methods(cls) -> None:  # pragma: no cover
        """Optional hook called once ``_options`` is stored on the class."""
        ...

    @classmethod
    @abc.abstractmethod
    def _contribute_field_descriptors(
        cls,
        target: Type,
        options: ModelOptions,
        parent: Optional[FieldDescriptorT] = None,
    ) -> FieldMap:  # pragma: no cover
        """Return the field descriptor map stored as ``options.descriptors``."""
        ...

    @classmethod
    @abc.abstractmethod
    def _BUILD_init(cls) -> Callable[[], None]:  # pragma: no cover
        """Return the generated ``__init__`` for this model."""
        ...

    @classmethod
    @abc.abstractmethod
    def _BUILD_hash(cls) -> Callable[[], None]:  # pragma: no cover
        """Return the generated ``__hash__`` for this model."""
        ...

    @classmethod
    @abc.abstractmethod
    def _BUILD_eq(cls) -> Callable[[], None]:  # pragma: no cover
        """Return the generated ``__eq__`` for this model."""
        ...

    @abc.abstractmethod
    def to_representation(self) -> Any:  # pragma: no cover
        """Convert object to JSON serializable object."""

    @abc.abstractmethod
    def _humanize(self) -> str:  # pragma: no cover
        """Return string representation of object for debugging purposes."""
        ...

    def is_valid(self) -> bool:
        """Return ``True`` if :meth:`validate` reports no errors."""
        return not self.validate()

    def validate(self) -> List[ValidationError]:
        """Return the list of validation errors for this object.

        The list is computed on the first call and cached on the
        instance; later calls return the cached list.
        """
        errors = self.__validation_errors__
        if errors is None:
            errors = self.__validation_errors__ = list(self._itervalidate())
        return errors

    def validate_or_raise(self) -> None:
        """Raise the first validation error, if there is one."""
        errors = self.validate()
        if errors:
            raise errors[0]

    def _itervalidate(self) -> Iterable[ValidationError]:
        """Yield validation errors from every field descriptor."""
        for name, descr in self._options.descriptors.items():
            yield from descr.validate_all(getattr(self, name))

    @property
    def validation_errors(self) -> List[ValidationError]:
        """Validation errors for this object (same as :meth:`validate`)."""
        return self.validate()

    def derive(self, *objects: ModelT, **fields: Any) -> ModelT:
        """Derive new model with certain fields changed."""
        return self._derive(*objects, **fields)

    @abc.abstractmethod  # pragma: no cover
    def _derive(self, *objects: ModelT, **fields: Any) -> ModelT:
        """Implement :meth:`derive` (subclass hook)."""
        raise NotImplementedError()

    def dumps(self, *, serializer: CodecArg = None) -> bytes:
        """Serialize object to the target serialization format.

        An explicit ``serializer`` takes precedence over the serializer
        configured in the model options.
        """
        return dumps(serializer or self._options.serializer, self.to_representation())

    def __repr__(self) -> str:
        return f"<{type(self).__name__}: {self._humanize()}>"