Channels & Topics - Data Sources

Basics

Faust agents iterate over streams, and streams iterate over channels.

A channel is a construct used to send and receive messages. A “topic” is a named channel backed by a Kafka topic.

Faust defines these layers of abstraction so that agents can send and receive messages using more than one type of transport.

Topics are highly Kafka-specific, while channels are not. That makes channels more natural to subclass should you require a different means of communication, for example RabbitMQ (AMQP), STOMP, MQTT, NSQ, or ZeroMQ.

Channels

A channel is a buffer/queue used to send and receive messages. This buffer could exist in-memory in the local process only, or transmit serialized messages over the network.

You can create channels manually and read/write from them:

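import faust

# The app id and broker URL below are assumed values for illustration.
app = faust.App('example-app', broker='kafka://localhost:9092')

async def main():
    channel = app.channel()

    # write a single value to the channel
    await channel.put(1)

    async for event in channel:
        print(event.value)
        # the channel is infinite so we break after first event
        break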
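To make the buffer/queue idea concrete without a running Faust app, here is a minimal sketch of an in-memory channel built on asyncio.Queue. The InMemoryChannel class and the first_n helper are hypothetical names used only for illustration; this is not how Faust implements faust.Channel.

```python
import asyncio


class InMemoryChannel:
    """Hypothetical stand-in for an in-memory channel (not Faust's class)."""

    def __init__(self, maxsize: int = 0) -> None:
        # The queue is the buffer; it lives only in the local process.
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)

    async def put(self, value) -> None:
        await self._queue.put(value)

    def __aiter__(self):
        return self

    async def __anext__(self):
        # Never raises StopAsyncIteration: iteration is infinite,
        # so consumers must break out themselves.
        return await self._queue.get()


async def first_n(channel, n):
    """Collect the first n values, then stop iterating."""
    received = []
    async for value in channel:
        received.append(value)
        if len(received) == n:
            break
    return received


async def demo():
    channel = InMemoryChannel()
    for i in range(3):
        await channel.put(i)
    return await first_n(channel, 3)


result = asyncio.run(demo())
```

As in the example above, the consumer has to break explicitly because iterating the channel never ends on its own.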

Reference

Sending messages to channel

class faust.Channel
async send(*, key: Optional[Union[bytes, _ModelT, Any]] = None, value: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, schema: Optional[SchemaT] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, callback: Optional[Callable[[FutureMessage], Union[None, Awaitable[None]]]] = None, force: bool = False) -> Awaitable[RecordMetadata]

Send message to channel.

Return type:

Awaitable[RecordMetadata]

as_future_message(key: Optional[Union[bytes, _ModelT, Any]] = None, value: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, schema: Optional[SchemaT] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, callback: Optional[Callable[[FutureMessage], Union[None, Awaitable[None]]]] = None, eager_partitioning: bool = False) -> FutureMessage

Create promise that message will be transmitted.

Return type:

FutureMessage

async publish_message(fut: FutureMessage, wait: bool = True) -> Awaitable[RecordMetadata]

Publish message to channel.

This is the interface used by topic.send() and similar methods to actually publish the message on the channel, for example after it has been buffered.

It takes a FutureMessage object, which contains all the information required to send the message, and acts as a promise that is resolved once the message has been fully transmitted.

Return type:

Awaitable[RecordMetadata]
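The promise behaviour described for publish_message can be illustrated with a plain asyncio future. This is only a sketch under stated assumptions: PendingMessage and SketchPublisher are hypothetical names, a list stands in for the transport, and the metadata dictionary is invented for the example. It does not reproduce Faust's FutureMessage or RecordMetadata.

```python
import asyncio


class PendingMessage:
    """Hypothetical analogue of a future message: payload plus a promise."""

    def __init__(self, key, value):
        self.key = key
        self.value = value
        # Resolved with delivery metadata once transmission completes.
        self.result = asyncio.get_running_loop().create_future()


class SketchPublisher:
    """Hypothetical publisher; a list stands in for the real transport."""

    def __init__(self):
        self.log = []
        self._tasks = set()

    async def _transmit(self, message):
        await asyncio.sleep(0)  # stand-in for a network round trip
        self.log.append((message.key, message.value))
        # Resolve the promise with made-up delivery metadata.
        message.result.set_result({"offset": len(self.log) - 1})

    async def publish(self, message, wait=True):
        # Schedule the transmission and keep a reference to the task.
        task = asyncio.ensure_future(self._transmit(message))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        if wait:
            # Block until the promise carried by the message resolves.
            await message.result
        return message


async def demo():
    publisher = SketchPublisher()
    message = PendingMessage(key="k", value="v")
    resolved_before = message.result.done()
    await publisher.publish(message, wait=True)
    return resolved_before, message.result.result(), publisher.log


resolved_before, metadata, log = asyncio.run(demo())
```

The message object carries everything needed to send it, and callers that only need to know when delivery finished can await the future it holds.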

Declaring

Note

Some channels may require you to declare them on the server side before they’re used. Faust will create topics considered internal but will not create or modify “source topics” (i.e., exposed for use by other Kafka applications).

To define a topic as internal, use app.topic('name', ..., internal=True).

class faust.Channel
maybe_declare() -> None

Declare/create this channel, but only if it doesn’t exist.

Return type:

None

async declare() -> None

Declare/create this channel.

This is used to create this channel on a server, if that is required to operate it.

Return type:

None
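The difference between declare() and maybe_declare() can be sketched as an unconditional create versus a create guarded by an existence check. FakeBroker and DeclarableChannel below are hypothetical, and writing maybe_declare as a coroutine is an assumption of this sketch; Faust's own implementation may differ.

```python
import asyncio


class FakeBroker:
    """Hypothetical server that records which channels exist."""

    def __init__(self):
        self.channels = set()
        self.create_calls = 0

    async def create(self, name):
        self.create_calls += 1
        self.channels.add(name)


class DeclarableChannel:
    """Hypothetical channel that must be created on a server before use."""

    def __init__(self, name, broker):
        self.name = name
        self.broker = broker

    async def declare(self):
        # Always issues a create request.
        await self.broker.create(self.name)

    async def maybe_declare(self):
        # Only issues a create request if the channel does not exist yet.
        if self.name not in self.broker.channels:
            await self.declare()


async def demo():
    broker = FakeBroker()
    channel = DeclarableChannel("orders", broker)
    for _ in range(3):
        await channel.maybe_declare()
    calls_after_maybe = broker.create_calls
    await channel.declare()
    return calls_after_maybe, broker.create_calls


calls_after_maybe, calls_after_declare = asyncio.run(demo())
```

Calling maybe_declare() repeatedly is harmless in this sketch, whereas each declare() call reaches the server.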

Topics

A topic is a named channel backed by a Kafka topic. The name serves as the address of the channel, so the channel can be shared between multiple processes, and each process receives a partition of the topic.
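As a rough illustration of how partitions of one named topic could be divided among processes, the sketch below assigns partitions round-robin. The assign_partitions function and the worker names are hypothetical, and round-robin is a simplifying assumption; the actual assignment is decided by Kafka's consumer group protocol and may look different.

```python
from typing import Dict, List


def assign_partitions(partitions: int, workers: List[str]) -> Dict[str, List[int]]:
    """Distribute partition numbers over workers in round-robin order."""
    assignment: Dict[str, List[int]] = {worker: [] for worker in workers}
    for partition in range(partitions):
        owner = workers[partition % len(workers)]
        assignment[owner].append(partition)
    return assignment


# Six partitions shared by three hypothetical worker processes.
assignment = assign_partitions(6, ["worker-1", "worker-2", "worker-3"])
```

With more partitions than processes, a process may hold several partitions at once; with more processes than partitions, some processes would receive none.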