Models, Serialization, and Codecs

Basics

Models describe the fields of data structures used as keys and values in messages. They’re defined using a NamedTuple-like syntax:

class Point(Record, serializer='json'):
    x: int
    y: int

Here we define a “Point” record having x, and y fields of type int.

A record is a model of the dictionary type, having keys and values of a certain type.

When using JSON as the serialization format, the Point model serializes to:

>>> Point(x=10, y=100).dumps()
{"x": 10, "y": 100}

To temporarily use a different serializer, provide that as an argument to .dumps:

>>> Point(x=10, y=100).dumps(serializer='pickle')  # pickle + Base64
b'gAN9cQAoWAEAAAB4cQFLClgBAAAAeXECS2RYBwAAAF9fZmF1c3RxA31xBFg
CAAAAbnNxBVgOAAAAX19tYWluX18uUG9pbnRxBnN1Lg=='

“Record” is the only type supported, but in the future we also want to have arrays and other data structures.

In use

Models are useful when data needs to be serialized/deserialized, or whenever you just want a quick way to define data.

In Faust we use models to:

  • Describe the data used in streams (topic keys and values).

  • HTTP requests (POST data).

For example here’s a topic where both keys and values are points:

my_topic = faust.topic('mytopic', key_type=Point, value_type=Point)

@app.agent(my_topic)
async def task(events):
    async for event in events:
        print(event)

Warning

Changing the type of a topic is backward incompatible change. You need to restart all Faust instances using the old key/value types.

The best practice is to provide an upgrade path for old instances.

The topic already knows what type is required, so when sending data you just provide the values as-is:

await my_topic.send(key=Point(x=10, y=20), value=Point(x=30, y=10))

Anonymous Agents

An “anonymous” agent does not use a topic description.

Instead the agent will automatically create and manage its own topic under the hood.

To define the key and value type of such an agent just pass them as keyword arguments:

@app.agent(key_type=Point, value_type=Point)
async def my_agent(events):
    async for event in events:
        print(event)

Now instead of having a topic where we can send messages, we can use the agent directly:

await my_agent.send(key=Point(x=10, y=20), value=Point(x=30, y=10))

Schemas

A “schema” configures both key and value type for a topic, and also the serializers used.

Schemas are also able to read the headers of Kafka messages, and so can be used for more complex serialization support, such as Protocol Buffers or Apache Thrift.

To define a topic using a schema:

schema = faust.Schema(
    key_type=Point,
    value_type=Point,
    key_serializer='json',
    value_serializer='json',
)

topic = app.topic('mytopic', schema=schema)

If any of the serializer arguments are omitted, the default from the app configuration will be used.

Schemas can also be used with “anonymous agents” (see above)

@app.agent(schema=schema)
async def myagent(stream):
    async for value in stream:
        print(value)

Schemas are most useful when extending Faust, for example defining a schema that reads message key and value type from Kafka headers:

import faust
from faust.types import ModelT
from faust.types.core import merge_headers
from faust.models import registry

class Autodetect(faust.Schema):

    def loads_key(self, app, message, *,
                  loads=None,
                  serializer=None):
        if loads is None:
            loads = app.serializers.loads_key
        # try to get key_type and serializer from Kafka headers
        headers = dict(message.headers)
        key_type_name = headers.get('KeyType')
        serializer = serializer or headers.get('KeySerializer')
        if key_type_name:
            key_type = registry[key_type]
            return loads(key_type, message.key,
                   serializer=serializer)
        else:
            return super().loads_key(
                app, message, loads=loads, serializer=serializer)

    def loads_value(self, app, message, *,
                    loads=None,
                    serializer=None):
        if loads is None:
            loads = app.serializers.loads_value
        # try to get value_type and serializer from Kafka headers
        headers = dict(message.headers)
        value_type_name = headers.get('ValueType')
        serializer = serializer or headers.get('ValueSerializer')
        if value_type_name:
            value_type = registry[value_type_name]
            return loads(value_type, message.value,
                   serializer=serializer)
        else:
            return super().loads_value(
                app, message, loads=loads, serializer=serializer)

    def on_dumps_key_prepare_headers(self, key, headers):
        # If key is a model, set the KeyType header to the models
        # registered name.

        if isinstance(key, ModelT):
            key_type_name = key._options.namespace
            return merge_headers(headers, {'KeyType': key_type_name})
        return headers

    def on_dumps_value_prepare_headers(self, value, headers):
        if isinstance(value, ModelT):
            value_type_name = value._options.namespace
            return merge_headers(headers, {'ValueType': value_type_name})
        return headers

    app = faust.App('id')
    my_topic = app.topic('mytopic', schema=Autodetect())

