faust.channels

Channel.

A channel is used to send values to streams.

The stream will iterate over incoming events in the channel.

class faust.channels.Channel(app: AppT, *, schema: Optional[SchemaT] = None, key_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, value_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, is_iterator: bool = False, queue: Optional[ThrowableQueue] = None, maxsize: Optional[int] = None, root: Optional[ChannelT] = None, active_partitions: Optional[Set[TP]] = None, loop: Optional[AbstractEventLoop] = None)[source]

Create new channel.

Parameters:
  • app (AppT) – The app that created this channel (app.channel())

  • schema (_UnionGenericAlias[SchemaT, None]) – Schema used for serialization/deserialization

  • key_type (_UnionGenericAlias[_GenericAlias[ModelT], _GenericAlias[bytes], _GenericAlias[str], None]) – The Model used for keys in this channel. (overrides schema if one is defined)

  • value_type (_UnionGenericAlias[_GenericAlias[ModelT], _GenericAlias[bytes], _GenericAlias[str], None]) – The Model used for values in this channel. (overrides schema if one is defined)

  • maxsize (_UnionGenericAlias[int, None]) – The maximum number of messages this channel can hold. If exceeded any new put call will block until a message is removed from the channel.

  • is_iterator (bool) – When streams iterate over a channel they will call stream.clone(is_iterator=True) so this attribute denotes that this channel instance is currently being iterated over.

  • active_partitions (_UnionGenericAlias[_GenericAlias[TP], None]) – Set of active topic partitions this channel instance is assigned to.

  • loop (_UnionGenericAlias[AbstractEventLoop, None]) – The asyncio event loop to use.

app: AppT
loop: Optional[AbstractEventLoop]
is_iterator: bool
maxsize: Optional[int]
active_partitions: Optional[Set[TP]]
schema: SchemaT
key_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]]
value_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]]
property queue: ThrowableQueue

Return the underlying queue/buffer backing this channel. :rtype: ThrowableQueue

clone(*, is_iterator: Optional[bool] = None, **kwargs: Any) ChannelT[T][source]

Create clone of this channel.

Parameters:

is_iterator (_UnionGenericAlias[bool, None]) – Set to True if this is now a channel that is being iterated over.

Keyword Arguments:

**kwargs – Any keyword arguments passed will override any of the arguments supported by Channel.__init__.

Return type:

ChannelT

clone_using_queue(queue: Queue) ChannelT[T][source]

Create clone of this channel using specific queue instance.

Return type:

ChannelT

stream(**kwargs: Any) StreamT[T][source]

Create stream reading from this channel.

Return type:

StreamT

get_topic_name() str[source]

Get the topic name, or raise if this is not a named channel.

Return type:

str

async send(*, key: Optional[Union[bytes, _ModelT, Any]] = None, value: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, schema: Optional[SchemaT] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, callback: Optional[Callable[[FutureMessage], Union[None, Awaitable[None]]]] = None, force: bool = False) Awaitable[RecordMetadata][source]

Send message to channel.

Return type:

_GenericAlias[RecordMetadata]

send_soon(*, key: Optional[Union[bytes, _ModelT, Any]] = None, value: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, schema: Optional[SchemaT] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, callback: Optional[Callable[[FutureMessage], Union[None, Awaitable[None]]]] = None, force: bool = False, eager_partitioning: bool = False) FutureMessage[source]

Produce message by adding to buffer.

This method is only supported by Topic.

Raises:

NotImplementedError – always for in-memory channel.

Return type:

FutureMessage

as_future_message(key: Optional[Union[bytes, _ModelT, Any]] = None, value: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, schema: Optional[SchemaT] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, callback: Optional[Callable[[FutureMessage], Union[None, Awaitable[None]]]] = None, eager_partitioning: bool = False) FutureMessage[source]

Create promise that message will be transmitted.

Return type:

FutureMessage

prepare_headers(headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]]) Optional[Union[List[Tuple[str, bytes]], MutableMapping[str, bytes]]][source]

Prepare headers passed before publishing.

Return type:

_UnionGenericAlias[_GenericAlias[_GenericAlias[str, bytes]], _GenericAlias[str, bytes], None]

