faust.serializers.codecs

Serialization utilities.

Supported codecs

  • raw - No encoding/serialization (bytes only).

  • json - json with UTF-8 encoding.

  • yaml - YAML (safe version).

  • pickle - pickle with base64 encoding (not urlsafe).

  • binary - base64 encoding (not urlsafe).
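To make the "(not urlsafe)" note concrete, here is a minimal standard-library sketch of the difference between the standard and URL-safe Base64 alphabets, followed by a pickle-then-Base64 round trip. It does not use Faust; the variable names are illustrative, and the pickle example is only an assumed model of what "pickle with base64 encoding" means.

```python
import base64
import pickle

# Two bytes chosen so the encoded text uses both alphabet-specific characters.
payload = bytes([0xFB, 0xFF])

standard = base64.b64encode(payload)         # standard alphabet uses '+' and '/'
urlsafe = base64.urlsafe_b64encode(payload)  # URL-safe alphabet uses '-' and '_'

# Assumed model of "pickle with base64 encoding": pickle first, then Base64.
point = {"x": 1, "y": 2}
encoded = base64.b64encode(pickle.dumps(point))
restored = pickle.loads(base64.b64decode(encoded))
```

Because the standard alphabet can emit '+' and '/', output from these codecs may need extra escaping before it is placed in a URL or file name.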

Serialization by name

The dumps() function takes a codec name and the object to encode, then returns bytes:

>>> s = dumps('json', obj)

For the reverse direction, the loads() function takes a codec name and bytes to decode:

>>> obj = loads('json', s)

You can also combine encoders in the name, like in this case where json is combined with gzip compression:

>>> obj = loads('json|gzip', s)
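As a rough illustration of how a combined name could be interpreted, the sketch below splits the name on | and applies each step in order when encoding, and in reverse order when decoding. It uses only the standard library. STEPS, sketch_dumps and sketch_loads are hypothetical names, not part of Faust, and the sketch makes no claim about how Faust resolves names internally.

```python
import gzip
import json
from typing import Any, Callable, Dict, Tuple

# Hypothetical registry: step name -> (encode, decode) pair.
STEPS: Dict[str, Tuple[Callable[[Any], Any], Callable[[Any], Any]]] = {
    "json": (
        lambda obj: json.dumps(obj).encode("utf-8"),
        lambda data: json.loads(data.decode("utf-8")),
    ),
    "gzip": (gzip.compress, gzip.decompress),
}


def sketch_dumps(name: str, obj: Any) -> bytes:
    """Apply each named step left to right."""
    for part in name.split("|"):
        encode, _ = STEPS[part]
        obj = encode(obj)
    return obj


def sketch_loads(name: str, data: bytes) -> Any:
    """Undo the steps right to left."""
    for part in reversed(name.split("|")):
        _, decode = STEPS[part]
        data = decode(data)
    return data


compressed = sketch_dumps("json|gzip", {"x": 1, "y": 2})
round_tripped = sketch_loads("json|gzip", compressed)
```

Decoding must run in the opposite order of encoding: the payload is decompressed first and only then parsed as JSON.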

Codec registry

Codecs are configured by name and this module maintains a mapping from name to Codec instance: the codecs attribute.

You can add a new codec to this mapping with register():

>>> from faust.serializers import codecs
>>> codecs.register('custom', custom_serializer())

A codec subclass requires two methods to be implemented: _loads() and _dumps():

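from typing import Any

# Imported under an alias so that the msgpack() factory function
# defined later does not shadow the library module.
import msgpack as msgpack_lib

from faust.serializers import codecs

class raw_msgpack(codecs.Codec):

    def _dumps(self, obj: Any) -> bytes:
        # Serialize the object to raw msgpack bytes.
        return msgpack_lib.dumps(obj)

    def _loads(self, s: bytes) -> Any:
        # Deserialize raw msgpack bytes back into an object.
        return msgpack_lib.loads(s)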

Our codec now encodes/decodes to raw msgpack format, but we may also need to transfer this payload over a transport easily confused by binary data, such as JSON where everything is Unicode.

Codecs can be chained together, so to add a binary-to-text encoding such as Base64 to the codec, use the | operator to form a combined codec:

def msgpack() -> codecs.Codec:
    return raw_msgpack() | codecs.binary()

codecs.register('msgpack', msgpack())
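To show what chaining with | amounts to, here is a simplified stand-in written with the standard library only. SketchCodec, JsonStep and Base64Step are hypothetical names and this is not Faust's actual Codec class. It only models the documented idea: a codec keeps a tuple of child codecs, dumps() runs the steps in order, loads() runs them in reverse, and clone() preserves keyword arguments.

```python
import base64
import json
from typing import Any, Tuple


class SketchCodec:
    """Simplified stand-in for a chainable codec (not Faust's class)."""

    def __init__(self, children: Tuple["SketchCodec", ...] = (),
                 **kwargs: Any) -> None:
        self.children = children
        self.nodes = (self,) + children  # this codec first, then its children
        self.kwargs = kwargs

    def _dumps(self, obj: Any) -> Any:
        raise NotImplementedError

    def _loads(self, data: Any) -> Any:
        raise NotImplementedError

    def dumps(self, obj: Any) -> bytes:
        for node in self.nodes:            # encode left to right
            obj = node._dumps(obj)
        return obj

    def loads(self, data: bytes) -> Any:
        for node in reversed(self.nodes):  # decode right to left
            data = node._loads(data)
        return data

    def clone(self, *children: "SketchCodec") -> "SketchCodec":
        # Keyword arguments are carried over to the copy.
        return type(self)(children=self.children + children, **self.kwargs)

    def __or__(self, other: "SketchCodec") -> "SketchCodec":
        return self.clone(*other.nodes)


class JsonStep(SketchCodec):
    def _dumps(self, obj: Any) -> bytes:
        return json.dumps(obj).encode("utf-8")

    def _loads(self, data: bytes) -> Any:
        return json.loads(data.decode("utf-8"))


class Base64Step(SketchCodec):
    def _dumps(self, data: bytes) -> bytes:
        return base64.b64encode(data)

    def _loads(self, data: bytes) -> bytes:
        return base64.b64decode(data)


combined = JsonStep(indent=None) | Base64Step()
encoded = combined.dumps({"x": 1})
decoded = combined.loads(encoded)
```

In this sketch the | operator never mutates either operand; it returns a new codec, so the individual steps remain reusable in other chains.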

At this point we have monkey-patched Faust to support our codec, and we can use it to define records like this:

>>> from faust import Record
>>> class Point(Record, serializer='msgpack'):
...     x: int
...     y: int

The problem with monkey-patching is that we must make sure the patching happens before we use the feature.

Faust also supports registering codec extensions using setuptools entry points, so instead we can create an installable msgpack extension.

To do so we need to define a package with the following directory layout:

faust-msgpack/
    setup.py
    faust_msgpack.py

The first file, faust-msgpack/setup.py, defines metadata about our package and should look like the following example:

from setuptools import setup, find_packages

setup(
    name='faust-msgpack',
    version='1.0.0',
    description='Faust msgpack serialization support',
    author='Ola A. Normann',
    author_email='ola@normann.no',
    url='http://github.com/example/faust-msgpack',
    platforms=['any'],
    license='BSD',
    packages=find_packages(exclude=['ez_setup', 'tests', 'tests.*']),
    zip_safe=False,
    install_requires=['msgpack-python'],
    tests_require=[],
    entry_points={
        'faust.codecs': [
            'msgpack = faust_msgpack:msgpack',
        ],
    },
)

The most important part is the entry_points key, which tells Faust how to load our plugin. We have set the name of our codec to msgpack and the path to the codec factory function to faust_msgpack:msgpack. Faust imports this as from faust_msgpack import msgpack, so we need to define that part next in our faust-msgpack/faust_msgpack.py module:

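from typing import Any

# Aliased so the msgpack() factory below does not shadow the library module.
import msgpack as msgpack_lib

from faust.serializers import codecs

class raw_msgpack(codecs.Codec):

    def _dumps(self, obj: Any) -> bytes:
        return msgpack_lib.dumps(obj)

    def _loads(self, s: bytes) -> Any:
        return msgpack_lib.loads(s)


def msgpack() -> codecs.Codec:
    # Chain raw msgpack with Base64 so the payload is text-safe.
    return raw_msgpack() | codecs.binary()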

That’s it! To install and use our new extension we do:

$ python setup.py install
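After installation, one way to check that the entry point is visible is to list the faust.codecs group with the standard library's importlib.metadata. This is only a sketch for inspecting installed metadata. The helper name discover_codec_entry_points is hypothetical, and the sketch does not show how Faust itself performs the lookup.

```python
from importlib import metadata
from typing import Dict


def discover_codec_entry_points(
        group: str = "faust.codecs") -> Dict[str, metadata.EntryPoint]:
    """Return installed entry points in the given group, keyed by name."""
    eps = metadata.entry_points()
    if hasattr(eps, "select"):
        # Python 3.10 and later: selectable entry points.
        selected = eps.select(group=group)
    else:
        # Python 3.8 and 3.9: a plain mapping of group name to entry points.
        selected = eps.get(group, [])
    return {ep.name: ep for ep in selected}


found = discover_codec_entry_points()
# Each value has a .load() method that imports the referenced object,
# for example the msgpack() factory from faust_msgpack.
print(sorted(found))
```

If the extension is installed, the printed list should include msgpack; an empty list means no package in the current environment declares that group.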

At this point you may want to publish this on PyPI to share the extension with other Faust users.

class faust.serializers.codecs.Codec(children: Optional[Tuple[CodecT, ...]] = None, **kwargs: Any)

Base class for codecs.

children: Tuple[CodecT, ...]

Next steps in the recursive codec chain. For example, x = pickle | binary returns a codec with children set to (pickle, binary).

nodes: Tuple[CodecT, ...]

Cached version of children that includes this codec as the first node. It is stored as a plain copy rather than a lazily chained iterable.

kwargs: Dict

Subclasses can support keyword arguments. The base implementation of clone() uses this attribute to preserve keyword arguments in copies.

dumps(obj: Any) -> bytes

Encode object obj.

Return type:

bytes

loads(s: bytes) -> Any

Decode object from bytes.

Return type:

Any

clone(*children: CodecT) -> CodecT

Create a clone of this codec, with optional children added.

Return type:

CodecT

faust.serializers.codecs.register(name: str, codec: CodecT) -> None

Register new codec in the codec registry.

Return type:

None

faust.serializers.codecs.get_codec(name_or_codec: Optional[Union[CodecT, str]]) -> CodecT

Get codec by name.

Return type:

CodecT

faust.serializers.codecs.dumps(codec: Optional[Union[CodecT, str]], obj: Any) -> bytes

Encode object into bytes.

Return type:

bytes

faust.serializers.codecs.loads(codec: Optional[Union[CodecT, str]], s: bytes) -> Any

Decode object from bytes.

Return type:

Any