Manual Serialization

Models are not required to read data from a stream.

To deserialize streams manually, use a topic with bytes values:

topic = app.topic('custom', value_type=bytes)

@app.agent
async def processor(stream):
    async for payload in stream:
        data = json.loads(payload)

To integrate with external systems, Codecs help you support serialization and de-serialization to and from any format. Models describe the form of messages, and codecs explain how they’re serialized, compressed, encoded, and so on.

The default codec is configured by the applications key_serializer and value_serializer arguments:

app = faust.App(key_serializer='json')

Individual models can override the default by specifying a serializer argument when creating the model class:

class MyRecord(Record, serializer='json'):
    ...

Codecs may also be combined to provide multiple encoding and decoding stages, for example serializer='json|binary' will serialize as JSON then use the Base64 encoding to prepare the payload for transmission over textual transports.

See also

  • The Codecs section – for more information about codecs and how to define your own.

Model Types

Records

A record is a model based on a dictionary/mapping.

Here’s a simple record describing a 2d point, having two required fields:

class Point(faust.Record):
    x: int
    y: int

To create a new point, provide the fields as keyword arguments:

>>> point = Point(x=10, y=20)
>>> point
<Point: x=10, y=20>

If you forget to pass a required field, we throw an error:

>>> point = Point(x=10)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/opt/devel/faust/faust/models/record.py", line 96, in __init__
    self._init_fields(fields)
File "/opt/devel/faust/faust/models/record.py", line 106, in _init_fields
    type(self).__name__, ', '.join(sorted(missing))))
TypeError: Point missing required arguments: y

If you don’t want it to be an error, make it an optional field:

class Point(faust.Record, serializer='json'):
    x: int
    y: int = 0

You may now omit the y field when creating points:

>>> point = Point(x=10)
<Point: x=10 y=0>

Note

The order is important here: all optional fields must be defined after all requred fields.

This is not allowed:

class Point(faust.Record, serializer='json'):
    x: int
    y: int = 0
    z: int

but this works:

class Point(faust.Record, serializer='json')
    x: int
    z: int
    y: int = 0

Fields

Records may have fields of arbitrary types and both standard Python types and user defined classes will work.

Note that field types must support serialization, otherwise we cannot reconstruct the object back to original form.

Fields may refer to other models:

class Account(faust.Record, serializer='json'):
    id: str
    balance: float

class Transfer(faust.Record, serializer='json'):
    account: Account
    amount: float

transfer = Transfer(
    account=Account(id='RBH1235678', balance=13000.0),
    amount=1000.0,
)

The field type is a type annotation, so you can use the https://pypi.org/project/mypy/ type checker to verify arguments passed have the correct type.

We do not perform any type checking at runtime.

Collections

Fields can be collections of another type.

For example a User model may have a list of accounts:

from typing import List
import faust


class User(faust.Record):
    accounts: List[Account]

Not only lists are supported, you can also use dictionaries, sets and others.

Consult this table of supported annotations:

Collection

Recognized Annotations

List

Set

Tuple

  • Tuple[ModelT, ...]

  • Tuple[ModelT, ModelT, str]

Mapping

From this table we can tell that we may have a mapping of username to account:

from typing import Mapping
import faust


class User(faust.Record):
    accounts: Mapping[str, Account]

Faust will then automatically reconstruct the User.accounts field into a mapping of account-ids to Account objects.

Coercion

By default we do not force types, this is for backward compatibility with older Faust application.

This means that a field of type str will happily accept None as value, and any other type.

If you want strict types enable the coerce option:

class X(faust.Record, coerce=True):
    foo: str
    bar: Optional[str]

Here, the foo field will be required to be a string, while the bar field can have None values.

Tip

Having validation=True implies coerce=True but will additionally enable field validation.

See Validation for more information.

Coercion also enables automatic conversion to and from datetime and Decimal.

You may also disable coercion for the class, but enable it for individual fields by writing explicit field descriptors:

import faust
from faust.models.fields import DatetimeField, StringField

class Account(faust.Record):
    user_id: str = StringField(coerce=True)
    date_joined: datetime = DatetimeField(coerce=False)
    login_dates: List[datetime] = DatetimeField(coerce=True)

datetime

When using JSON we automatically convert datetime fields into ISO-8601 text format, and automatically convert back into into datetime when deserializing.

from datetime import datetime
import faust

class Account(faust.Record, coerce=True, serializer='json'):
    date_joined: datetime
Other date formats

The default date parser supports ISO-8601 only. To support this format and many other formats (such as 'Sat Jan 12 00:44:36 +0000 2019') you can select to use https://pypi.org/project/python-dateutil/ as the parser.

