faust.topics

Topic - Named channel using Kafka.

class faust.topics.Topic(app: AppT, *, topics: Optional[Sequence[str]] = None, pattern: Optional[Union[str, Pattern]] = None, schema: Optional[SchemaT] = None, key_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, value_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, is_iterator: bool = False, partitions: Optional[int] = None, retention: Optional[Union[timedelta, float, str]] = None, compacting: Optional[bool] = None, deleting: Optional[bool] = None, replicas: Optional[int] = None, acks: bool = True, internal: bool = False, config: Optional[Mapping[str, Any]] = None, queue: Optional[ThrowableQueue] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, maxsize: Optional[int] = None, root: Optional[ChannelT] = None, active_partitions: Optional[Set[TP]] = None, allow_empty: Optional[bool] = None, has_prefix: bool = False, loop: Optional[AbstractEventLoop] = None)[source]

Define new topic description.

Parameters:
  • app (AppT) – App instance used to create this topic description.

  • topics (Optional[Sequence[str]]) – List of topic names.

  • partitions (Optional[int]) – Number of partitions for these topics. Topics are created with this value when they are declared. Note: if a message is produced before the topic is declared, and automatic topic creation (auto.create.topics.enable) is enabled on the Kafka server, the number of partitions is determined by the server configuration instead.

  • retention (Optional[Union[timedelta, float, str]]) – Number of seconds (as float/timedelta) to keep messages in the topic before they can be expired by the server.

  • pattern (Optional[Union[str, Pattern]]) – Regular expression evaluated to decide what topics to subscribe to. You cannot specify both topics and a pattern.

  • schema (Optional[SchemaT]) – Schema used for serialization/deserialization.

  • key_type (Optional[Union[Type[ModelT], Type[bytes], Type[str]]]) – How to deserialize keys for messages in this topic. Can be a faust.Model type, str, bytes, or None for “autodetect” (overrides schema if one is defined).

  • value_type (Optional[Union[Type[ModelT], Type[bytes], Type[str]]]) – How to deserialize values for messages in this topic. Can be a faust.Model type, str, bytes, or None for “autodetect” (overrides schema if one is defined).

  • active_partitions (Optional[Set[TP]]) – Set of faust.types.tuples.TP that this topic should be restricted to.
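
The retention parameter accepts either a float or a timedelta. A minimal sketch of how the two forms relate, assuming the value is normalized to seconds. `retention_seconds` is a hypothetical helper written for illustration, not part of Faust, and the str form allowed by the signature is not covered here:

```python
from datetime import timedelta
from typing import Union


def retention_seconds(retention: Union[timedelta, float, int]) -> float:
    """Normalize a retention value to seconds (hypothetical helper, not Faust API)."""
    if isinstance(retention, timedelta):
        return retention.total_seconds()
    return float(retention)


# Both spellings describe the same one-day retention period.
one_day = retention_seconds(timedelta(days=1))
same_day = retention_seconds(86400.0)
```

Passing a timedelta tends to read more clearly at the call site, since the unit is explicit.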

Raises:

TypeError – if both topics and pattern are provided.
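
The rule that topics and pattern are mutually exclusive, and the idea of a pattern selecting topic names, can be sketched with the standard library alone. `check_subscription` and `matching_topics` are hypothetical helpers, and the use of `re.match` semantics (anchored at the start of the name) is an assumption rather than confirmed Faust behavior:

```python
import re
from typing import List, Optional, Sequence, Union


def check_subscription(
    topics: Optional[Sequence[str]] = None,
    pattern: Optional[Union[str, re.Pattern]] = None,
) -> None:
    """Raise TypeError when both topics and pattern are given (documented rule)."""
    if topics and pattern:
        raise TypeError("Cannot specify both topics and pattern")


def matching_topics(
    pattern: Union[str, re.Pattern], available: Sequence[str]
) -> List[str]:
    """Return the names in `available` matched by the pattern (assumed semantics)."""
    compiled = re.compile(pattern)
    return [name for name in available if compiled.match(name)]


# Select every topic whose name starts with "orders-".
selected = matching_topics(r"orders-.*", ["orders-eu", "orders-us", "payments"])
```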

async send(*, key: Optional[Union[bytes, _ModelT, Any]] = None, value: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, schema: Optional[SchemaT] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, callback: Optional[Callable[[FutureMessage], Union[None, Awaitable[None]]]] = None, force: bool = False) Awaitable[RecordMetadata][source]

Send message to topic.

Return type:

Awaitable[RecordMetadata]

send_soon(*, key: Optional[Union[bytes, _ModelT, Any]] = None, value: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, schema: Optional[SchemaT] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, callback: Optional[Callable[[FutureMessage], Union[None, Awaitable[None]]]] = None, force: bool = False, eager_partitioning: bool = False) FutureMessage[source]

Produce message by adding to buffer.

Notes

This method can be used by non-async def functions to produce messages.

Return type:

FutureMessage
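
The buffering idea behind send_soon can be illustrated with a small stand-in: a synchronous call only appends to a buffer, and an asynchronous step later drains it. `BufferedProducer`, `flush`, and `record_click` are hypothetical names for this sketch and do not reflect how Faust implements its producer buffer:

```python
import asyncio
from collections import deque
from typing import Any, Deque, List, Tuple


class BufferedProducer:
    """Hypothetical stand-in for a buffering producer; not Faust's implementation."""

    def __init__(self) -> None:
        self.pending: Deque[Tuple[Any, Any]] = deque()
        self.delivered: List[Tuple[Any, Any]] = []

    def send_soon(self, *, key: Any = None, value: Any = None) -> None:
        # Synchronous: only appends to the buffer, no I/O happens here.
        self.pending.append((key, value))

    async def flush(self) -> None:
        # Asynchronous: drains the buffer, yielding to the event loop per message.
        while self.pending:
            self.delivered.append(self.pending.popleft())
            await asyncio.sleep(0)


def record_click(producer: BufferedProducer, user_id: str) -> None:
    # A plain (non-async) function can still enqueue a message.
    producer.send_soon(key=user_id, value={"event": "click"})


producer = BufferedProducer()
record_click(producer, "user-1")
buffered_before_flush = len(producer.pending)
asyncio.run(producer.flush())
```

The point of the split is that callers which cannot await, such as ordinary callbacks, can still hand a message over for later delivery.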

async put(event: EventT) None[source]

Put event directly onto the underlying queue of this topic.

This will only affect subscribers to a particular instance, in a particular process.

Return type:

None
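
To illustrate why put only affects subscribers of one instance in one process, the sketch below models two topic instances as two independent in-process queues. Using `asyncio.Queue` is an assumption made for illustration; the signature above indicates that Faust uses a ThrowableQueue:

```python
import asyncio
from typing import Tuple


async def demo() -> Tuple[int, int]:
    # Two independent in-process queues stand in for two topic instances.
    local_queue: asyncio.Queue = asyncio.Queue()
    other_queue: asyncio.Queue = asyncio.Queue()

    # Putting an event on one instance's queue does not touch the other,
    # and nothing is sent over the network.
    await local_queue.put({"id": 1})
    return local_queue.qsize(), other_queue.qsize()


sizes = asyncio.run(demo())
```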

property pattern: Optional[Pattern]

Regular expression used by this topic (if any).

Return type:

Optional[Pattern]

property partitions: Optional[int]

Return the number of configured partitions for this topic.

Notes

This is only active for internal topics, fully owned and managed by Faust itself.

We never touch the configuration of a topic that exists in Kafka, and Kafka will sometimes automatically create topics when they don’t exist. In this case the number of partitions for the automatically created topic will depend on the Kafka server configuration (num.partitions).

Always make sure your topics have the correct number of partitions.

Return type:

Optional[int]

derive(**kwargs: Any) ChannelT[source]

Create topic derived from the configuration of this topic.

Configuration is copied from this topic, and any parameter passed as a keyword argument overrides the copied value.

See also

derive_topic(): for a list of supported keyword arguments.

Return type:

ChannelT

derive_topic(*, topics: Optional[Sequence[str]] = None, schema: Optional[SchemaT] = None, key_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, value_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, partitions: Optional[int] = None, retention: Optional[Union[timedelta, float, str]] = None, compacting: Optional[bool] = None, deleting: Optional[bool] = None, internal: Optional[bool] = None, config: Optional[Mapping[str, Any]] = None, prefix: str = '', suffix: str = '', **kwargs: Any) TopicT[source]

Create new topic with configuration derived from this topic.

Return type:

TopicT
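
The copy-then-override behavior of derive and derive_topic can be sketched with a frozen dataclass. `TopicDescription` is a hypothetical, simplified model and not a Faust class; the way prefix and suffix are wrapped around each topic name is an assumption based on the parameter names:

```python
from dataclasses import dataclass, replace
from typing import Any, Optional, Tuple


@dataclass(frozen=True)
class TopicDescription:
    """Hypothetical, simplified model of a topic description (not a Faust class)."""

    topics: Tuple[str, ...]
    partitions: Optional[int] = None
    retention: Optional[float] = None

    def derive(
        self, *, prefix: str = "", suffix: str = "", **overrides: Any
    ) -> "TopicDescription":
        # Copy every field, then apply keyword overrides on top of the copy.
        derived = replace(self, **overrides)
        # Assumption: prefix and suffix are wrapped around each topic name.
        renamed = tuple(f"{prefix}{name}{suffix}" for name in derived.topics)
        return replace(derived, topics=renamed)


orders = TopicDescription(topics=("orders",), partitions=8, retention=3600.0)
changelog = orders.derive(suffix="-changelog", partitions=4)
```

Here `changelog` keeps the retention of `orders`, takes the overridden partition count, and leaves the original description unchanged.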

get_topic_name() str[source]

Return the main topic name of this topic description.

As topic descriptions can have multiple topic names, this only returns a name when the description contains exactly one topic name.

Raises:
  • TypeError – if configured with a regular expression pattern.

  • ValueError – if configured with multiple topic names.

  • TypeError – if not configured with any names or patterns.

Return type:

str
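
The three documented error cases can be summarized in a standalone sketch. `resolve_topic_name` and `outcome` are hypothetical helpers that mirror the rules listed above; the order in which the checks run and the error messages are assumptions:

```python
import re
from typing import Any, Optional, Sequence


def resolve_topic_name(
    topics: Optional[Sequence[str]] = None,
    pattern: Optional[re.Pattern] = None,
) -> str:
    """Hypothetical helper mirroring the documented get_topic_name() rules."""
    if pattern is not None:
        raise TypeError("Topic with pattern subscription cannot be identified")
    if not topics:
        raise TypeError("Topic has no names and no pattern")
    if len(topics) > 1:
        raise ValueError("Topic with multiple topic names cannot be identified")
    return topics[0]


def outcome(**kwargs: Any) -> str:
    """Return the resolved name, or the name of the exception that was raised."""
    try:
        return resolve_topic_name(**kwargs)
    except (TypeError, ValueError) as exc:
        return type(exc).__name__


single = outcome(topics=["orders"])
multiple = outcome(topics=["orders", "payments"])
by_pattern = outcome(pattern=re.compile("orders-.*"))
empty = outcome()
```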

async publish_message(fut: FutureMessage, wait: bool = False) Awaitable[RecordMetadata][source]

Fulfill promise to publish message to topic.

Return type:

Awaitable[RecordMetadata]

maybe_declare() None[source]

Declare/create this topic, only if it does not exist.

Return type:

None

async declare() None[source]

Declare/create this topic on the server.

Return type:

None

on_stop_iteration() None[source]

Signal that iteration over this channel was stopped.

Tip

Remember to call super when overriding this method.

Return type:

None
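
The advice to call super when overriding on_stop_iteration can be shown with two small classes. `BaseChannel` and `CountingChannel` are hypothetical stand-ins, not Faust's channel classes; the `stopped` flag represents whatever bookkeeping the real base class performs:

```python
class BaseChannel:
    """Hypothetical stand-in for a channel base class (not Faust's ChannelT)."""

    def __init__(self) -> None:
        self.stopped = False

    def on_stop_iteration(self) -> None:
        # Base-class bookkeeping that subclasses must not skip.
        self.stopped = True


class CountingChannel(BaseChannel):
    """Subclass that adds its own behavior on top of the base hook."""

    def __init__(self) -> None:
        super().__init__()
        self.stop_count = 0

    def on_stop_iteration(self) -> None:
        self.stop_count += 1
        # Call super so the base-class bookkeeping still runs.
        super().on_stop_iteration()


channel = CountingChannel()
channel.on_stop_iteration()
```

Without the `super()` call, `stop_count` would still increase but `stopped` would remain False, which is the kind of silent breakage the tip guards against.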