Channels & Topics - Data Sources¶
Basics¶
Faust agents iterate over streams, and streams iterate over channels.
A channel is a construct used to send and receive messages. A “topic” is a named channel backed by a Kafka topic.
Faust defines these layers of abstraction so that agents can send and receive messages using more than one type of transport.
Topics are highly Kafka-specific, while channels are not. That makes channels the more natural class to subclass should you require a different means of communication, for example RabbitMQ (AMQP), STOMP, MQTT, NSQ, or ZeroMQ.
Channels¶
A channel is a buffer/queue used to send and receive messages. This buffer could exist in-memory in the local process only, or transmit serialized messages over the network.
You can create channels manually and read/write from them:
async def main():
    channel = app.channel()
    await channel.put(1)
    async for event in channel:
        print(event.value)
        # the channel is infinite so we break after first event
        break
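To make the "buffer/queue" idea concrete without a running Faust app, here is a minimal sketch of an in-memory channel built on `asyncio.Queue`. It is not Faust's implementation: `InMemoryChannel` and `Event` are hypothetical names chosen for illustration, and only the `put()` plus `async for` usage mirrors the example above.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, List


@dataclass
class Event:
    """Hypothetical stand-in for the event object yielded during iteration."""
    value: Any


class InMemoryChannel:
    """Illustrative channel: an asyncio.Queue exposed as an async iterator."""

    def __init__(self, maxsize: int = 0) -> None:
        # maxsize=0 means the buffer is unbounded.
        self._queue: asyncio.Queue = asyncio.Queue(maxsize)

    async def put(self, value: Any) -> None:
        # Wrap the raw value so consumers read it as event.value.
        await self._queue.put(Event(value))

    def __aiter__(self) -> "InMemoryChannel":
        return self

    async def __anext__(self) -> Event:
        # Waits when the buffer is empty and never raises StopAsyncIteration,
        # which is why the consumer has to break out of the loop itself.
        return await self._queue.get()


async def main() -> List[Any]:
    channel = InMemoryChannel()
    await channel.put(1)
    await channel.put(2)
    received: List[Any] = []
    async for event in channel:
        received.append(event.value)
        if len(received) == 2:
            break  # the channel is infinite, so stop explicitly
    return received


received = asyncio.run(main())
print(received)  # → [1, 2]
```

A channel that transmits over the network would keep the same interface and replace the queue with a transport client.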
Reference¶
Sending messages to channel¶
- class faust.Channel
- async send(*, key: Optional[Union[bytes, _ModelT, Any]] = None, value: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, schema: Optional[SchemaT] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, callback: Optional[Callable[[FutureMessage], Union[None, Awaitable[None]]]] = None, force: bool = False) → Awaitable[RecordMetadata]
Send message to channel.
- Return type: Awaitable[RecordMetadata]
- as_future_message(key: Optional[Union[bytes, _ModelT, Any]] = None, value: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, schema: Optional[SchemaT] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, callback: Optional[Callable[[FutureMessage], Union[None, Awaitable[None]]]] = None, eager_partitioning: bool = False) → FutureMessage
Create promise that message will be transmitted.
- Return type: FutureMessage
- async publish_message(fut: FutureMessage, wait: bool = True) → Awaitable[RecordMetadata]
Publish message to channel.
This is the interface used by topic.send(), etc. to actually publish the message on the channel after being buffered up or similar.
It takes a FutureMessage object, which contains all the information required to send the message, and acts as a promise that is resolved once the message has been fully transmitted.
- Return type: Awaitable[RecordMetadata]
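The "promise that is resolved once the message has been fully transmitted" pattern can be sketched with a plain `asyncio.Future`. This is only an illustration under stated assumptions, not Faust's code: `PendingMessage`, `SketchChannel`, `as_pending()`, `publish()`, and the `{"offset": ...}` metadata are hypothetical stand-ins for `FutureMessage`, the channel, `as_future_message()`, `publish_message()`, and `RecordMetadata`.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, List, Set


@dataclass
class PendingMessage:
    """Hypothetical stand-in for FutureMessage: payload plus a promise."""
    key: Any
    value: Any
    promise: asyncio.Future


class SketchChannel:
    """Hypothetical channel that separates building a message from sending it."""

    def __init__(self) -> None:
        self.log: List[Any] = []  # pretend this list is the transport
        self._tasks: Set[asyncio.Task] = set()  # keep in-flight sends alive

    def as_pending(self, key: Any, value: Any) -> PendingMessage:
        # Build the message and its promise; nothing is transmitted yet.
        loop = asyncio.get_running_loop()
        return PendingMessage(key, value, loop.create_future())

    async def publish(self, msg: PendingMessage, wait: bool = True) -> Any:
        async def transmit() -> None:
            await asyncio.sleep(0)  # simulated network round trip
            self.log.append(msg.value)
            # Resolve the promise with made-up metadata: the log offset.
            msg.promise.set_result({"offset": len(self.log) - 1})

        task = asyncio.ensure_future(transmit())
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        if wait:
            return await msg.promise  # block until "transmitted"
        return msg.promise  # caller may await the promise later


async def main() -> List[Any]:
    channel = SketchChannel()
    msg = channel.as_pending(key=b"k", value="hello")
    pending_before = msg.promise.done()
    metadata = await channel.publish(msg)
    return [pending_before, msg.promise.done(), metadata]


result = asyncio.run(main())
print(result)  # → [False, True, {'offset': 0}]
```

Separating message construction from publishing lets a sender buffer messages and still hand the caller something to await.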
Declaring¶
Note
Some channels may require you to declare them on the server side before they’re used. Faust will create topics considered internal but will not create or modify “source topics” (i.e., exposed for use by other Kafka applications).
To define a topic as internal, use app.topic('name', ..., internal=True).
Topics¶
A topic is a named channel, backed by a Kafka topic. The name serves as the channel's address, so multiple processes can share it, and each process is assigned a subset of the topic's partitions.
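How partitions end up spread across processes can be pictured with a toy round-robin assignment. This is a simplified illustration only: the real assignment is decided by Kafka's consumer group protocol and the configured assignor, and `assign_partitions` and the worker names are hypothetical.

```python
from typing import Dict, List


def assign_partitions(partitions: int, workers: List[str]) -> Dict[str, List[int]]:
    """Distribute partition numbers over workers round-robin (illustrative only)."""
    assignment: Dict[str, List[int]] = {worker: [] for worker in workers}
    for partition in range(partitions):
        # Partition i goes to worker i modulo the number of workers.
        worker = workers[partition % len(workers)]
        assignment[worker].append(partition)
    return assignment


assignment = assign_partitions(partitions=4, workers=["worker-1", "worker-2"])
print(assignment)  # → {'worker-1': [0, 2], 'worker-2': [1, 3]}
```

With more workers than partitions, some workers in this sketch receive no partitions at all.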