To change the date parsing function for a model globally:

from dateutil.parser import parse as parse_date

class Account(faust.Record, coerce=True, date_parser=parse_date):
    date_joined: datetime

To change the date parsing function for a specific field:

from dateutil.parser import parse as parse_date
from faust.models.fields import DatetimeField

class Account(faust.Record, coerce=True):
    # date_joined: supports ISO-8601 only (default)
    date_joined: datetime

    #: date_last_login: comes from weird system with more human
    #: readable dates ('Sat Jan 12 00:44:36 +0000 2019').
    #: The dateutil parser can handle many different date and time
    #: formats.
    date_last_login: datetime = DatetimeField(date_parser=parse_date)

Decimal

JSON doesn’t have a high precision decimal field type so if you require high precision you must use Decimal.

The built-in JSON encoder will convert these to strings in the json payload, that way we do not lose any precision.

from decimal import Decimal
import faust

class Order(faust.Record, coerce=True, serializer='json'):
    price: Decimal
    quantity: Decimal

Abstract Models

To create a model base class with common functionality, mark the model class with abstract=True.

Abstract models must be inherited from, and cannot be instantiated directly.

Here’s an example base class with default fields for creation time and last modified time:

class MyBaseRecord(Record, abstract=True):
    time_created: Optional[float] = None
    time_modified: Optional[float] = None

Inherit from this model to create a new model having the fields by default:

class Account(MyBaseRecord):
    id: str

account = Account(id='X', time_created=3124312.3442)
print(account.time_created)

Positional Arguments

The best practice when creating model instances is to use keyword arguments, but positional arguments are also supported!

The point Point(x=10, y=30) may also be expressed as Point(10, 30).

Back to why this is not a good practice, consider the case of inheritance:

import faust

class Point(faust.Record):
    x: int
    y: int

class XYZPoint(Point):
    z: int

point = XYZPoint(10, 20, 30)
assert (point.x, point.y, point.z) == (10, 20, 30)

To deduce the order arguments we now have to consider the inheritance tree, this is difficult without looking up the source code.

This quickly turns even more complicated when we add multiple inheritance into the mix:

class Point(AModel, BModel):
    ...

We suggest using positional arguments only for simple classes such as the Point example, where inheritance of additional fields is not used.

Fields with the same name as a reserved keyword

Sometimes the data you want to describe will contain field names that collide with a reserved Python keyword.

One such example is a field named in. You cannot define a model like this:

class OpenAPIParameter(Record):
    in: str = 'query'

doing so will result in a NameError exception being raised.

To properly support this, you need to rename the field but specify an alternative input_name:

from faust.models.fields import StringField

class OpenAPIParameter(Record):
    location: str = StringField(default='query', input_name='in')

The input_name here describes the name of the field in serialized payloads. There’s also a corresponding output_name that can be used to specify what field name this field deserializes to. The default output name is the same as the input name.

Polymorphic Fields

Felds can refer to other models, such as an account with a user field:

class User(faust.Record):
    id: str
    first_name: str
    last_name: str

class Account(faust.Record):
    user: User
    balance: Decimal

This is a strict relationship: the value for Account.user can only be an instance of the User type.

Polymorphic fields are also supported, where the type of the field is decided at runtime.

Consider an Article models with a list of assets where the type of asset is decided at runtime:

class Asset(faust.Record):
    url: str
    type: str

class ImageAsset(Asset):
    type = 'image'

class VideoAsset(Asset):
    runtime_seconds: float
    type = 'video'

class Article(faust.Record, polymorphic_fields=True):
    assets: List[Asset]

How does this work? Faust models add additional metadata when serialized, just look at the payload for one of our accounts:

>>> user = User(
...    id='07ecaebf-48c4-4c9e-92ad-d16d2f4a9a19',
...    first_name='Franz',
...    last_name='Kafka',
... )
>>> account = Account(
...    user=user,
...    balance='12.3',
)
>>> from pprint import pprint
>>> pprint(account.to_representation())
{
    '__faust': {'ns': 't.Account'},
    'balance': Decimal('12.3'),
    'user': {
        '__faust': {'ns': 't.User'},
        'first_name': 'Franz',
        'id': '07ecaebf-48c4-4c9e-92ad-d16d2f4a9a19',
        'last_name': 'Kafka',
    },
}

Here the metadata section is the __faust field, and it contains the name of the model that generated this payload.

By default we don’t use this name for anything at all, but we do if polymorphic fields are enabled.

Why is it disabled by default? There is often a mismatch between the name of the class used to produce the event, and the class we want to reconstruct it as.