async publish_message(fut: FutureMessage, wait: bool = True) Awaitable[RecordMetadata][source]

Publish message to channel.

This is the interface used by topic.send(), etc. to actually publish the message on the channel after being buffered up or similar.

It takes a FutureMessage object, which contains all the information required to send the message, and acts as a promise that is resolved once the message has been fully transmitted.

Return type:

_GenericAlias[RecordMetadata]

maybe_declare() None[source]

Declare/create this channel, but only if it doesn’t exist.

Return type:

None

async declare() None[source]

Declare/create this channel.

This is used to create this channel on a server, if that is required to operate it.

Return type:

None

prepare_key(key: Optional[Union[bytes, _ModelT, Any]], key_serializer: Optional[Union[CodecT, str]], schema: Optional[SchemaT] = None, headers: Optional[Union[List[Tuple[str, bytes]], MutableMapping[str, bytes]]] = None) Tuple[Any, Optional[Union[List[Tuple[str, bytes]], MutableMapping[str, bytes]]]][source]

Prepare key before it is sent to this channel.

Topic uses this to implement serialization of keys sent to the channel.

Return type:

_GenericAlias[Any, _UnionGenericAlias[_GenericAlias[_GenericAlias[str, bytes]], _GenericAlias[str, bytes], None]]

prepare_value(value: Union[bytes, _ModelT, Any], value_serializer: Optional[Union[CodecT, str]], schema: Optional[SchemaT] = None, headers: Optional[Union[List[Tuple[str, bytes]], MutableMapping[str, bytes]]] = None) Tuple[Any, Optional[Union[List[Tuple[str, bytes]], MutableMapping[str, bytes]]]][source]

Prepare value before it is sent to this channel.

Topic uses this to implement serialization of values sent to the channel.

Return type:

_GenericAlias[Any, _UnionGenericAlias[_GenericAlias[_GenericAlias[str, bytes]], _GenericAlias[str, bytes], None]]

async decode(message: Message, *, propagate: bool = False) EventT[T][source]

Decode Message into Event.

Return type:

EventT

async deliver(message: Message) None[source]

Deliver message to queue from consumer.

This is called by the consumer to deliver the message to the channel.

Return type:

None

async put(value: EventT[T_contra]) None[source]

Put event onto this channel.

Return type:

None

async get(*, timeout: Optional[Union[timedelta, float, str]] = None) EventT[T][source]

Get the next Event received on this channel.

Return type:

EventT

empty() bool[source]

Return True if the queue is empty.

Return type:

bool

async on_key_decode_error(exc: Exception, message: Message) None[source]

Unable to decode the key of an item in the queue.

Return type:

None

async on_value_decode_error(exc: Exception, message: Message) None[source]

Unable to decode the value of an item in the queue.

Return type:

None

async on_decode_error(exc: Exception, message: Message) None[source]

Signal that there was an error reading an event in the queue.

When a message in the channel needs deserialization to be reconstructed back to its original form, we will sometimes see decoding/deserialization errors being raised, from missing fields or malformed payloads, and so on.

We will log the exception, but you can also override this to perform additional actions.

Admonition: Kafka

In the event a deserialization error occurs, we HAVE to commit the offset of the source message to continue processing the stream.

For this reason it is important that you keep a close eye on error logs. For easy of use, we suggest using log aggregation software, such as Sentry, to surface these errors to your operations team.

Return type:

None

on_stop_iteration() None[source]

Signal that iteration over this channel was stopped.

Tip

Remember to call super when overriding this method.

Return type:

None

derive(**kwargs: Any) ChannelT[T][source]

Derive new channel from this channel, using new configuration.

See faust.Topic.derive.

For local channels this will simply return the same channel.

Return type:

ChannelT

async throw(exc: BaseException) None[source]

Throw exception to be received by channel subscribers.

Tip

When you find yourself having to call this from a regular, non-async def function, you can use _throw() instead.

Return type:

None

property subscriber_count: int

Return number of active subscribers to local channel. :rtype: int

property label: str

Short textual description of channel. :rtype: str