Imagine a producer is using an outdated version, or model cannot be shared between systems (this happens when using different programming languages, integrating with proprietary systems, and so on.)

The namespace ns contains the fully qualified name of the model class (in this example t.User).

Faust will keep an index of model names, and whenever you define a new model class we add it to this index.

Note

If you’re trying to deserialize a model but it complains that it does not exist, you probably forgot to import this model before using it.

For the same reason you should not be renaming classes without having a strategy to do so in a forward compatible manner.

Validation

For models there is no validation of data by default: if you have a field described as an int, it will happily accept a string or any other object that you pass to it:

>>> class Person(faust.Record):
...    age: int
...

>>> p = Person(age="foo")
>>> p.age
"foo"

However, there is an option that will enable validation for all common JSON fields (int, float, str, etc.), and some commonly used Python ones (datetime, Decimal, etc.)

>>> class Person(faust.Record, validation=True):
...     age: int

>>> p = Person(age="foo")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  ValidationError: Invalid type for int field 'age': 'foo' (str)

For things like web forms raising an error automatically is not a good solution, as the client will usually want a list of all errors.

So in web views we suggest disabling automatic validation, and instead manually validating the model by calling model.validate(). to get a list of ValidationError instances.

>>> class Person(faust.Record):
...     age: int
...     name: str

>>> p = Person(age="Gordon Gekko", name="32")
>>> p.validate()
[
  ('age': ValidationError(
        "Invalid type for int field 'age': 'Gordon Gekko' (str)"),
  ('name': ValidationError(
        "Invalid type for str field 'name': 32 (int)")),
]

Advanced Validation

If you have a field you want validation for, you may explicitly define the field descriptor for the field you want validation on (note: this will override the built-in validation for that field). This will also enable you to access more validation options, such as the maximum number of characters for a string, or a minmum value for an integer:

class Person(faust.Record, validation=True):
    age: int = IntegerField(min_value=18, max_value=99)
    name: str

Custom field types

You may define a custom FieldDescriptor subclass to perform your own validation:

from typing import Any, Iterable, List
from faust.exceptions import ValidationError
from faust.models import FieldDescriptor

class ChoiceField(FieldDescriptor[str]):

    def __init__(self, choices: List[str], **kwargs: Any) -> None:
        self.choices = choices
        # Must pass any custom args to init,
        # so we pass the choices keyword argument also here.
        super().__init__(choices=choices, **kwargs)

    def validate(self, value: str) -> Iterable[ValidationError]:
        if value not in self.choices:
            choices = ', '.join(self.choices)
            yield self.validation_error(
                f'{self.field} must be one of {choices}')

After defining the subclass you may use it to define model fields:

>>> class Order(faust.Record):
...    side: str = ChoiceField(['SELL', 'BUY'])

>>> Order(side='LEFT')
faust.exceptions.ValidationError: (
    'side must be one of SELL, BUY', <ChoiceField: Order.side: str>)

Excluding fields from representation

If you want your model to accept a certain field when deserializing, but exclude the same field from serialization, you can do so by marking that field as exclude=True:

import faust
from faust.models.fields import StringField


class Order(faust.Record):
    price: float
    quantity: float
    user_id: str = StringField(required=True, exclude=True)

This model will accept user_id as a keyword argument, and from any serialized structure:

>>> order = Order(price=30.0, quantity=2.0, user_id='foo')
>>> order.user_id
'foo'

>>> order2 = Order.loads(
...     '{"price": "30.0", quantity="2.0", "user_id": "foo"}',
...     serializer='json',
... )

>>> order2.user_id
'foo'

But when serializing the order, the field will be excluded:

>>> order.asdict()
{'price': 30.0, 'quantity': 2.0}

>>> order.dumps(serializer='json')
'{"price": "30.0", "quantity": "2.0"}'

Reference

Serialization/Deserialization
class faust.Record[source]
classmethod loads(s: bytes, *, default_serializer: Optional[Union[CodecT, str]] = None, serializer: Optional[Union[CodecT, str]] = None) ModelT

Deserialize model object from bytes.

Keyword Arguments:

serializer (CodecArg) – Default serializer to use if no custom serializer was set for this model subclass.

Return type:

ModelT

dumps(*, serializer: Optional[Union[CodecT, str]] = None) bytes

Serialize object to the target serialization format.

Return type:

bytes

to_representation() Mapping[str, Any][source]

Convert model to its Python generic counterpart.

Records will be converted to dictionary.

Return type:

_GenericAlias[str, Any]

classmethod from_data(data: Mapping, *, preferred_type: Optional[Type[ModelT]] = None) Record[source]

Create model object from Python dictionary.

Return type:

Record

derive(*objects: ModelT, **fields: Any) ModelT

Derive new model with certain fields changed.

Return type:

ModelT

_options

Model metadata for introspection. An instance of faust.types.models.ModelOptions.

class faust.ModelOptions[source]
fields: Mapping[str, Type] = None

Flattened view of __annotations__ in MRO order.

Type:

Index

fieldset: FrozenSet[str] = None

Set of required field names, for fast argument checking.

Type:

Index

fieldpos: Mapping[int, str] = None

Positional argument index to field name. Used by Record.__init__ to map positional arguments to fields.

Type:

Index

optionalset: FrozenSet[str] = None

Set of optional field names, for fast argument checking.

Type:

Index

coercions: MutableMapping[Union[Type, Tuple[Type, ...]], Callable[[Any], Any]] = None
defaults: Mapping[str, Any] = None

Mapping of field names to default value.

Codecs

Supported codecs

  • raw - no encoding/serialization (bytes only).

  • json - json with UTF-8 encoding.

  • pickle - pickle with Base64 encoding (not URL-safe).

  • binary - Base64 encoding (not URL-safe).

Encodings are not URL-safe if the encoded payload cannot be embedded directly into a URL query parameter.

Serialization by name

The dumps() function takes a codec name and the object to encode as arguments, and returns bytes

>>> s = dumps('json', obj)

In reverse direction, the loads() function takes a codec name and an encoded payload to decode (in bytes), as arguments, and returns a reconstruction of the serialized object:

>>> obj = loads('json', s)

When passing in the codec type as a string (as in loads('json', ...) above), you can also combine multiple codecs to form a pipeline, for example "json|gzip" combines JSON serialization with gzip compression:

>>> obj = loads('json|gzip', s)

Codec registry

All codecs have a name and the faust.serializers.codecs attribute maintains a mapping from name to Codec instance.

You can add a new codec to this mapping by executing:

>>> from faust.serializers import codecs
>>> codecs.register(custom, custom_serializer())

To create a new codec, you need to define only two methods: first you need the _loads() method to deserialize bytes, then you need the _dumps() method to serialize an object:

import msgpack

from faust.serializers import codecs

class raw_msgpack(codecs.Codec):

    def _dumps(self, obj: Any) -> bytes:
        return msgpack.dumps(obj)

    def _loads(self, s: bytes) -> Any:
        return msgpack.loads(s)

We use msgpack.dumps to serialize, and our codec now encodes to raw msgpack format.

You may also combine the Base64 codec to support transports unable to handle binary data (such as HTTP or Redis):

Combining codecs is done using the | operator:

def msgpack() -> codecs.Codec:
    return raw_msgpack() | codecs.binary()

codecs.register('msgpack', msgpack())
>>> import my_msgpack_codec

>>> from faust import Record
>>> class Point(Record, serializer='msgpack'):
...     x: int
...     y: int

At this point we have to import the codec every time we want to use it, that is very cumbersome.

Faust also supports registering codec extensions using https://pypi.org/project/setuptools/ entry-points, so instead lets create an installable msgpack extension!

Define a package with the following directory layout:

faust-msgpack/
    setup.py
    faust_msgpack.py

The first file (faust-msgpack/setup.py) defines metadata about our package and should look like:

import setuptools

setuptools.setup(
    name='faust-msgpack',
    version='1.0.0',
    description='Faust msgpack serialization support',
    author='Ola A. Normann',
    author_email='ola@normann.no',
    url='http://github.com/example/faust-msgpack',
    platforms=['any'],
    license='BSD',
    packages=find_packages(exclude=['ez_setup', 'tests', 'tests.*']),
    zip_safe=False,
    install_requires=['msgpack-python'],
    tests_require=[],
    entry_points={
        'faust.codecs': [
            'msgpack = faust_msgpack:msgpack',
        ],
    },
)

The most important part here is the entry_points section that tells setuptools how to load our plugin.

We have set the name of our codec to msgpack and the path to the codec class to be faust_msgpack:msgpack.

Faust imports this as it would do from faust_msgpack import msgpack, so we need to define hat part next in our faust-msgpack/faust_msgpack.py module:

from faust.serializers import codecs

class raw_msgpack(codecs.Codec):

    def _dumps(self, obj: Any) -> bytes:
        return msgpack.dumps(s)


def msgpack() -> codecs.Codec:
    return raw_msgpack() | codecs.binary()

That’s it! To install and use our new extension do:

$ python setup.py install

At this point you can publish this to PyPI so it can be shared with other Faust users.