faust

Python Stream processing.

class faust.Agent(fun: Callable[[StreamT[_T]], Union[Coroutine[Any, Any, None], Awaitable[None], AsyncIterable]], *, app: AppT, name: Optional[str] = None, channel: Optional[Union[str, ChannelT]] = None, concurrency: int = 1, sink: Optional[Iterable[Union[AgentT, ChannelT, Callable[[Any], Optional[Awaitable]]]]] = None, on_error: Optional[Callable[[AgentT, BaseException], Awaitable]] = None, supervisor_strategy: Optional[Type[SupervisorStrategyT]] = None, help: Optional[str] = None, schema: Optional[SchemaT] = None, key_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, value_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, isolated_partitions: bool = False, use_reply_headers: Optional[bool] = None, **kwargs: Any)[source]

Agent.

This is the type of object returned by the @app.agent decorator.

supervisor: SupervisorStrategyT = None
on_init_dependencies() Iterable[ServiceT][source]

Return list of service dependencies required to start the agent.

Return type:

_GenericAlias[ServiceT]

actor_tracebacks() List[str][source]
Return type:

_GenericAlias[str]

async on_start() None[source]

Call when an agent starts.

Return type:

None

async on_stop() None[source]

Call when an agent stops.

Return type:

None

cancel() None[source]

Cancel agent and its actor instances running in this process.

Return type:

None

async on_partitions_revoked(revoked: Set[TP]) None[source]

Call when partitions are revoked.

Return type:

None

async on_partitions_assigned(assigned: Set[TP]) None[source]

Call when partitions are assigned.

Return type:

None

async on_isolated_partitions_revoked(revoked: Set[TP]) None[source]

Call when isolated partitions are revoked.

Return type:

None

async on_isolated_partitions_assigned(assigned: Set[TP]) None[source]

Call when isolated partitions are assigned.

Return type:

None

async on_shared_partitions_revoked(revoked: Set[TP]) None[source]

Call when non-isolated partitions are revoked.

Return type:

None

async on_shared_partitions_assigned(assigned: Set[TP]) None[source]

Call when non-isolated partitions are assigned.

Return type:

None

info() Mapping[source]

Return agent attributes as a dictionary.

Return type:

_SpecialGenericAlias

clone(*, cls: Optional[Type[AgentT]] = None, **kwargs: Any) AgentT[source]

Create clone of this agent object.

Keyword arguments can be passed to override any argument supported by Agent.__init__.

Return type:

AgentT

test_context(channel: Optional[ChannelT] = None, supervisor_strategy: Optional[SupervisorStrategyT] = None, on_error: Optional[Callable[[AgentT, BaseException], Awaitable]] = None, **kwargs: Any) AgentTestWrapperT[source]

Create new unit-testing wrapper for this agent.

Return type:

AgentTestWrapperT

actor_from_stream(stream: Optional[StreamT], *, index: Optional[int] = None, active_partitions: Optional[Set[TP]] = None, channel: Optional[ChannelT] = None) ActorT[Union[AsyncIterable, Awaitable]][source]

Create new actor from stream.

Return type:

ActorT

add_sink(sink: Union[AgentT, ChannelT, Callable[[Any], Optional[Awaitable]]]) None[source]

Add new sink to further handle results from this agent.

Return type:

None

stream(channel: Optional[ChannelT] = None, active_partitions: Optional[Set[TP]] = None, **kwargs: Any) StreamT[source]

Create underlying stream used by this agent.

Return type:

StreamT

async cast(value: Optional[Union[bytes, _ModelT, Any]] = None, *, key: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None) None[source]

RPC operation: like ask() but do not expect reply.

Cast here is like “casting a spell”, and will not expect a reply back from the agent.

Return type:

None

async ask(value: Optional[Union[bytes, _ModelT, Any]] = None, *, key: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, reply_to: Optional[Union[AgentT, ChannelT, str]] = None, correlation_id: Optional[str] = None) Any[source]

RPC operation: ask agent for result of processing value.

This version will wait until the result is available and return the processed value.

Return type:

Any

async ask_nowait(value: Optional[Union[bytes, _ModelT, Any]] = None, *, key: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, reply_to: Optional[Union[AgentT, ChannelT, str]] = None, correlation_id: Optional[str] = None, force: bool = False) ReplyPromise[source]

RPC operation: ask agent for result of processing value.

This version does not wait for the result to arrive, but instead returns a promise of future evaluation.

Return type:

ReplyPromise

async send(*, key: Optional[Union[bytes, _ModelT, Any]] = None, value: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, callback: Optional[Callable[[FutureMessage], Union[None, Awaitable[None]]]] = None, reply_to: Optional[Union[AgentT, ChannelT, str]] = None, correlation_id: Optional[str] = None, force: bool = False) Awaitable[RecordMetadata][source]

Send message to topic used by agent.

Return type:

_GenericAlias[RecordMetadata]

async map(values: Union[AsyncIterable, Iterable], key: Optional[Union[bytes, _ModelT, Any]] = None, reply_to: Optional[Union[AgentT, ChannelT, str]] = None) AsyncIterator[source]

RPC map operation on a list of values.

A map operation iterates over results as they arrive. See join() and kvjoin() if you want them in order.

Return type:

_SpecialGenericAlias

async kvmap(items: Union[AsyncIterable[Tuple[Optional[Union[bytes, _ModelT, Any]], Union[bytes, _ModelT, Any]]], Iterable[Tuple[Optional[Union[bytes, _ModelT, Any]], Union[bytes, _ModelT, Any]]]], reply_to: Optional[Union[AgentT, ChannelT, str]] = None) AsyncIterator[str][source]

RPC map operation on a list of (key, value) pairs.

A map operation iterates over results as they arrive. See join() and kvjoin() if you want them in order.

Return type:

_GenericAlias[str]

async join(values: Union[AsyncIterable[Union[bytes, _ModelT, Any]], Iterable[Union[bytes, _ModelT, Any]]], key: Optional[Union[bytes, _ModelT, Any]] = None, reply_to: Optional[Union[AgentT, ChannelT, str]] = None) List[Any][source]

RPC map operation on a list of values.

A join returns the results in order, and only returns once all values have been processed.

Return type:

_GenericAlias[Any]

async kvjoin(items: Union[AsyncIterable[Tuple[Optional[Union[bytes, _ModelT, Any]], Union[bytes, _ModelT, Any]]], Iterable[Tuple[Optional[Union[bytes, _ModelT, Any]], Union[bytes, _ModelT, Any]]]], reply_to: Optional[Union[AgentT, ChannelT, str]] = None) List[Any][source]

RPC map operation on a list of (key, value) pairs.

A join returns the results in order, and only returns once all values have been processed.

Return type:

_GenericAlias[Any]

get_topic_names() Iterable[str][source]

Return list of topic names this agent subscribes to.

Return type:

_GenericAlias[str]

property channel: ChannelT

Return channel used by agent.

Return type:

ChannelT

property channel_iterator: AsyncIterator

Return channel agent iterates over.

Return type:

_SpecialGenericAlias

property label: str

Return human-readable description of agent.

Return type:

str

property shortlabel: str

Return short description of agent.

Return type:

str

logger: logging.Logger = <Logger faust.agents.agent (WARNING)>
class faust.App(id: str, *, monitor: Optional[Monitor] = None, config_source: Optional[Any] = None, loop: Optional[AbstractEventLoop] = None, beacon: Optional[NodeT] = None, **options: Any)[source]

Faust Application.

Parameters:

id (str) – Application ID.

Keyword Arguments:

loop (asyncio.AbstractEventLoop) – optional event loop to use.

See also

Application Parameters – for supported keyword arguments.

SCAN_CATEGORIES: ClassVar[List[str]] = ['faust.agent', 'faust.command', 'faust.page', 'faust.service', 'faust.task']
class BootStrategy(app: AppT, *, enable_web: Optional[bool] = None, enable_kafka: Optional[bool] = None, enable_kafka_producer: Optional[bool] = None, enable_kafka_consumer: Optional[bool] = None, enable_sensors: Optional[bool] = None)

App startup strategy.

The startup strategy defines the graph of services to start when the Faust worker for an app starts.

agents() Iterable[ServiceT]

Return list of services required to start agents.

Return type:

_GenericAlias[ServiceT]

client_only() Iterable[ServiceT]

Return services to start when app is in client_only mode.

Return type:

_GenericAlias[ServiceT]

enable_kafka: bool = True
enable_kafka_consumer: Optional[bool] = None
enable_kafka_producer: Optional[bool] = None
enable_sensors: bool = True
enable_web: Optional[bool] = None
kafka_client_consumer() Iterable[ServiceT]

Return list of services required to start Kafka client consumer.

Return type:

_GenericAlias[ServiceT]

kafka_conductor() Iterable[ServiceT]

Return list of services required to start Kafka conductor.

Return type:

_GenericAlias[ServiceT]

kafka_consumer() Iterable[ServiceT]

Return list of services required to start Kafka consumer.

Return type:

_GenericAlias[ServiceT]

kafka_producer() Iterable[ServiceT]

Return list of services required to start Kafka producer.

Return type:

_GenericAlias[ServiceT]

producer_only() Iterable[ServiceT]

Return services to start when app is in producer_only mode.

Return type:

_GenericAlias[ServiceT]

sensors() Iterable[ServiceT]

Return list of services required to start sensors.

Return type:

_GenericAlias[ServiceT]

server() Iterable[ServiceT]

Return services to start when app is in default mode.

Return type:

_GenericAlias[ServiceT]

tables() Iterable[ServiceT]

Return list of table-related services.

Return type:

_GenericAlias[ServiceT]

web_components() Iterable[ServiceT]

Return list of web-related services (excluding web server).

Return type:

_GenericAlias[ServiceT]

web_server() Iterable[ServiceT]

Return list of web-server services.

Return type:

_GenericAlias[ServiceT]

class Settings(*args: Any, **kwargs: Any)
property Agent

Agent class type.

The Agent class to use for agents, or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

class MyAgent(faust.Agent):
    ...

app = App(..., Agent=MyAgent)

Example using the string path to a class:

app = App(..., Agent='myproj.agents.Agent')
property ConsumerScheduler

Consumer scheduler class.

A strategy which dictates the priority of topics and partitions for incoming records. The default strategy does first round-robin over topics and then round-robin over partitions.

Example using a class:

class MySchedulingStrategy(DefaultSchedulingStrategy):
    ...

app = App(..., ConsumerScheduler=MySchedulingStrategy)

Example using the string path to a class:

app = App(..., ConsumerScheduler='myproj.MySchedulingStrategy')
DEFAULT_BROKER_URL: ClassVar[str] = 'kafka://localhost:9092'
property Event

Event class type.

The Event class to use for creating new event objects, or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

class MyBaseEvent(faust.Event):
    ...

app = App(..., Event=MyBaseEvent)

Example using the string path to a class:

app = App(..., Event='myproj.events.Event')
property GlobalTable

GlobalTable class type.

The GlobalTable class to use for tables, or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

class MyBaseGlobalTable(faust.GlobalTable):
    ...

app = App(..., GlobalTable=MyBaseGlobalTable)

Example using the string path to a class:

app = App(..., GlobalTable='myproj.tables.GlobalTable')
property HttpClient

HTTP client class type.

The aiohttp.client.ClientSession class used as an HTTP client; or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

import faust
from aiohttp.client import ClientSession

class HttpClient(ClientSession):
    ...

app = faust.App(..., HttpClient=HttpClient)

Example using the string path to a class:

app = faust.App(..., HttpClient='myproj.http.HttpClient')
property LeaderAssignor

Leader assignor class type.

The LeaderAssignor class used for assigning a master Faust instance for the app; or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

from faust.assignor import LeaderAssignor

class MyLeaderAssignor(LeaderAssignor):
    ...

app = App(..., LeaderAssignor=MyLeaderAssignor)

Example using the string path to a class:

app = App(..., LeaderAssignor='myproj.assignor.LeaderAssignor')
property MY_SETTING

My custom setting.

To contribute new settings you only have to define a new setting decorated attribute here.

Look at the other settings for examples.

Remember that once you’ve added the setting you must also render the configuration reference:

$ make configref
property Monitor

Monitor sensor class type.

The Monitor class as the main sensor gathering statistics for the application; or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

import faust
from faust.sensors import Monitor

class MyMonitor(Monitor):
    ...

app = faust.App(..., Monitor=MyMonitor)

Example using the string path to a class:

app = faust.App(..., Monitor='myproj.monitors.Monitor')
NODE_HOSTNAME: ClassVar[str] = 'fv-az2026-396'
property PartitionAssignor

Partition assignor class type.

The PartitionAssignor class used for assigning topic partitions to worker instances; or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

from faust.assignor import PartitionAssignor

class MyPartitionAssignor(PartitionAssignor):
    ...

app = App(..., PartitionAssignor=MyPartitionAssignor)

Example using the string path to a class:

app = App(..., PartitionAssignor='myproj.assignor.PartitionAssignor')
property Router

Router class type.

The Router class used for routing requests to a worker instance having the partition for a specific key (e.g. table key); or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

from faust.router import Router

class MyRouter(Router):
    ...

app = App(..., Router=MyRouter)

Example using the string path to a class:

app = App(..., Router='myproj.routers.Router')
SETTINGS: ClassVar[SettingIndexMapping] = {'Agent': <faust.types.settings.params._Symbol object>, 'ConsumerScheduler': <faust.types.settings.params._Symbol object>, 'Event': <faust.types.settings.params._Symbol object>, 'GlobalTable': <faust.types.settings.params._Symbol object>, 'HttpClient': <faust.types.settings.params._Symbol object>, 'LeaderAssignor': <faust.types.settings.params._Symbol object>, 'Monitor': <faust.types.settings.params._Symbol object>, 'PartitionAssignor': <faust.types.settings.params._Symbol object>, 'Router': <faust.types.settings.params._Symbol object>, 'Schema': <faust.types.settings.params._Symbol object>, 'Serializers': <faust.types.settings.params._Symbol object>, 'SetGlobalTable': <faust.types.settings.params._Symbol object>, 'SetTable': <faust.types.settings.params._Symbol object>, 'Stream': <faust.types.settings.params._Symbol object>, 'Table': <faust.types.settings.params._Symbol object>, 'TableManager': <faust.types.settings.params._Symbol object>, 'Topic': <faust.types.settings.params._Symbol object>, 'Worker': <faust.types.settings.params._Symbol object>, 'aerospike_retries_on_exception': <faust.types.settings.params.Int object>, 'aerospike_sleep_seconds_between_retries_on_exception': <faust.types.settings.params.Int object>, 'agent_supervisor': <faust.types.settings.params._Symbol object>, 'autodiscover': <faust.types.settings.params.Param object>, 'blocking_timeout': <faust.types.settings.params.Seconds object>, 'broker': <faust.types.settings.params.BrokerList object>, 'broker_api_version': <faust.types.settings.params.Str object>, 'broker_check_crcs': <faust.types.settings.params.Bool object>, 'broker_client_id': <faust.types.settings.params.Str object>, 'broker_commit_every': <faust.types.settings.params.UnsignedInt object>, 'broker_commit_interval': <faust.types.settings.params.Seconds object>, 'broker_commit_livelock_soft_timeout': <faust.types.settings.params.Seconds object>, 'broker_consumer': 
<faust.types.settings.params.BrokerList object>, 'broker_credentials': <faust.types.settings.params.Credentials object>, 'broker_heartbeat_interval': <faust.types.settings.params.Seconds object>, 'broker_max_poll_interval': <faust.types.settings.params.Seconds object>, 'broker_max_poll_records': <faust.types.settings.params.UnsignedInt object>, 'broker_producer': <faust.types.settings.params.BrokerList object>, 'broker_rebalance_timeout': <faust.types.settings.params.Seconds object>, 'broker_request_timeout': <faust.types.settings.params.Seconds object>, 'broker_session_timeout': <faust.types.settings.params.Seconds object>, 'cache': <faust.types.settings.params.URL object>, 'canonical_url': <faust.types.settings.params.URL object>, 'consumer_api_version': <faust.types.settings.params.Str object>, 'consumer_auto_offset_reset': <faust.types.settings.params.Str object>, 'consumer_connections_max_idle_ms': <faust.types.settings.params.Int object>, 'consumer_group_instance_id': <faust.types.settings.params.Str object>, 'consumer_max_fetch_size': <faust.types.settings.params.UnsignedInt object>, 'consumer_metadata_max_age_ms': <faust.types.settings.params.Int object>, 'crash_app_on_aerospike_exception': <faust.types.settings.params.Bool object>, 'datadir': <faust.types.settings.params.Path object>, 'debug': <faust.types.settings.params.Bool object>, 'env_prefix': <faust.types.settings.params.Str object>, 'id_format': <faust.types.settings.params.Str object>, 'key_serializer': <faust.types.settings.params.Codec object>, 'logging_config': <faust.types.settings.params.Dict object>, 'loghandlers': <faust.types.settings.params.LogHandlers object>, 'origin': <faust.types.settings.params.Str object>, 'processing_guarantee': <faust.types.settings.params.Enum.<locals>.EnumParam object>, 'producer_acks': <faust.types.settings.params.Int object>, 'producer_api_version': <faust.types.settings.params.Str object>, 'producer_compression_type': <faust.types.settings.params.Str object>, 
'producer_connections_max_idle_ms': <faust.types.settings.params.Int object>, 'producer_linger': <faust.types.settings.params.Seconds object>, 'producer_linger_ms': <faust.types.settings.params.UnsignedInt object>, 'producer_max_batch_size': <faust.types.settings.params.UnsignedInt object>, 'producer_max_request_size': <faust.types.settings.params.UnsignedInt object>, 'producer_metadata_max_age_ms': <faust.types.settings.params.Int object>, 'producer_partitioner': <faust.types.settings.params._Symbol object>, 'producer_request_timeout': <faust.types.settings.params.Seconds object>, 'producer_threaded': <faust.types.settings.params.Bool object>, 'recovery_consistency_check': <faust.types.settings.params.Bool object>, 'reply_create_topic': <faust.types.settings.params.Bool object>, 'reply_expires': <faust.types.settings.params.Seconds object>, 'reply_to': <faust.types.settings.params.Str object>, 'reply_to_prefix': <faust.types.settings.params.Str object>, 'ssl_context': <faust.types.settings.params.SSLContext object>, 'store': <faust.types.settings.params.URL object>, 'store_check_exists': <faust.types.settings.params.Bool object>, 'stream_ack_cancelled_tasks': <faust.types.settings.params.Bool object>, 'stream_ack_exceptions': <faust.types.settings.params.Bool object>, 'stream_buffer_maxsize': <faust.types.settings.params.UnsignedInt object>, 'stream_processing_timeout': <faust.types.settings.params.Seconds object>, 'stream_publish_on_commit': <faust.types.settings.params.Bool object>, 'stream_recovery_delay': <faust.types.settings.params.Seconds object>, 'stream_wait_empty': <faust.types.settings.params.Bool object>, 'table_cleanup_interval': <faust.types.settings.params.Seconds object>, 'table_key_index_size': <faust.types.settings.params.UnsignedInt object>, 'table_standby_replicas': <faust.types.settings.params.UnsignedInt object>, 'tabledir': <faust.types.settings.params.Path object>, 'timezone': <faust.types.settings.params.Timezone object>, 
'topic_allow_declare': <faust.types.settings.params.Bool object>, 'topic_disable_leader': <faust.types.settings.params.Bool object>, 'topic_partitions': <faust.types.settings.params.UnsignedInt object>, 'topic_replication_factor': <faust.types.settings.params.UnsignedInt object>, 'url': <faust.types.settings.params.URL object>, 'value_serializer': <faust.types.settings.params.Codec object>, 'version': <faust.types.settings.params.Int object>, 'web': <faust.types.settings.params.URL object>, 'web_bind': <faust.types.settings.params.Str object>, 'web_cors_options': <faust.types.settings.params.Dict object>, 'web_enabled': <faust.types.settings.params.Bool object>, 'web_host': <faust.types.settings.params.Str object>, 'web_in_thread': <faust.types.settings.params.Bool object>, 'web_port': <faust.types.settings.params.Port object>, 'web_ssl_context': <faust.types.settings.params.SSLContext object>, 'web_transport': <faust.types.settings.params.URL object>, 'worker_redirect_stdouts': <faust.types.settings.params.Bool object>, 'worker_redirect_stdouts_level': <faust.types.settings.params.Severity object>}

Index of all settings by name.

SETTINGS_BY_SECTION: ClassVar[SettingSectionIndexMapping] = defaultdict(<class 'list'>, {<Section: SectionType.COMMON>: [<faust.types.settings.params.Param object>, <faust.types.settings.params.Path object>, <faust.types.settings.params.Path object>, <faust.types.settings.params.Bool object>, <faust.types.settings.params.Str object>, <faust.types.settings.params.Str object>, <faust.types.settings.params.Str object>, <faust.types.settings.params.Timezone object>, <faust.types.settings.params.Int object>, <faust.types.settings.params.Seconds object>, <faust.types.settings.params.BrokerList object>, <faust.types.settings.params.Credentials object>, <faust.types.settings.params.SSLContext object>, <faust.types.settings.params.Dict object>, <faust.types.settings.params.LogHandlers object>, <faust.types.settings.params.Enum.<locals>.EnumParam object>, <faust.types.settings.params.URL object>, <faust.types.settings.params.URL object>, <faust.types.settings.params.URL object>], <Section: SectionType.AGENT>: [<faust.types.settings.params._Symbol object>], <Section: SectionType.BROKER>: [<faust.types.settings.params.BrokerList object>, <faust.types.settings.params.BrokerList object>, <faust.types.settings.params.Str object>, <faust.types.settings.params.Bool object>, <faust.types.settings.params.Str object>, <faust.types.settings.params.UnsignedInt object>, <faust.types.settings.params.Seconds object>, <faust.types.settings.params.Seconds object>, <faust.types.settings.params.Seconds object>, <faust.types.settings.params.Seconds object>, <faust.types.settings.params.UnsignedInt object>, <faust.types.settings.params.Seconds object>, <faust.types.settings.params.Seconds object>, <faust.types.settings.params.Seconds object>], <Section: SectionType.CONSUMER>: [<faust.types.settings.params.Str object>, <faust.types.settings.params.UnsignedInt object>, <faust.types.settings.params.Str object>, <faust.types.settings.params.Str object>, <faust.types.settings.params.Int object>, 
<faust.types.settings.params.Int object>, <faust.types.settings.params._Symbol object>], <Section: SectionType.SERIALIZATION>: [<faust.types.settings.params.Codec object>, <faust.types.settings.params.Codec object>], <Section: SectionType.PRODUCER>: [<faust.types.settings.params.Int object>, <faust.types.settings.params.Str object>, <faust.types.settings.params.Str object>, <faust.types.settings.params.Seconds object>, <faust.types.settings.params.UnsignedInt object>, <faust.types.settings.params.UnsignedInt object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params.Seconds object>, <faust.types.settings.params.Bool object>, <faust.types.settings.params.Int object>, <faust.types.settings.params.Int object>, <faust.types.settings.params.UnsignedInt object>], <Section: SectionType.STREAM>: [<faust.types.settings.params.Bool object>, <faust.types.settings.params.Bool object>, <faust.types.settings.params.Bool object>, <faust.types.settings.params.Int object>, <faust.types.settings.params.Int object>, <faust.types.settings.params.UnsignedInt object>, <faust.types.settings.params.Seconds object>, <faust.types.settings.params.Bool object>, <faust.types.settings.params.Seconds object>, <faust.types.settings.params.Bool object>, <faust.types.settings.params.Bool object>, <faust.types.settings.params.Bool object>], <Section: SectionType.RPC>: [<faust.types.settings.params.Bool object>, <faust.types.settings.params.Seconds object>, <faust.types.settings.params.Str object>, <faust.types.settings.params.Str object>], <Section: SectionType.TABLE>: [<faust.types.settings.params.Seconds object>, <faust.types.settings.params.UnsignedInt object>, <faust.types.settings.params.UnsignedInt object>], <Section: SectionType.TOPIC>: [<faust.types.settings.params.Bool object>, <faust.types.settings.params.Bool object>, <faust.types.settings.params.UnsignedInt object>, <faust.types.settings.params.UnsignedInt object>], <Section: SectionType.WEB_SERVER>: 
[<faust.types.settings.params.URL object>, <faust.types.settings.params.Str object>, <faust.types.settings.params.Dict object>, <faust.types.settings.params.Bool object>, <faust.types.settings.params.Str object>, <faust.types.settings.params.Bool object>, <faust.types.settings.params.Port object>, <faust.types.settings.params.SSLContext object>, <faust.types.settings.params.URL object>, <faust.types.settings.params.URL object>], <Section: SectionType.WORKER>: [<faust.types.settings.params.Bool object>, <faust.types.settings.params.Severity object>], <Section: SectionType.EXTENSION>: [<faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>]})

Index of all sections and the settings in a section.

property Schema

Schema class type.

The Schema class to use as the default schema type when no schema is specified, or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

class MyBaseSchema(faust.Schema):
    ...

app = App(..., Schema=MyBaseSchema)

Example using the string path to a class:

app = App(..., Schema='myproj.schemas.Schema')
property Serializers

Serializer registry class type.

The Registry class used for serializing/deserializing messages; or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

from faust.serializers import Registry

class MyRegistry(Registry):
    ...

app = App(..., Serializers=MyRegistry)

Example using the string path to a class:

app = App(..., Serializers='myproj.serializers.Registry')
property SetGlobalTable

SetGlobalTable class type.

The SetGlobalTable class to use for tables, or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

class MyBaseSetGlobalTable(faust.SetGlobalTable):
    ...

app = App(..., SetGlobalTable=MyBaseSetGlobalTable)

Example using the string path to a class:

app = App(..., SetGlobalTable='myproj.tables.SetGlobalTable')
property SetTable

SetTable extension table.

The SetTable class to use for table-of-set tables, or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

class MySetTable(faust.SetTable):
    ...

app = App(..., SetTable=MySetTable)

Example using the string path to a class:

app = App(..., SetTable='myproj.tables.MySetTable')
property Stream

Stream class type.

The Stream class to use for streams, or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

class MyBaseStream(faust.Stream):
    ...

app = App(..., Stream=MyBaseStream)

Example using the string path to a class:

app = App(..., Stream='myproj.streams.Stream')
property Table

Table class type.

The Table class to use for tables, or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

class MyBaseTable(faust.Table):
    ...

app = App(..., Table=MyBaseTable)

Example using the string path to a class:

app = App(..., Table='myproj.tables.Table')
property TableManager

Table manager class type.

The TableManager used for managing tables, or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

from faust.tables import TableManager

class MyTableManager(TableManager):
    ...

app = App(..., TableManager=MyTableManager)

Example using the string path to a class:

app = App(..., TableManager='myproj.tables.TableManager')
property Topic

Topic class type.

The Topic class used for defining new topics; or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

import faust

class MyTopic(faust.Topic):
    ...

app = faust.App(..., Topic=MyTopic)

Example using the string path to a class:

app = faust.App(..., Topic='myproj.topics.Topic')
property Worker

Worker class type.

The Worker class used for starting a worker for this app; or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

import faust

class MyWorker(faust.Worker):
    ...

app = faust.App(..., Worker=MyWorker)

Example using the string path to a class:

app = faust.App(..., Worker='myproj.workers.Worker')
property aerospike_retries_on_exception

Number of retries to aerospike on a runtime error from the aerospike client.

Set this to the number of retries using the aerospike client on a runtime Exception thrown by the client.

property aerospike_sleep_seconds_between_retries_on_exception

Seconds to sleep between retries to aerospike on a runtime error from the aerospike client.

Set this to the sleep in seconds between retries using the aerospike client on a runtime Exception thrown by the client.

property agent_supervisor

Default agent supervisor type.

An agent may start multiple instances (actors) when the concurrency setting is higher than one (e.g. @app.agent(concurrency=2)).

Multiple instances of the same agent are considered to be in the same supervisor group.

The default supervisor is the mode.OneForOneSupervisor: if an instance in the group crashes, we restart that instance only.

These are the supervisors supported:

property appdir: Path
Return type:

Path

property autodiscover

Automatic discovery of agents, tasks, timers, views and commands.

Faust has an API to add different asyncio services and other user extensions, such as “Agents”, HTTP web views, command-line commands, and timers to your Faust workers. These can be defined in any module, so to discover them at startup, the worker needs to traverse packages looking for them.

Warning

The autodiscovery functionality uses the https://pypi.org/project/Venusian/ library to scan wanted packages for @app.agent, @app.page, @app.command, @app.task and @app.timer decorators, but to do so, it’s required to traverse the package path and import every module in it.

Importing random modules like this can be dangerous so make sure you follow Python programming best practices. Do not start threads; perform network I/O; do test monkey-patching for mocks or similar, as a side effect of importing a module. If you encounter a case such as this then please find a way to perform your action in a lazy manner.

Warning

If the above warning is something you cannot fix, or if it’s out of your control, then please set autodiscover=False and make sure the worker imports all modules where your decorators are defined.

The value for this argument can be:

bool

If App(autodiscover=True) is set, the autodiscovery will scan the package name described in the origin attribute.

The origin attribute is automatically set when you start a worker using the faust command line program, for example:

faust -A example.simple worker

The -A option specifies the app, but you can also create a shortcut entry point by calling app.main():

if __name__ == '__main__':
    app.main()

Then you can start the faust program by executing for example python myscript.py worker --loglevel=INFO, and it will use the correct application.

Sequence[str]

The argument can also be a list of packages to scan:

app = App(..., autodiscover=['proj_orders', 'proj_accounts'])
Callable[[], Sequence[str]]

The argument can also be a function returning a list of packages to scan:

def get_all_packages_to_scan():
    return ['proj_orders', 'proj_accounts']

app = App(..., autodiscover=get_all_packages_to_scan)
False

If everything you need is in a self-contained module, or you import the stuff you need manually, just set autodiscover to False and don’t worry about it :-)

Django

When using https://pypi.org/project/Django/ and the DJANGO_SETTINGS_MODULE environment variable is set, the Faust app will scan all packages found in the INSTALLED_APPS setting.

If you’re using Django you can use this to scan for agents/pages/commands in all packages defined in INSTALLED_APPS.

Faust will automatically detect that you’re using Django and do the right thing if you do:

app = App(..., autodiscover=True)

It will find agents and other decorators in all of the reusable Django applications. If you want to manually control what packages are traversed, then provide a list:

app = App(..., autodiscover=['package1', 'package2'])

or if you want no packages to be traversed at all, then provide False:

app = App(..., autodiscover=False)

which is the default, so you can simply omit the argument.

Tip

For manual control over autodiscovery, you can also call the app.discover() method manually.

property blocking_timeout

Blocking timeout (in seconds).

When specified the worker will start a periodic signal based timer that only triggers when the loop has been blocked for a time exceeding this timeout.

This is the safest way to detect blocking, but could have adverse effects on libraries that do not automatically retry interrupted system calls.

Python itself does retry all interrupted system calls since version 3.5 (see PEP 475), but this might not be the case with C extensions added to the worker by the user.

The blocking detector is a background thread that periodically wakes up to either arm a timer, or cancel an already armed timer. In pseudocode:

while True:
    # cancel previous alarm and arm new alarm
    signal.signal(signal.SIGALRM, on_alarm)
    signal.setitimer(signal.ITIMER_REAL, blocking_timeout)
    # sleep to wakeup just before the timeout
    await asyncio.sleep(blocking_timeout * 0.96)

def on_alarm(signum, frame):
    logger.warning('Blocking detected: ...')

If the sleep does not wake up in time the alarm signal will be sent to the process and a traceback will be logged.

property broker

Broker URL, or a list of alternative broker URLs.

Faust needs the URL of a “transport” to send and receive messages.

Currently, the only supported production transport is kafka://. This uses the https://pypi.org/project/aiokafka/ client under the hood, for consuming and producing messages.

You can specify multiple hosts at the same time by separating them using a semicolon:

kafka://kafka1.example.com:9092;kafka2.example.com:9092

Which in actual code looks like this:

BROKERS = 'kafka://kafka1.example.com:9092;kafka2.example.com:9092'
app = faust.App(
    'id',
    broker=BROKERS,
)

You can also pass a list of URLs:

app = faust.App(
    'id',
    broker=['kafka://kafka1.example.com:9092',
            'kafka://kafka2.example.com:9092'],
)

See also

You can configure the transport used for consuming and producing separately, by setting the broker_consumer and broker_producer settings.

This setting is used as the default.

Available Transports

property broker_api_version

Broker API version.

This setting is also the default for consumer_api_version, and producer_api_version.

Negotiate producer protocol version.

The default value - “auto” means use the latest version supported by both client and server.

Any other version set means you are requesting a specific version of the protocol.

Example Kafka uses:

Disable sending headers for all messages produced

Kafka headers support was added in Kafka 0.11, so you can specify broker_api_version="0.10" to remove the headers from messages.

property broker_check_crcs

Broker CRC check.

Automatically check the CRC32 of the records consumed.

property broker_client_id

Broker client ID.

There is rarely any reason to configure this setting.

The client id is used to identify the software used, and is not usually configured by the user.

property broker_commit_every

Broker commit message frequency.

Commit offset every n messages.

See also broker_commit_interval, which is how frequently we commit on a timer when there are few messages being received.

property broker_commit_interval

Broker commit time frequency.

How often we commit messages that have been fully processed (acked).

property broker_commit_livelock_soft_timeout

Commit livelock timeout.

How long it takes before we warn that the Kafka commit offset has not advanced (only when processing messages).

property broker_consumer

Consumer broker URL.

You can use this setting to configure the transport used for producing and consuming separately.

If not set the value found in broker will be used.

property broker_credentials

Broker authentication mechanism.

Specify the authentication mechanism to use when connecting to the broker.

The default is to not use any authentication.

SASL Authentication

You can enable SASL authentication via plain text:

app = faust.App(
    broker_credentials=faust.SASLCredentials(
        username='x',
        password='y',
    ))

Warning

Do not use literal strings when specifying passwords in production, as they can remain visible in stack traces.

Instead the best practice is to get the password from a configuration file, or from the environment:

BROKER_USERNAME = os.environ.get('BROKER_USERNAME')
BROKER_PASSWORD = os.environ.get('BROKER_PASSWORD')

app = faust.App(
    broker_credentials=faust.SASLCredentials(
        username=BROKER_USERNAME,
        password=BROKER_PASSWORD,
    ))
GSSAPI Authentication

GSSAPI authentication over plain text:

app = faust.App(
    broker_credentials=faust.GSSAPICredentials(
        kerberos_service_name='faust',
        kerberos_domain_name='example.com',
    ),
)

GSSAPI authentication over SSL:

import ssl
ssl_context = ssl.create_default_context(
    purpose=ssl.Purpose.SERVER_AUTH, cafile='ca.pem')
ssl_context.load_cert_chain(
    'client.cert', keyfile='client.key')

app = faust.App(
    broker_credentials=faust.GSSAPICredentials(
        kerberos_service_name='faust',
        kerberos_domain_name='example.com',
        ssl_context=ssl_context,
    ),
)
SSL Authentication

Provide an SSL context for the Kafka broker connections.

This allows Faust to use a secure SSL/TLS connection for the Kafka connections and enables certificate-based authentication.

import ssl

ssl_context = ssl.create_default_context(
    purpose=ssl.Purpose.SERVER_AUTH, cafile='ca.pem')
ssl_context.load_cert_chain(
    'client.cert', keyfile='client.key')
app = faust.App(..., broker_credentials=ssl_context)
property broker_heartbeat_interval

Broker heartbeat interval.

How often we send heartbeats to the broker, and also how often we expect to receive heartbeats from the broker.

If any of these time out, you should increase this setting.

property broker_max_poll_interval

Broker max poll interval.

The maximum allowed time (in seconds) between calls to consume messages. If this interval is exceeded the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member. If API methods block waiting for messages, that time does not count against this timeout.

See KIP-62 for technical details.

property broker_max_poll_records

Broker max poll records.

The maximum number of records returned in a single call to poll(). If you find that your application needs more time to process messages you may want to adjust broker_max_poll_records to tune the number of records that must be handled on every loop iteration.

property broker_producer

Producer broker URL.

You can use this setting to configure the transport used for producing and consuming separately.

If not set the value found in broker will be used.

property broker_rebalance_timeout

Broker rebalance timeout.

How long to wait for a node to finish rebalancing before the broker will consider it dysfunctional and remove it from the cluster.

Increase this if you experience the cluster being in a state of constantly rebalancing, but make sure you also increase the broker_heartbeat_interval at the same time.

Note

The session timeout must not be greater than the broker_request_timeout.

property broker_request_timeout

Kafka client request timeout.

Note

The request timeout must not be less than the broker_session_timeout.

property broker_session_timeout

Broker session timeout.

How long to wait for a node to finish rebalancing before the broker will consider it dysfunctional and remove it from the cluster.

Increase this if you experience the cluster being in a state of constantly rebalancing, but make sure you also increase the broker_heartbeat_interval at the same time.

Note

The session timeout must not be greater than the broker_request_timeout.

property cache

Cache backend URL.

Optional backend used for Memcached-style caching. URL can be:

  • redis://host

  • rediscluster://host, or

  • memory://.

property canonical_url

Node specific canonical URL.

You shouldn’t have to set this manually.

The canonical URL defines how to reach the web server on a running worker node, and is usually set by combining the web_host and web_port settings.

property consumer_api_version

Consumer API version.

Configures the broker API version to use for consumers. See broker_api_version for more information.

property consumer_auto_offset_reset

Consumer auto offset reset.

Where the consumer should start reading messages from when there is no initial offset, or the stored offset no longer exists, e.g. when starting a new consumer for the first time.

Options include ‘earliest’, ‘latest’, ‘none’.

property consumer_connections_max_idle_ms

Consumer connections max idle milliseconds.

Close idle connections after the number of milliseconds specified by this config.

Default: 540000 (9 minutes).

property consumer_group_instance_id

Consumer group instance id.

The group_instance_id for static partition assignment.

If not set, default assignment strategy is used. Otherwise, each consumer instance has to have a unique id.

property consumer_max_fetch_size

Consumer max fetch size.

The maximum amount of data per-partition the server will return. This size must be at least as large as the maximum message size.

Note: This is PER PARTITION, so a limit of 1Mb when your workers consume from 10 topics having 100 partitions each, means a fetch request can be up to a gigabyte (10 * 100 * 1Mb). Setting this limit too generously may cause rebalancing issues if the amount of time required to flush pending data stuck in socket buffers exceeds the rebalancing timeout.

You must keep this limit low enough to account for many partitions being assigned to a single node.

property consumer_metadata_max_age_ms

Consumer metadata max age milliseconds

The period of time in milliseconds after which we force a refresh of metadata even if we haven’t seen any partition leadership changes to proactively discover any new brokers or partitions.

Default: 300000

property crash_app_on_aerospike_exception

Crash the app on Aerospike exceptions.

If True, crashes the app and prevents the commit offset from progressing. If False, the client has to catch the error and implement a dead letter queue.

data_directory_for_version(version: int) Path

Return the directory path for data belonging to specific version.

Return type:

Path

property datadir

Application data directory.

The directory in which this instance stores the data used by local tables, etc.

See also

  • The data directory can also be set using the faust --datadir option, from the command-line, so there is usually no reason to provide a default value when creating the app.

property debug

Use in development to expose sensor information endpoint.

Tip

If you want to enable the sensor statistics endpoint in production, without enabling the debug setting, you can do so by adding the following code:

app.web.blueprints.add(
    '/stats/', 'faust.web.apps.stats:blueprint')
property env_prefix

Environment variable prefix.

When configuring Faust by environment variables, this adds a common prefix to all Faust environment value names.

find_old_versiondirs() Iterable[Path]
Return type:

_GenericAlias[Path]

getenv(env_name: str) Any
Return type:

Any

property id: str
Return type:

str

property id_format

Application ID format template.

The format string used to generate the final id value by combining it with the version parameter.

property key_serializer

Default key serializer.

Serializer used for keys by default when no serializer is specified, or a model is not being used.

This can be the name of a serializer/codec, or an actual faust.serializers.codecs.Codec instance.

See also

  • The Codecs section in the model guide – for more information about codecs.

property logging_config

Logging dictionary configuration.

Optional dictionary for logging configuration, as supported by logging.config.dictConfig().

property loghandlers

List of custom logging handlers.

Specify a list of custom log handlers to use in worker instances.

property name: str
Return type:

str

on_init(id: str, **kwargs: Any) None
Return type:

None

property origin

The reverse path used to find the app.

For example if the app is located in:

from myproj.app import app

Then the origin should be "myproj.app".

The faust worker program will try to automatically set the origin, but if you are having problems with auto generated names then you can set origin manually.

property processing_guarantee

The processing guarantee that should be used.

Possible values are “at_least_once” (default) and “exactly_once”.

Note that if exactly-once processing is enabled consumers are configured with isolation.level="read_committed" and producers are configured with retries=Integer.MAX_VALUE and enable.idempotence=true per default.

Note that by default exactly-once processing requires a cluster of at least three brokers, which is the recommended setting for production. For development you can change this by adjusting the broker setting transaction.state.log.replication.factor to the number of brokers you want to use.

property producer_acks

Producer Acks.

The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are common:

  • 0: Producer will not wait for any acknowledgment from

    the server at all. The message will immediately be considered sent (Not recommended).

  • 1: The broker leader will write the record to its local

    log but will respond without awaiting full acknowledgment from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost.

  • -1: The broker leader will wait for the full set of in-sync

    replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee.

property producer_api_version

Producer API version.

Configures the broker API version to use for producers. See broker_api_version for more information.

property producer_compression_type

Producer compression type.

The compression type for all data generated by the producer. Valid values are gzip, snappy, lz4, or None.

property producer_connections_max_idle_ms

Producer connections max idle milliseconds.

Close idle connections after the number of milliseconds specified by this config.

Default: 540000 (9 minutes).

property producer_linger

Producer batch linger configuration.

Minimum time to batch before sending out messages from the producer.

Should rarely have to change this.

property producer_linger_ms

Deprecated setting, please use producer_linger instead.

This used to be provided as milliseconds, the new setting uses seconds.

property producer_max_batch_size

Producer max batch size.

Max size of each producer batch, in bytes.

property producer_max_request_size

Producer maximum request size.

Maximum size of a request in bytes in the producer.

Should rarely have to change this.

property producer_metadata_max_age_ms

Producer metadata max age milliseconds

The period of time in milliseconds after which we force a refresh of metadata even if we haven’t seen any partition leadership changes to proactively discover any new brokers or partitions.

Default: 300000

property producer_partitioner

Producer partitioning strategy.

The Kafka producer can be configured with a custom partitioner to change how keys are partitioned when producing to topics.

The default partitioner for Kafka is implemented as follows, and can be used as a template for your own partitioner:

import random
from typing import List
from kafka.partitioner.hashed import murmur2

def partition(key: bytes,
            all_partitions: List[int],
            available: List[int]) -> int:
    '''Default partitioner.

    Hashes key to partition using murmur2 hashing
    (from java client) If key is None, selects partition
    randomly from available, or from all partitions if none
    are currently available

    Arguments:
        key: partitioning key
        all_partitions: list of all partitions sorted by
                        partition ID.
        available: list of available partitions
                   in no particular order
    Returns:
        int: one of the values from ``all_partitions``
             or ``available``.
    '''
    if key is None:
        source = available if available else all_partitions
        return random.choice(source)
    index: int = murmur2(key)
    index &= 0x7fffffff
    index %= len(all_partitions)
    return all_partitions[index]
property producer_request_timeout

Producer request timeout.

Timeout for producer operations. This is set high by default, as this is also the time when producer batches expire and will no longer be retried.

property producer_threaded

Thread separate producer for send_soon.

If True, spin up a different producer in a different thread to be used for messages buffered up for producing via send_soon function.

property recovery_consistency_check

Check Kafka and local offsets for consistency.

If True, assert that Kafka highwater offsets >= local offset in the rocksdb state store.

relative_to_appdir(path: Path) Path

Prepare app directory path.

If path is absolute the path is returned as-is, but if path is relative it will be assumed to belong under the app directory.

Return type:

Path

property reply_create_topic

Automatically create reply topics.

Set this to True if you plan on using the RPC with agents.

This will create the internal topic used for RPC replies on that instance at startup.

property reply_expires

RPC reply expiry time in seconds.

The expiry time (in seconds float, or timedelta), for how long replies will stay in the instance's local reply topic before being removed.

property reply_to

Reply to address.

The name of the reply topic used by this instance. If not set one will be automatically generated when the app is created.

property reply_to_prefix

Reply address topic name prefix.

The prefix used when generating reply topic names.

property ssl_context

SSL configuration.

See credentials.

property store

Table storage backend URL.

The backend used for table storage.

Tables are stored in-memory by default, but you should not use the memory:// store in production.

In production, a persistent table store, such as rocksdb:// is preferred.

property store_check_exists

Execute exists on the underlying store.

If True, executes exists on the underlying store. If False, the client has to catch KeyError.

property stream_ack_cancelled_tasks

Deprecated setting has no effect.

property stream_ack_exceptions

Deprecated setting has no effect.

property stream_buffer_maxsize

Stream buffer maximum size.

This setting controls back pressure to streams and agents reading from streams.

If set to 4096 (default) this means that an agent can only keep at most 4096 unprocessed items in the stream buffer.

Essentially this will limit the number of messages a stream can “prefetch”.

Higher numbers give better throughput, but take care if your agent sends messages or updates tables (which sends changelog messages): if the buffer size is large, the broker_commit_interval or broker_commit_every settings must be set to commit frequently, to avoid back pressure from building up.

A buffer size of 131_072 may let you process over 30,000 events a second as a baseline, but be careful with a buffer size that large when you also send messages or update tables.

property stream_processing_timeout

Stream processing timeout.

Timeout (in seconds) for processing events in the stream. If processing of a single event exceeds this time we log an error, but do not stop processing.

If you are seeing a warning like this you should either

  1. increase this timeout to allow agents to spend more time

    on a single event, or

  2. add a timeout to the operation in the agent, so stream processing

    always completes before the timeout.

The latter is preferred for network operations such as web requests. If a network service you depend on is temporarily offline you should consider doing retries (send to separate topic):

main_topic = app.topic('main')
deadletter_topic = app.topic('main_deadletter')

async def send_request(value, timeout: Optional[float] = None) -> None:
    await app.http_client.get('http://foo.com', timeout=timeout)

@app.agent(main_topic)
async def main(stream):
    async for value in stream:
        try:
            await send_request(value, timeout=5)
        except asyncio.TimeoutError:
            await deadletter_topic.send(value)

@app.agent(deadletter_topic)
async def main_deadletter(stream):
    async for value in stream:
        # wait for 30 seconds before retrying.
        await stream.sleep(30)
        await send_request(value)
property stream_publish_on_commit

Stream delay producing until commit time.

If enabled we buffer up sending messages until the source topic offset related to that processing is committed. This means when we do commit, we may have buffered up a LOT of messages so commit needs to happen frequently (make sure to decrease broker_commit_every).

property stream_recovery_delay

Stream recovery delay.

Number of seconds to sleep before continuing after rebalance. We wait for a bit to allow for more nodes to join/leave before starting to recover tables and then processing streams. This is to minimize the chance of errors and rebalancing loops.

property stream_wait_empty

Stream wait empty.

This setting controls whether the worker should wait for the currently processing task in an agent to complete before rebalancing or shutting down.

On rebalance/shut down we clear the stream buffers. Those events will be reprocessed after the rebalance anyway, but we may have already started processing one event in every agent, and if we rebalance we will process that event again.

By default we will wait for the currently active tasks, but if your streams are idempotent you can disable it using this setting.

property table_cleanup_interval

Table cleanup interval.

How often we cleanup tables to remove expired entries.

property table_key_index_size

Table key index size.

Tables keep a cache of key to partition number to speed up table lookups.

This setting configures the maximum size of that cache.

property table_standby_replicas

Table standby replicas.

The number of standby replicas for each table.

property tabledir

Application table data directory.

The directory in which this instance stores local table data. Usually you will want to configure the datadir setting, but if you want to store tables separately you can configure this one.

If the path provided is relative (it has no leading slash), then the path will be considered to be relative to the datadir setting.

property timezone

Project timezone.

The timezone used for date-related functionality such as cronjobs.

property topic_allow_declare

Allow creating new topics.

Setting this to False disables the creation of internal topics.

Faust will only create topics that it considers to be fully owned and managed, such as intermediate repartition topics, table changelog topics etc.

Some Kafka managers do not allow services to create topics; in that case you should set this to False.

property topic_disable_leader

Disable leader election topic.

This setting disables the creation of the leader election topic.

If you’re not using the on_leader=True argument to the task/timer/etc. decorators, then use this setting to disable creation of the topic.

property topic_partitions

Topic partitions.

Default number of partitions for new topics.

Note

This defines the maximum number of workers we could distribute the workload of the application (also sometimes referred to as the sharding factor of the application).

property topic_replication_factor

Topic replication factor.

The default replication factor for topics created by the application.

Note

Generally this should be the same as the configured replication factor for your Kafka cluster.

property url

Backward compatibility alias to broker.

property value_serializer

Default value serializer.

Serializer used for values by default when no serializer is specified, or a model is not being used.

This can be a string (the name of a serializer/codec), or an actual faust.serializers.codecs.Codec instance.

See also

  • The Codecs section in the model guide – for more information about codecs.

property version

App version.

Version of the app, that when changed will create a new isolated instance of the application. The first version is 1, the second version is 2, and so on.

Source topics will not be affected by a version change.

Faust applications will use two kinds of topics: source topics, and internally managed topics. The source topics are declared by the producer, and we do not have the opportunity to modify any configuration settings, like number of partitions for a source topic; we may only consume from them. To mark a topic as internal, use: app.topic(..., internal=True).

property web

Web server driver to use.

property web_bind

Web network interface binding mask.

The IP network address mask that decides what interfaces the web server will bind to.

By default this will bind to all interfaces.

This option is usually set by faust worker --web-bind, not by passing it as a keyword argument to app.

property web_cors_options

Cross Origin Resource Sharing options.

Enable Cross-Origin Resource Sharing options for all web views in the internal web server.

This should be specified as a dictionary of URLs to ResourceOptions:

app = App(..., web_cors_options={
    'http://foo.example.com': ResourceOptions(
        allow_credentials=True,
        allow_methods='*',
    )
})

Individual views may override the CORS options used as arguments to @app.page and blueprint.route.

property web_enabled

Enable/disable internal web server.

Enable web server and other web components.

This option can also be set using faust worker --without-web.

property web_host

Web server host name.

Hostname used to access this web server, used for generating the canonical_url setting.

This option is usually set by faust worker --web-host, not by passing it as a keyword argument to app.

property web_in_thread

Run the web server in a separate thread.

Use this if you have a large value for stream_buffer_maxsize and want the web server to be responsive when the worker is otherwise busy processing streams.

Note

Running the web server in a separate thread means web views and agents will not share the same event loop.

property web_port

Web server port.

A port number between 1024 and 65535 to use for the web server.

This option is usually set by faust worker --web-port, not by passing it as a keyword argument to app.

property web_ssl_context

Web server SSL configuration.

See credentials.

property web_transport

Network transport used for the web server.

Default is to use TCP, but this setting also enables you to use Unix domain sockets. To use domain sockets specify a URL including the path to the file you want to create like this:

unix:///tmp/server.sock

This will create a new domain socket available in /tmp/server.sock.

property worker_redirect_stdouts

Redirecting standard outputs.

Enable to have the worker redirect output to sys.stdout and sys.stderr to the Python logging system.

Enabled by default.

property worker_redirect_stdouts_level

Level used when redirecting standard outputs.

The logging level to use when redirecting STDOUT/STDERR to logging.

env: Mapping[str, str]

Environment. Defaults to os.environ.

client_only: bool = False

Set this to True if app should only start the services required to operate as an RPC client (producer and simple reply consumer).

producer_only: bool = False

Set this to True if app should run without consumer/tables.

tracer: Optional[TracerT] = None

Optional tracing support.

on_init_dependencies() Iterable[ServiceT][source]

Return list of additional service dependencies.

The services returned will be started with the app when the app starts.

Return type:

_GenericAlias[ServiceT]

async on_first_start() None[source]

Call first time app starts in this process.

Return type:

None

async on_start() None[source]

Call every time the app starts/restarts.

Return type:

None

async on_started() None[source]

Call when app is fully started.

Return type:

None

async on_started_init_extra_tasks() None[source]

Call when started to start additional tasks.

Return type:

None

async on_started_init_extra_services() None[source]

Call when initializing extra services at startup.

Return type:

None

async on_init_extra_service(service: Union[ServiceT, Type[ServiceT]]) ServiceT[source]

Call when adding user services to this app.

Return type:

ServiceT

config_from_object(obj: Any, *, silent: bool = False, force: bool = False) None[source]

Read configuration from object.

Object is either an actual object or the name of a module to import.

Examples

>>> app.config_from_object('myproj.faustconfig')
>>> from myproj import faustconfig
>>> app.config_from_object(faustconfig)
Parameters:
  • silent (bool) – If true then import errors will be ignored.

  • force (bool) – Force reading configuration immediately. By default the configuration will be read only when required.

Return type:

None

finalize() None[source]

Finalize app configuration.

Return type:

None

worker_init() None[source]

Init worker/CLI commands.

Return type:

None

worker_init_post_autodiscover() None[source]

Init worker after autodiscover.

Return type:

None

discover(*extra_modules: str, categories: ~typing.Optional[~typing.Iterable[str]] = None, ignore: ~typing.Iterable[~typing.Any] = [<built-in method search of re.Pattern object>, '.__main__']) None[source]

Discover decorators in packages.

Return type:

None

main() NoReturn[source]

Execute the faust umbrella command using this app.

Return type:

_SpecialForm

topic(*topics: str, pattern: Optional[Union[str, Pattern]] = None, schema: Optional[SchemaT] = None, key_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, value_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, partitions: Optional[int] = None, retention: Optional[Union[timedelta, float, str]] = None, compacting: Optional[bool] = None, deleting: Optional[bool] = None, replicas: Optional[int] = None, acks: bool = True, internal: bool = False, config: Optional[Mapping[str, Any]] = None, maxsize: Optional[int] = None, allow_empty: bool = False, has_prefix: bool = False, loop: Optional[AbstractEventLoop] = None) TopicT[source]

Create topic description.

Topics are named channels (for example a Kafka topic), that exist on a server. To make an ephemeral local communication channel use: channel().

Return type:

TopicT

channel(*, schema: Optional[SchemaT] = None, key_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, value_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, maxsize: Optional[int] = None, loop: Optional[AbstractEventLoop] = None) ChannelT[source]

Create new channel.

By default this will create an in-memory channel used for intra-process communication, but in practice channels can be backed by any transport (network or even means of inter-process communication).

Return type:

ChannelT

agent(channel: Optional[Union[str, ChannelT[_T]]] = None, *, name: Optional[str] = None, concurrency: int = 1, supervisor_strategy: Optional[Type[SupervisorStrategyT]] = None, sink: Optional[Iterable[Union[AgentT, ChannelT, Callable[[Any], Optional[Awaitable]]]]] = None, isolated_partitions: bool = False, use_reply_headers: bool = True, **kwargs: Any) Callable[[Callable[[StreamT[_T]], Union[Coroutine[Any, Any, None], Awaitable[None], AsyncIterable]]], AgentT[_T]][source]

Create Agent from async def function.

It can be a regular async function:

@app.agent()
async def my_agent(stream):
    async for number in stream:
        print(f'Received: {number!r}')

Or it can be an async iterator that yields values. These values can be used as the reply in an RPC-style call, or for sinks: callbacks that forward events to other agents/topics/statsd, and so on:

@app.agent(sink=[log_topic])
async def my_agent(requests):
    async for number in requests:
        yield number * 2
Return type:

_CallableGenericAlias[_CallableGenericAlias[StreamT, _UnionGenericAlias[_GenericAlias[Any, Any, None], _GenericAlias[None], _SpecialGenericAlias]], AgentT]

actor(channel: Optional[Union[str, ChannelT[_T]]] = None, *, name: Optional[str] = None, concurrency: int = 1, supervisor_strategy: Optional[Type[SupervisorStrategyT]] = None, sink: Optional[Iterable[Union[AgentT, ChannelT, Callable[[Any], Optional[Awaitable]]]]] = None, isolated_partitions: bool = False, use_reply_headers: bool = True, **kwargs: Any) Callable[[Callable[[StreamT[_T]], Union[Coroutine[Any, Any, None], Awaitable[None], AsyncIterable]]], AgentT[_T]]

Create Agent from async def function.

It can be a regular async function:

@app.agent()
async def my_agent(stream):
    async for number in stream:
        print(f'Received: {number!r}')

Or it can be an async iterator that yields values. These values can be used as the reply in an RPC-style call, or for sinks: callbacks that forward events to other agents/topics/statsd, and so on:

@app.agent(sink=[log_topic])
async def my_agent(requests):
    async for number in requests:
        yield number * 2
Return type:

_CallableGenericAlias[_CallableGenericAlias[StreamT, _UnionGenericAlias[_GenericAlias[Any, Any, None], _GenericAlias[None], _SpecialGenericAlias]], AgentT]

task(fun: Union[Callable[[AppT], Awaitable], Callable[[], Awaitable]] = None, *, on_leader: bool = False, traced: bool = True) Union[Callable[[Union[Callable[[AppT], Awaitable], Callable[[], Awaitable]]], Union[Callable[[AppT], Awaitable], Callable[[], Awaitable]]], Callable[[AppT], Awaitable], Callable[[], Awaitable]][source]

Define an async def function to be started with the app.

This is like timer() but a one-shot task only executed at worker startup (after recovery and the worker is fully ready for operation).

The function may take zero, or one argument. If the target function takes an argument, the app argument is passed:

>>> @app.task
>>> async def on_startup(app):
...    print('STARTING UP: %r' % (app,))

Nullary functions are also supported:

>>> @app.task
>>> async def on_startup():
...     print('STARTING UP')
timer(interval: Union[timedelta, float, str], on_leader: bool = False, traced: bool = True, name: Optional[str] = None, max_drift_correction: float = 0.1) Callable[source]

Define an async def function to be run at periodic intervals.

Like task(), but executes periodically until the worker is shut down.

This decorator takes an async function and adds it to a list of timers started with the app.

Parameters:
  • interval (Seconds) – How often the timer executes in seconds.

  • on_leader (bool) – Should the timer only run on the leader?

Example

>>> @app.timer(interval=10.0)
>>> async def every_10_seconds():
...     print('TEN SECONDS JUST PASSED')
>>> @app.timer(interval=5.0, on_leader=True)
>>> async def every_5_seconds():
...     print('FIVE SECONDS JUST PASSED. ALSO, I AM THE LEADER!')
crontab(cron_format: str, *, timezone: Optional[tzinfo] = None, on_leader: bool = False, traced: bool = True) Callable[source]

Define periodic task using Crontab description.

This is an async def function to be run at the fixed times, defined by the Cron format.

Like timer(), but executes at fixed times instead of executing at certain intervals.

This decorator takes an async function and adds it to a list of Cronjobs started with the app.

Parameters:

cron_format (str) – The Cron spec defining fixed times to run the decorated function.

Keyword Arguments:
  • timezone – The timezone to be taken into account for the Cron jobs. If not set, the value from the timezone setting will be taken.

  • on_leader – Should the Cron job only run on the leader?

Example

>>> @app.crontab(cron_format='30 18 * * *',
                 timezone=pytz.timezone('US/Pacific'))
>>> async def every_6_30_pm_pacific():
...     print('IT IS 6:30pm')
>>> @app.crontab(cron_format='30 18 * * *', on_leader=True)
>>> async def every_6_30_pm():
...     print('6:30pm UTC; ALSO, I AM THE LEADER!')
Return type:

_CallableType

service(cls: Type[ServiceT]) Type[ServiceT][source]

Decorate mode.Service to be started with the app.

Examples

from mode import Service

@app.service
class Foo(Service):
    ...
Return type:

_GenericAlias[ServiceT]

is_leader() bool[source]

Return True if we are in leader worker process.

Return type:

bool

stream(channel: Union[AsyncIterable, Iterable], beacon: Optional[NodeT] = None, **kwargs: Any) StreamT[source]

Create new stream from channel/topic/iterable/async iterable.

Parameters:
  • channel (Union[AsyncIterable, Iterable]) – Iterable to stream over (async or non-async).

  • kwargs (Any) – See Stream.

Return type:

StreamT

Returns:

A stream that can be iterated over to receive the events in it.

Return type:

faust.Stream

Table(name: str, *, default: Optional[Callable[[], Any]] = None, window: Optional[WindowT] = None, partitions: Optional[int] = None, help: Optional[str] = None, **kwargs: Any) TableT[source]

Define new table.

Parameters:
  • name (str) – Name used for table, note that two tables living in the same application cannot have the same name.

  • default (_UnionGenericAlias[_CallableGenericAlias[Any], None]) – A callable, or type that will return a default value for keys missing in this table.

  • window (_UnionGenericAlias[WindowT, None]) – A windowing strategy to wrap this table in.

Examples

>>> table = app.Table('user_to_amount', default=int)
>>> table['George']
0
>>> table['Elaine'] += 1
>>> table['Elaine'] += 1
>>> table['Elaine']
2
Return type:

TableT

GlobalTable(name: str, *, default: Optional[Callable[[], Any]] = None, window: Optional[WindowT] = None, partitions: Optional[int] = None, help: Optional[str] = None, synchronize_all_active_partitions: Optional[bool] = False, **kwargs: Any) GlobalTableT[source]

Define new global table.

Parameters:
  • name (str) – Name used for global table, note that two global tables living in the same application cannot have the same name.

  • default (_UnionGenericAlias[_CallableGenericAlias[Any], None]) – A callable, or type that will return a default value for keys missing in this global table.

  • window (_UnionGenericAlias[WindowT, None]) – A windowing strategy to wrap this global table in.

Examples

>>> gtable = app.GlobalTable('user_to_amount', default=int)
>>> gtable['George']
0
>>> gtable['Elaine'] += 1
>>> gtable['Elaine'] += 1
>>> gtable['Elaine']
2
Return type:

GlobalTableT

SetTable(name: str, *, window: Optional[WindowT] = None, partitions: Optional[int] = None, start_manager: bool = False, help: Optional[str] = None, **kwargs: Any) TableT[source]

Table of sets.

Return type:

TableT

SetGlobalTable(name: str, *, window: Optional[WindowT] = None, partitions: Optional[int] = None, start_manager: bool = False, help: Optional[str] = None, **kwargs: Any) TableT[source]

Table of sets (global).

Return type:

TableT

page(path: str, *, base: ~typing.Type[~faust.web.views.View] = <class 'faust.web.views.View'>, cors_options: ~typing.Optional[~typing.Mapping[str, ~faust.types.web.ResourceOptions]] = None, name: ~typing.Optional[str] = None) Callable[[Union[Type[View], Callable[[View, Request], Union[Coroutine[Any, Any, Response], Awaitable[Response]]], Callable[[View, Request, Any, Any], Union[Coroutine[Any, Any, Response], Awaitable[Response]]]]], Type[View]][source]

Decorate view to be included in the web server.

Return type:

_CallableGenericAlias[_UnionGenericAlias[_GenericAlias[View], _CallableGenericAlias[View, Request, _UnionGenericAlias[_GenericAlias[Any, Any, Response], _GenericAlias[Response]]], _CallableGenericAlias[View, Request, Any, Any, _UnionGenericAlias[_GenericAlias[Any, Any, Response], _GenericAlias[Response]]]], _GenericAlias[View]]

table_route(table: CollectionT, shard_param: Optional[str] = None, *, query_param: Optional[str] = None, match_info: Optional[str] = None, exact_key: Optional[str] = None) Callable[[Union[Callable[[View, Request], Union[Coroutine[Any, Any, Response], Awaitable[Response]]], Callable[[View, Request, Any, Any], Union[Coroutine[Any, Any, Response], Awaitable[Response]]]]], Union[Callable[[View, Request], Union[Coroutine[Any, Any, Response], Awaitable[Response]]], Callable[[View, Request, Any, Any], Union[Coroutine[Any, Any, Response], Awaitable[Response]]]]][source]

Decorate view method to route request to table key destination.

Return type:

_CallableGenericAlias[_UnionGenericAlias[_CallableGenericAlias[View, Request, _UnionGenericAlias[_GenericAlias[Any, Any, Response], _GenericAlias[Response]]], _CallableGenericAlias[View, Request, Any, Any, _UnionGenericAlias[_GenericAlias[Any, Any, Response], _GenericAlias[Response]]]], _UnionGenericAlias[_CallableGenericAlias[View, Request, _UnionGenericAlias[_GenericAlias[Any, Any, Response], _GenericAlias[Response]]], _CallableGenericAlias[View, Request, Any, Any, _UnionGenericAlias[_GenericAlias[Any, Any, Response], _GenericAlias[Response]]]]]

topic_route(topic: CollectionT, shard_param: Optional[str] = None, *, query_param: Optional[str] = None, match_info: Optional[str] = None, exact_key: Optional[str] = None) Callable[[Union[Callable[[View, Request], Union[Coroutine[Any, Any, Response], Awaitable[Response]]], Callable[[View, Request, Any, Any], Union[Coroutine[Any, Any, Response], Awaitable[Response]]]]], Union[Callable[[View, Request], Union[Coroutine[Any, Any, Response], Awaitable[Response]]], Callable[[View, Request, Any, Any], Union[Coroutine[Any, Any, Response], Awaitable[Response]]]]][source]

Decorate view method to route request to a topic partition destination.

Return type:

_CallableGenericAlias[_UnionGenericAlias[_CallableGenericAlias[View, Request, _UnionGenericAlias[_GenericAlias[Any, Any, Response], _GenericAlias[Response]]], _CallableGenericAlias[View, Request, Any, Any, _UnionGenericAlias[_GenericAlias[Any, Any, Response], _GenericAlias[Response]]]], _UnionGenericAlias[_CallableGenericAlias[View, Request, _UnionGenericAlias[_GenericAlias[Any, Any, Response], _GenericAlias[Response]]], _CallableGenericAlias[View, Request, Any, Any, _UnionGenericAlias[_GenericAlias[Any, Any, Response], _GenericAlias[Response]]]]]

command(*options: Any, base: Optional[Type[_AppCommand]] = None, **kwargs: Any) Callable[[Callable], Type[_AppCommand]][source]

Decorate async def function to be used as CLI command.

Return type:

_CallableGenericAlias[_CallableType, _GenericAlias[_AppCommand]]

create_event(key: Optional[Union[bytes, _ModelT, Any]], value: Union[bytes, _ModelT, Any], headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]], message: Message) EventT[source]

Create new faust.Event object.

Return type:

EventT

async start_client() None[source]

Start the app in Client-Only mode necessary for RPC requests.

Notes

Once started as a client the app cannot be restarted as Server.

Return type:

None

async maybe_start_client() None[source]

Start the app in Client-Only mode if not started as Server.

Return type:

None

trace(name: str, trace_enabled: bool = True, **extra_context: Any) ContextManager[source]

Return new trace context to trace operation using OpenTracing.

Return type:

ContextManager

traced(fun: Callable, name: Optional[str] = None, sample_rate: float = 1.0, **context: Any) Callable[source]

Decorate function to be traced using the OpenTracing API.

Return type:

_CallableType

async send(channel: Union[ChannelT, str], key: Optional[Union[bytes, _ModelT, Any]] = None, value: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, schema: Optional[SchemaT] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, callback: Optional[Callable[[FutureMessage], Union[None, Awaitable[None]]]] = None) Awaitable[RecordMetadata][source]

Send event to channel/topic.

Parameters:
  • channel (_UnionGenericAlias[ChannelT, str]) – Channel/topic or the name of a topic to send event to.

  • key (_UnionGenericAlias[bytes, _ModelT, Any, None]) – Message key.

  • value (_UnionGenericAlias[bytes, _ModelT, Any, None]) – Message value.

  • partition (_UnionGenericAlias[int, None]) – Specific partition to send to. If not set the partition will be chosen by the partitioner.

  • timestamp (_UnionGenericAlias[float, None]) – Epoch seconds (from Jan 1 1970 UTC) to use as the message timestamp. Defaults to current time.

  • headers (_UnionGenericAlias[_GenericAlias[_GenericAlias[str, bytes]], _GenericAlias[str, bytes], None]) – Mapping of key/value pairs, or iterable of key value pairs to use as headers for the message.

  • schema (_UnionGenericAlias[SchemaT, None]) – Schema to use for serialization.

  • key_serializer (_UnionGenericAlias[CodecT, str, None]) – Serializer to use (if value is not model). Overrides schema if one is specified.

  • value_serializer (_UnionGenericAlias[CodecT, str, None]) – Serializer to use (if value is not model). Overrides schema if one is specified.

  • callback (_UnionGenericAlias[_CallableGenericAlias[FutureMessage, _UnionGenericAlias[None, _GenericAlias[None]]], None]) –

    Called after the message is fully delivered to the channel, but not to the consumer. Signature must be unary as the FutureMessage future is passed to it.

    The resulting faust.types.tuples.RecordMetadata object is then available as fut.result().

Return type:

_GenericAlias[RecordMetadata]

in_transaction[source]

Return True if stream is using transactions.

LiveCheck(**kwargs: Any) _LiveCheck[source]

Return new LiveCheck instance testing features for this app.

Return type:

_LiveCheck

maybe_start_producer() ProducerT[source]

Ensure producer is started.

Return type:

ProducerT

async commit(topics: AbstractSet[Union[str, TP]]) bool[source]

Commit offset for acked messages in specified topics.

Warning

This will commit acked messages in all topics if the topics argument is passed in as None.

Return type:

bool

async on_stop() None[source]

Call when application stops.

Tip

Remember to call super if you override this method.

Return type:

None

on_rebalance_start() None[source]

Call when rebalancing starts.

Return type:

None

on_rebalance_return() None[source]
Return type:

None

on_rebalance_end() None[source]

Call when rebalancing is done.

Return type:

None

FlowControlQueue(maxsize: Optional[int] = None, *, clear_on_resume: bool = False) ThrowableQueue[source]

Like asyncio.Queue, but can be suspended/resumed.

Return type:

ThrowableQueue

Worker(**kwargs: Any) _Worker[source]

Return application worker instance.

Return type:

_Worker

on_webserver_init(web: Web) None[source]

Call when the Web server is initializing.

Return type:

None

property conf: Settings

Application configuration.

Return type:

Settings

property producer: ProducerT

Message producer.

Return type:

ProducerT

property consumer: ConsumerT

Message consumer.

Return type:

ConsumerT

logger: logging.Logger = <Logger faust.app.base (WARNING)>
property transport: TransportT

Consumer message transport.

Return type:

TransportT

property producer_transport: TransportT

Producer message transport.

Return type:

TransportT

property cache: CacheBackendT

Cache backend.

Return type:

CacheBackendT

tables[source]

Map of available tables, and the table manager service.

topics[source]

Topic Conductor.

This is the mediator that moves messages fetched by the Consumer into the streams.

It’s also a set of registered topics by string topic name, so you can check if a topic is being consumed from by doing topic in app.topics.

property monitor: Monitor

Monitor keeps stats about what’s going on inside the worker.

Return type:

Monitor

flow_control[source]

Flow control of streams.

This object controls flow into stream queues, and can also clear all buffers.

property http_client: ClientSession

HTTP client Session.

Return type:

ClientSession

assignor[source]

Partition Assignor.

Responsible for partition assignment.

router[source]

Find the node partitioned data belongs to.

The router helps us route web requests to the wanted Faust node. If a topic is sharded by account_id, the router can send us to the Faust worker responsible for any account. Used by the @app.table_route decorator.

web[source]

Web driver.

serializers[source]

Return serializer registry.

property label: str

Return human readable description of application.

Return type:

str

property shortlabel: str

Return short description of application.

Return type:

str

class faust.Channel(app: AppT, *, schema: Optional[SchemaT] = None, key_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, value_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, is_iterator: bool = False, queue: Optional[ThrowableQueue] = None, maxsize: Optional[int] = None, root: Optional[ChannelT] = None, active_partitions: Optional[Set[TP]] = None, loop: Optional[AbstractEventLoop] = None)[source]

Create new channel.

Parameters:
  • app (AppT) – The app that created this channel (app.channel())

  • schema (_UnionGenericAlias[SchemaT, None]) – Schema used for serialization/deserialization

  • key_type (_UnionGenericAlias[_GenericAlias[ModelT], _GenericAlias[bytes], _GenericAlias[str], None]) – The Model used for keys in this channel. (overrides schema if one is defined)

  • value_type (_UnionGenericAlias[_GenericAlias[ModelT], _GenericAlias[bytes], _GenericAlias[str], None]) – The Model used for values in this channel. (overrides schema if one is defined)

  • maxsize (_UnionGenericAlias[int, None]) – The maximum number of messages this channel can hold. If exceeded any new put call will block until a message is removed from the channel.

  • is_iterator (bool) – When streams iterate over a channel they will call stream.clone(is_iterator=True) so this attribute denotes that this channel instance is currently being iterated over.

  • active_partitions (_UnionGenericAlias[_GenericAlias[TP], None]) – Set of active topic partitions this channel instance is assigned to.

  • loop (_UnionGenericAlias[AbstractEventLoop, None]) – The asyncio event loop to use.

app: AppT
is_iterator: bool
schema: SchemaT
key_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]]
value_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]]
property queue: ThrowableQueue

Return the underlying queue/buffer backing this channel.

Return type:

ThrowableQueue

clone(*, is_iterator: Optional[bool] = None, **kwargs: Any) ChannelT[T][source]

Create clone of this channel.

Parameters:

is_iterator (_UnionGenericAlias[bool, None]) – Set to True if this is now a channel that is being iterated over.

Keyword Arguments:

**kwargs – Any keyword arguments passed will override any of the arguments supported by Channel.__init__.

Return type:

ChannelT

clone_using_queue(queue: Queue) ChannelT[T][source]

Create clone of this channel using specific queue instance.

Return type:

ChannelT

stream(**kwargs: Any) StreamT[T][source]

Create stream reading from this channel.

Return type:

StreamT

get_topic_name() str[source]

Get the topic name, or raise if this is not a named channel.

Return type:

str

async send(*, key: Optional[Union[bytes, _ModelT, Any]] = None, value: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, schema: Optional[SchemaT] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, callback: Optional[Callable[[FutureMessage], Union[None, Awaitable[None]]]] = None, force: bool = False) Awaitable[RecordMetadata][source]

Send message to channel.

Return type:

_GenericAlias[RecordMetadata]

send_soon(*, key: Optional[Union[bytes, _ModelT, Any]] = None, value: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, schema: Optional[SchemaT] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, callback: Optional[Callable[[FutureMessage], Union[None, Awaitable[None]]]] = None, force: bool = False, eager_partitioning: bool = False) FutureMessage[source]

Produce message by adding to buffer.

This method is only supported by Topic.

Raises:

NotImplementedError – always for in-memory channel.

Return type:

FutureMessage

as_future_message(key: Optional[Union[bytes, _ModelT, Any]] = None, value: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, schema: Optional[SchemaT] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, callback: Optional[Callable[[FutureMessage], Union[None, Awaitable[None]]]] = None, eager_partitioning: bool = False) FutureMessage[source]

Create promise that message will be transmitted.

Return type:

FutureMessage

prepare_headers(headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]]) Optional[Union[List[Tuple[str, bytes]], MutableMapping[str, bytes]]][source]

Prepare headers passed before publishing.

Return type:

_UnionGenericAlias[_GenericAlias[_GenericAlias[str, bytes]], _GenericAlias[str, bytes], None]

async publish_message(fut: FutureMessage, wait: bool = True) Awaitable[RecordMetadata][source]

Publish message to channel.

This is the interface used by topic.send(), etc. to actually publish the message on the channel after being buffered up or similar.

It takes a FutureMessage object, which contains all the information required to send the message, and acts as a promise that is resolved once the message has been fully transmitted.

Return type:

_GenericAlias[RecordMetadata]

maybe_declare() None[source]

Declare/create this channel, but only if it doesn’t exist.

Return type:

None

async declare() None[source]

Declare/create this channel.

This is used to create this channel on a server, if that is required to operate it.

Return type:

None

prepare_key(key: Optional[Union[bytes, _ModelT, Any]], key_serializer: Optional[Union[CodecT, str]], schema: Optional[SchemaT] = None, headers: Optional[Union[List[Tuple[str, bytes]], MutableMapping[str, bytes]]] = None) Tuple[Any, Optional[Union[List[Tuple[str, bytes]], MutableMapping[str, bytes]]]][source]

Prepare key before it is sent to this channel.

Topic uses this to implement serialization of keys sent to the channel.

Return type:

_GenericAlias[Any, _UnionGenericAlias[_GenericAlias[_GenericAlias[str, bytes]], _GenericAlias[str, bytes], None]]

prepare_value(value: Union[bytes, _ModelT, Any], value_serializer: Optional[Union[CodecT, str]], schema: Optional[SchemaT] = None, headers: Optional[Union[List[Tuple[str, bytes]], MutableMapping[str, bytes]]] = None) Tuple[Any, Optional[Union[List[Tuple[str, bytes]], MutableMapping[str, bytes]]]][source]

Prepare value before it is sent to this channel.

Topic uses this to implement serialization of values sent to the channel.

Return type:

_GenericAlias[Any, _UnionGenericAlias[_GenericAlias[_GenericAlias[str, bytes]], _GenericAlias[str, bytes], None]]

async decode(message: Message, *, propagate: bool = False) EventT[T][source]

Decode Message into Event.

Return type:

EventT

async deliver(message: Message) None[source]

Deliver message to queue from consumer.

This is called by the consumer to deliver the message to the channel.

Return type:

None

async put(value: EventT[T_contra]) None[source]

Put event onto this channel.

Return type:

None

async get(*, timeout: Optional[Union[timedelta, float, str]] = None) EventT[T][source]

Get the next Event received on this channel.

Return type:

EventT

empty() bool[source]

Return True if the queue is empty.

Return type:

bool

async on_key_decode_error(exc: Exception, message: Message) None[source]

Unable to decode the key of an item in the queue.

Return type:

None

async on_value_decode_error(exc: Exception, message: Message) None[source]

Unable to decode the value of an item in the queue.

Return type:

None

async on_decode_error(exc: Exception, message: Message) None[source]

Signal that there was an error reading an event in the queue.

When a message in the channel needs deserialization to be reconstructed back to its original form, we will sometimes see decoding/deserialization errors being raised, from missing fields or malformed payloads, and so on.

We will log the exception, but you can also override this to perform additional actions.

Admonition: Kafka

In the event a deserialization error occurs, we HAVE to commit the offset of the source message to continue processing the stream.

For this reason it is important that you keep a close eye on error logs. For ease of use, we suggest using log aggregation software, such as Sentry, to surface these errors to your operations team.

Return type:

None

on_stop_iteration() None[source]

Signal that iteration over this channel was stopped.

Tip

Remember to call super when overriding this method.

Return type:

None

derive(**kwargs: Any) ChannelT[T][source]

Derive new channel from this channel, using new configuration.

See faust.Topic.derive.

For local channels this will simply return the same channel.

Return type:

ChannelT

async throw(exc: BaseException) None[source]

Throw exception to be received by channel subscribers.

Tip

When you find yourself having to call this from a regular, non-async def function, you can use _throw() instead.

Return type:

None

property subscriber_count: int

Return number of active subscribers to local channel.

Return type:

int

property label: str

Short textual description of channel.

Return type:

str

class faust.ChannelT(app: _AppT, *, schema: Optional[_SchemaT] = None, key_type: Optional[_ModelArg] = None, value_type: Optional[_ModelArg] = None, is_iterator: bool = False, queue: Optional[ThrowableQueue] = None, maxsize: Optional[int] = None, root: Optional[ChannelT] = None, active_partitions: Optional[Set[TP]] = None, loop: Optional[AbstractEventLoop] = None)[source]
app: _AppT
schema: _SchemaT
key_type: Optional[_ModelArg]
value_type: Optional[_ModelArg]
loop: Optional[AbstractEventLoop]
maxsize: Optional[int]
active_partitions: Optional[Set[TP]]
abstract clone(*, is_iterator: Optional[bool] = None, **kwargs: Any) ChannelT[_T][source]
Return type:

ChannelT

abstract clone_using_queue(queue: Queue) ChannelT[_T][source]
Return type:

ChannelT

abstract stream(**kwargs: Any) _StreamT[_T][source]
abstract get_topic_name() str[source]
Return type:

str

abstract async send(*, key: Optional[Union[bytes, _ModelT, Any]] = None, value: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, schema: Optional[_SchemaT] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, callback: Optional[Callable[[FutureMessage], Union[None, Awaitable[None]]]] = None, force: bool = False) Awaitable[RecordMetadata][source]
Return type:

_GenericAlias[RecordMetadata]

abstract send_soon(*, key: Optional[Union[bytes, _ModelT, Any]] = None, value: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, schema: Optional[_SchemaT] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, callback: Optional[Callable[[FutureMessage], Union[None, Awaitable[None]]]] = None, force: bool = False, eager_partitioning: bool = False) FutureMessage[source]
Return type:

FutureMessage

abstract as_future_message(key: Optional[Union[bytes, _ModelT, Any]] = None, value: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, schema: Optional[_SchemaT] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, callback: Optional[Callable[[FutureMessage], Union[None, Awaitable[None]]]] = None, eager_partitioning: bool = False) FutureMessage[source]
Return type:

FutureMessage

abstract async publish_message(fut: FutureMessage, wait: bool = True) Awaitable[RecordMetadata][source]
Return type:

_GenericAlias[RecordMetadata]

maybe_declare() None[source]
Return type:

None

abstract async declare() None[source]
Return type:

None

abstract prepare_key(key: Optional[Union[bytes, _ModelT, Any]], key_serializer: Optional[Union[CodecT, str]], schema: Optional[_SchemaT] = None) Any[source]
Return type:

Any

abstract prepare_value(value: Union[bytes, _ModelT, Any], value_serializer: Optional[Union[CodecT, str]], schema: Optional[_SchemaT] = None) Any[source]
Return type:

Any

abstract async decode(message: Message, *, propagate: bool = False) _EventT[_T][source]
Return type:

_GenericAlias[~_T]

abstract async deliver(message: Message) None[source]
Return type:

None

abstract async put(value: _EventT[_T]) None[source]
Return type:

None

abstract async get(*, timeout: Optional[Union[timedelta, float, str]] = None) _EventT[_T][source]
Return type:

_GenericAlias[~_T]

abstract empty() bool[source]
Return type:

bool

abstract async on_key_decode_error(exc: Exception, message: Message) None[source]
Return type:

None

abstract async on_value_decode_error(exc: Exception, message: Message) None[source]
Return type:

None

abstract async on_decode_error(exc: Exception, message: Message) None[source]
Return type:

None

abstract on_stop_iteration() None[source]
Return type:

None

abstract async throw(exc: BaseException) None[source]
Return type:

None

abstract derive(**kwargs: Any) ChannelT[source]
Return type:

ChannelT

abstract property subscriber_count: int
Return type:

int

abstract property queue: ThrowableQueue
Return type:

ThrowableQueue

class faust.Event(app: AppT, key: Optional[Union[bytes, _ModelT, Any]], value: Union[bytes, _ModelT, Any], headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]], message: Message)[source]

An event received on a channel.

Notes

  • Events have a key and a value:

    event.key, event.value
    
  • They also have a reference to the original message (if available), such as a Kafka record:

    event.message.offset

  • Iterating over channels/topics yields Event:

    async for event in channel:

  • Iterating over a stream (that in turn iterates over a channel) yields Event.value:

    async for value in channel.stream():  # value is event.value
        ...
    
  • If you only have a Stream object, you can also access underlying events by using Stream.events.

    For example:

    async for event in channel.stream().events():
        ...
    

    Also commonly used for finding the “current event” related to a value in the stream:

    stream = channel.stream()
    async for event in stream.events():
        event = stream.current_event
        message = event.message
        topic = event.message.topic
    

    You can retrieve the current event in a stream to:

    • Get access to the serialized key+value.

    • Get access to message properties, such as which topic+partition the value was received on, or its offset.

    If you want access to both key and value, you should use stream.items() instead.

    async for key, value in stream.items():
        ...
    

    stream.current_event can also be accessed but you must take extreme care you are using the correct stream object. Methods such as .group_by(key) and .through(topic) return cloned stream objects, so the stream object you hold may not be the one that is processing the current event.

    The best way to access the current_event in an agent is to use the ContextVar:

    from faust import current_event
    
    @app.agent(topic)
    async def process(stream):
        async for value in stream:
            event = current_event()
    
app: AppT
key: K
value: V
message: Message
headers: Mapping
acked: bool
async send(channel: ~typing.Union[str, ~faust.types.channels.ChannelT], key: ~typing.Optional[~typing.Union[bytes, ~faust.types.core._ModelT, ~typing.Any]] = <object object>, value: ~typing.Union[bytes, ~faust.types.core._ModelT, ~typing.Any] = <object object>, partition: ~typing.Optional[int] = None, timestamp: ~typing.Optional[float] = None, headers: ~typing.Any = <object object>, schema: ~typing.Optional[~faust.types.serializers.SchemaT] = None, key_serializer: ~typing.Optional[~typing.Union[~faust.types.codecs.CodecT, str]] = None, value_serializer: ~typing.Optional[~typing.Union[~faust.types.codecs.CodecT, str]] = None, callback: ~typing.Optional[~typing.Callable[[~faust.types.tuples.FutureMessage], ~typing.Union[None, ~typing.Awaitable[None]]]] = None, force: bool = False) Awaitable[RecordMetadata][source]

Send object to channel.

Return type:

_GenericAlias[RecordMetadata]

async forward(channel: ~typing.Union[str, ~faust.types.channels.ChannelT], key: ~typing.Optional[~typing.Union[bytes, ~faust.types.core._ModelT, ~typing.Any]] = <object object>, value: ~typing.Union[bytes, ~faust.types.core._ModelT, ~typing.Any] = <object object>, partition: ~typing.Optional[int] = None, timestamp: ~typing.Optional[float] = None, headers: ~typing.Any = <object object>, schema: ~typing.Optional[~faust.types.serializers.SchemaT] = None, key_serializer: ~typing.Optional[~typing.Union[~faust.types.codecs.CodecT, str]] = None, value_serializer: ~typing.Optional[~typing.Union[~faust.types.codecs.CodecT, str]] = None, callback: ~typing.Optional[~typing.Callable[[~faust.types.tuples.FutureMessage], ~typing.Union[None, ~typing.Awaitable[None]]]] = None, force: bool = False) Awaitable[RecordMetadata][source]

Forward original message (will not be reserialized).

Return type:

_GenericAlias[RecordMetadata]

ack() bool[source]

Acknowledge event as being processed by stream.

When the last stream processor acks the message, the offset in the source topic will be marked as safe-to-commit, and the worker will commit and advance the committed offset.

Return type:

bool

class faust.EventT(app: _AppT, key: Optional[Union[bytes, _ModelT, Any]], value: Union[bytes, _ModelT, Any], headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]], message: Message)[source]
app: _AppT
key: Optional[Union[bytes, _ModelT, Any]]
value: Union[bytes, _ModelT, Any]
headers: Mapping
message: Message
acked: bool
abstract async send(channel: Union[str, _ChannelT], key: Optional[Union[bytes, _ModelT, Any]] = None, value: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, schema: Optional[_SchemaT] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, callback: Optional[Callable[[FutureMessage], Union[None, Awaitable[None]]]] = None, force: bool = False) Awaitable[RecordMetadata][source]
Return type:

_GenericAlias[RecordMetadata]

abstract async forward(channel: Union[str, _ChannelT], key: Optional[Any] = None, value: Optional[Any] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, schema: Optional[_SchemaT] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, callback: Optional[Callable[[FutureMessage], Union[None, Awaitable[None]]]] = None, force: bool = False) Awaitable[RecordMetadata][source]
Return type:

_GenericAlias[RecordMetadata]

abstract ack() bool[source]
Return type:

bool

class faust.ModelOptions[source]
serializer: Optional[Union[CodecT, str]] = None
namespace: str
include_metadata: bool = True
polymorphic_fields: bool = False
allow_blessed_key: bool = False
isodates: bool = False
decimals: bool = False
validation: bool = False
coerce: bool = False
coercions: MutableMapping[Union[Type, Tuple[Type, ...]], Callable[[Any], Any]] = None
date_parser: Optional[Callable[[Any], datetime]] = None
fields: Mapping[str, Type] = None

Flattened view of __annotations__ in MRO order.

Type:

Index

fieldset: FrozenSet[str] = None

Set of required field names, for fast argument checking.

Type:

Index

descriptors: Mapping[str, FieldDescriptorT] = None

Mapping of field name to field descriptor.

Type:

Index

fieldpos: Mapping[int, str] = None

Positional argument index to field name. Used by Record.__init__ to map positional arguments to fields.

Type:

Index

optionalset: FrozenSet[str] = None

Set of optional field names, for fast argument checking.

Type:

Index

defaults: Mapping[str, Any] = None

Mapping of field names to default value.

tagged_fields: FrozenSet[str] = None
personal_fields: FrozenSet[str] = None
sensitive_fields: FrozenSet[str] = None
secret_fields: FrozenSet[str] = None
has_tagged_fields: bool = False
has_personal_fields: bool = False
has_sensitive_fields: bool = False
has_secret_fields: bool = False
clone_defaults() ModelOptions[source]
Return type:

ModelOptions

class faust.Record[source]

Describes a model type that is a record (Mapping).

Examples

>>> class LogEvent(Record, serializer='json'):
...     severity: str
...     message: str
...     timestamp: float
...     optional_field: str = 'default value'
>>> event = LogEvent(
...     severity='error',
...     message='Broken pact',
...     timestamp=666.0,
... )
>>> event.severity
'error'
>>> serialized = event.dumps()
'{"severity": "error", "message": "Broken pact", "timestamp": 666.0}'
>>> restored = LogEvent.loads(serialized)
<LogEvent: severity='error', message='Broken pact', timestamp=666.0>
>>> # You can also subclass a Record to create a new record
>>> # with additional fields
>>> class RemoteLogEvent(LogEvent):
...     url: str
>>> # You can also refer to record fields and pass them around:
>>> LogEvent.severity
<FieldDescriptor: LogEvent.severity (str)>
classmethod from_data(data: Mapping, *, preferred_type: Optional[Type[ModelT]] = None) Record[source]

Create model object from Python dictionary.

Return type:

Record

to_representation() Mapping[str, Any][source]

Convert model to its Python generic counterpart.

Records will be converted to dictionary.

Return type:

_GenericAlias[str, Any]

asdict() Dict[str, Any][source]

Convert record to Python dictionary.

Return type:

_GenericAlias[str, Any]

class faust.Monitor(*, max_avg_history: ~typing.Optional[int] = None, max_commit_latency_history: ~typing.Optional[int] = None, max_send_latency_history: ~typing.Optional[int] = None, max_assignment_latency_history: ~typing.Optional[int] = None, messages_sent: int = 0, tables: ~typing.Optional[~typing.MutableMapping[str, ~faust.sensors.monitor.TableState]] = None, messages_active: int = 0, events_active: int = 0, messages_received_total: int = 0, messages_received_by_topic: ~typing.Optional[~typing.Counter[str]] = None, events_total: int = 0, events_by_stream: ~typing.Optional[~typing.Counter[~faust.types.streams.StreamT]] = None, events_by_task: ~typing.Optional[~typing.Counter[~_asyncio.Task]] = None, events_runtime: ~typing.Optional[~typing.Deque[float]] = None, commit_latency: ~typing.Optional[~typing.Deque[float]] = None, send_latency: ~typing.Optional[~typing.Deque[float]] = None, assignment_latency: ~typing.Optional[~typing.Deque[float]] = None, events_s: int = 0, messages_s: int = 0, events_runtime_avg: float = 0.0, topic_buffer_full: ~typing.Optional[~typing.Counter[~faust.types.tuples.TP]] = None, rebalances: ~typing.Optional[int] = None, rebalance_return_latency: ~typing.Optional[~typing.Deque[float]] = None, rebalance_end_latency: ~typing.Optional[~typing.Deque[float]] = None, rebalance_return_avg: float = 0.0, rebalance_end_avg: float = 0.0, time: ~typing.Callable[[], float] = <built-in function monotonic>, http_response_codes: ~typing.Optional[~typing.Counter[~http.HTTPStatus]] = None, http_response_latency: ~typing.Optional[~typing.Deque[float]] = None, http_response_latency_avg: float = 0.0, **kwargs: ~typing.Any)[source]

Default Faust Sensor.

This is the default sensor, recording statistics about events, etc.

send_errors = 0

Number of produce operations that ended in error.

assignments_completed = 0

Number of partition assignments completed.

assignments_failed = 0

Number of partition assignments that failed.

max_avg_history: int = 100

Max number of total run time values to keep to build average.

max_commit_latency_history: int = 30

Max number of commit latency numbers to keep.

max_send_latency_history: int = 30

Max number of send latency numbers to keep.

max_assignment_latency_history: int = 30

Max number of assignment latency numbers to keep.

rebalances = 0

Number of rebalances seen by this worker.

tables: MutableMapping[str, TableState] = None

Mapping of tables

commit_latency: Deque[float] = None

Deque of commit latency values

send_latency: Deque[float] = None

Deque of send latency values

assignment_latency: Deque[float] = None

Deque of assignment latency values.

rebalance_return_latency: Deque[float] = None

Deque of previous n rebalance return latencies.

rebalance_end_latency: Deque[float] = None

Deque of previous n rebalance end latencies.

rebalance_return_avg: float = 0.0

Average rebalance return latency.

rebalance_end_avg: float = 0.0

Average rebalance end latency.

messages_active: int = 0

Number of messages currently being processed.

messages_received_total: int = 0

Number of messages processed in total.

messages_received_by_topic: Counter[str] = None

Count of messages received by topic

messages_sent: int = 0

Number of messages sent in total.

messages_sent_by_topic: Counter[str] = None

Number of messages sent by topic.

messages_s: int = 0

Number of messages being processed this second.

events_active: int = 0

Number of events currently being processed.

events_total: int = 0

Number of events processed in total.

events_by_task: Counter[str] = None

Count of events processed by task

events_by_stream: Counter[str] = None

Count of events processed by stream

events_s: int = 0

Number of events being processed this second.

events_runtime_avg: float = 0.0

Average event runtime over the last second.

events_runtime: Deque[float] = None

Deque of run times used for averages

topic_buffer_full: Counter[TP] = None

Counter of times a topic's buffer was full

http_response_codes: Counter[HTTPStatus] = None

Counter of returned HTTP status codes.

http_response_latency: Deque[float] = None

Deque of previous n HTTP request->response latencies.

http_response_latency_avg: float = 0.0

Average request->response latency.

metric_counts: Counter[str] = None

Arbitrary counts added by apps

tp_committed_offsets: MutableMapping[TP, int] = None

Last committed offsets by TopicPartition

tp_read_offsets: MutableMapping[TP, int] = None

Last read offsets by TopicPartition

tp_end_offsets: MutableMapping[TP, int] = None

Log end offsets by TopicPartition

stream_inbound_time: Dict[TP, float] = None
stream_lookup: MutableMapping[StreamT, str] = None
task_lookup: MutableMapping[Optional[Task], str] = None
secs_since(start_time: float) float[source]

Given timestamp start, return number of seconds since that time.

Return type:

float

ms_since(start_time: float) float[source]

Given timestamp start, return number of ms since that time.

Return type:

float

logger: logging.Logger = <Logger faust.sensors.monitor (WARNING)>
secs_to_ms(timestamp: float) float[source]

Convert seconds to milliseconds.

Return type:

float

asdict() Mapping[source]

Return monitor state as dictionary.

Return type:

_SpecialGenericAlias

on_message_in(tp: TP, offset: int, message: Message) None[source]

Call before message is delegated to streams.

Return type:

None

on_stream_event_in(tp: TP, offset: int, stream: StreamT, event: EventT) Optional[Dict][source]

Call when stream starts processing an event.

Return type:

_UnionGenericAlias[_SpecialGenericAlias, None]

on_stream_event_out(tp: TP, offset: int, stream: StreamT, event: EventT, state: Optional[Dict] = None) None[source]

Call when stream is done processing an event.

Return type:

None

on_topic_buffer_full(tp: TP) None[source]

Call when conductor topic buffer is full and has to wait.

Return type:

None

on_message_out(tp: TP, offset: int, message: Message) None[source]

Call when message is fully acknowledged and can be committed.

Return type:

None

on_table_get(table: CollectionT, key: Any) None[source]

Call when value in table is retrieved.

Return type:

None

on_table_set(table: CollectionT, key: Any, value: Any) None[source]

Call when new value for key in table is set.

Return type:

None

on_table_del(table: CollectionT, key: Any) None[source]

Call when key in a table is deleted.

Return type:

None

on_commit_initiated(consumer: ConsumerT) Any[source]

Consumer is about to commit topic offset.

Return type:

Any

on_commit_completed(consumer: ConsumerT, state: Any) None[source]

Call when consumer commit offset operation completed.

Return type:

None

on_send_initiated(producer: ProducerT, topic: str, message: PendingMessage, keysize: int, valsize: int) Any[source]

Call when message added to producer buffer.

Return type:

Any

on_send_completed(producer: ProducerT, state: Any, metadata: RecordMetadata) None[source]

Call when producer finished sending message.

Return type:

None

on_send_error(producer: ProducerT, exc: BaseException, state: Any) None[source]

Call when producer was unable to publish message.

Return type:

None

count(metric_name: str, count: int = 1) None[source]

Count metric by name.

Return type:

None

on_tp_commit(tp_offsets: MutableMapping[TP, int]) None[source]

Call when offset in topic partition is committed.

Return type:

None

track_tp_end_offset(tp: TP, offset: int) None[source]

Track new topic partition end offset for monitoring lags.

Return type:

None

on_assignment_start(assignor: PartitionAssignorT) Dict[source]

Partition assignor is starting to assign partitions.

Return type:

_SpecialGenericAlias

on_assignment_error(assignor: PartitionAssignorT, state: Dict, exc: BaseException) None[source]

Partition assignor did not complete assignment due to error.

Return type:

None

on_assignment_completed(assignor: PartitionAssignorT, state: Dict) None[source]

Partition assignor completed assignment.

Return type:

None

on_rebalance_start(app: AppT) Dict[source]

Cluster rebalance in progress.

Return type:

_SpecialGenericAlias

on_rebalance_return(app: AppT, state: Dict) None[source]

Consumer replied to broker that assignment is done.

Return type:

None

on_rebalance_end(app: AppT, state: Dict) None[source]

Cluster rebalance fully completed (including recovery).

Return type:

None

on_web_request_start(app: AppT, request: Request, *, view: Optional[View] = None) Dict[source]

Web server started working on request.

Return type:

_SpecialGenericAlias

on_web_request_end(app: AppT, request: Request, response: Optional[Response], state: Dict, *, view: Optional[View] = None) None[source]

Web server finished working on request.

Return type:

None

on_threaded_producer_buffer_processed(app: AppT, size: int) None[source]
Return type:

None

class faust.Sensor(*, beacon: Optional[NodeT] = None, loop: Optional[AbstractEventLoop] = None)[source]

Base class for sensors.

This sensor does not do anything at all, but can be subclassed to create new monitors.

on_message_in(tp: TP, offset: int, message: Message) None[source]

Message received by a consumer.

Return type:

None

on_stream_event_in(tp: TP, offset: int, stream: StreamT, event: EventT) Optional[Dict][source]

Message sent to a stream as an event.

Return type:

_UnionGenericAlias[_SpecialGenericAlias, None]

on_stream_event_out(tp: TP, offset: int, stream: StreamT, event: EventT, state: Optional[Dict] = None) None[source]

Event was acknowledged by stream.

Notes

Acknowledged means a stream finished processing the event, but given that multiple streams may be handling the same event, the message cannot be committed before all streams have processed it. When all streams have acknowledged the event, it will go through on_message_out() just before offsets are committed.

Return type:

None

on_message_out(tp: TP, offset: int, message: Message) None[source]

All streams finished processing message.

Return type:

None

on_topic_buffer_full(tp: TP) None[source]

Topic buffer full so conductor had to wait.

Return type:

None

on_table_get(table: CollectionT, key: Any) None[source]

Key retrieved from table.

Return type:

None

on_table_set(table: CollectionT, key: Any, value: Any) None[source]

Value set for key in table.

Return type:

None

on_table_del(table: CollectionT, key: Any) None[source]

Key deleted from table.

Return type:

None

on_commit_initiated(consumer: ConsumerT) Any[source]

Consumer is about to commit topic offset.

Return type:

Any

on_commit_completed(consumer: ConsumerT, state: Any) None[source]

Consumer finished committing topic offset.

Return type:

None

on_send_initiated(producer: ProducerT, topic: str, message: PendingMessage, keysize: int, valsize: int) Any[source]

About to send a message.

Return type:

Any

on_send_completed(producer: ProducerT, state: Any, metadata: RecordMetadata) None[source]

Message successfully sent.

Return type:

None

on_send_error(producer: ProducerT, exc: BaseException, state: Any) None[source]

Error while sending message.

Return type:

None

on_assignment_start(assignor: PartitionAssignorT) Dict[source]

Partition assignor is starting to assign partitions.

Return type:

_SpecialGenericAlias

on_assignment_error(assignor: PartitionAssignorT, state: Dict, exc: BaseException) None[source]

Partition assignor did not complete assignment due to error.

Return type:

None

on_assignment_completed(assignor: PartitionAssignorT, state: Dict) None[source]

Partition assignor completed assignment.

Return type:

None

on_rebalance_start(app: AppT) Dict[source]

Cluster rebalance in progress.

Return type:

_SpecialGenericAlias

on_rebalance_return(app: AppT, state: Dict) None[source]

Consumer replied to broker that assignment is done.

Return type:

None

on_rebalance_end(app: AppT, state: Dict) None[source]

Cluster rebalance fully completed (including recovery).

Return type:

None

on_web_request_start(app: AppT, request: Request, *, view: Optional[View] = None) Dict[source]

Web server started working on request.

Return type:

_SpecialGenericAlias

on_web_request_end(app: AppT, request: Request, response: Optional[Response], state: Dict, *, view: Optional[View] = None) None[source]

Web server finished working on request.

Return type:

None

on_threaded_producer_buffer_processed(app: AppT, size: int) None[source]
Return type:

None

asdict() Mapping[source]

Convert sensor state to dictionary.

Return type:

_SpecialGenericAlias

logger: logging.Logger = <Logger faust.sensors.base (WARNING)>
class faust.Codec(children: Optional[Tuple[CodecT, ...]] = None, **kwargs: Any)[source]

Base class for codecs.

children: Tuple[CodecT, ...]

Next steps in the recursive codec chain. x = pickle | binary returns a codec with children set to (pickle, binary).

nodes: Tuple[CodecT, ...]

Cached version of children including this codec as the first node. Could use chain below, but that seems premature, so just copying the list.

kwargs: Dict

Subclasses can support keyword arguments; the base implementation of clone() uses this to preserve keyword arguments in copies.

dumps(obj: Any) bytes[source]

Encode object obj.

Return type:

bytes

loads(s: bytes) Any[source]

Decode object from string.

Return type:

Any

clone(*children: CodecT) CodecT[source]

Create a clone of this codec, with optional children added.

Return type:

CodecT

class faust.Schema(*, key_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, value_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, allow_empty: Optional[bool] = None)[source]
update(*, key_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, value_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, allow_empty: Optional[bool] = None) None[source]
Return type:

None

loads_key(app: AppT, message: Message, *, loads: Optional[Callable] = None, serializer: Optional[Union[CodecT, str]] = None) KT[source]
Return type:

~KT

loads_value(app: AppT, message: Message, *, loads: Optional[Callable] = None, serializer: Optional[Union[CodecT, str]] = None) VT[source]
Return type:

~VT

dumps_key(app: AppT, key: Optional[Union[bytes, _ModelT, Any]], *, serializer: Optional[Union[CodecT, str]] = None, headers: Optional[Union[List[Tuple[str, bytes]], MutableMapping[str, bytes]]]) Tuple[Any, Optional[Union[List[Tuple[str, bytes]], MutableMapping[str, bytes]]]][source]
Return type:

_GenericAlias[Any, _UnionGenericAlias[_GenericAlias[_GenericAlias[str, bytes]], _GenericAlias[str, bytes], None]]

dumps_value(app: AppT, value: Union[bytes, _ModelT, Any], *, serializer: Optional[Union[CodecT, str]] = None, headers: Optional[Union[List[Tuple[str, bytes]], MutableMapping[str, bytes]]]) Tuple[Any, Optional[Union[List[Tuple[str, bytes]], MutableMapping[str, bytes]]]][source]
Return type:

_GenericAlias[Any, _UnionGenericAlias[_GenericAlias[_GenericAlias[str, bytes]], _GenericAlias[str, bytes], None]]

on_dumps_key_prepare_headers(key: Union[bytes, _ModelT, Any], headers: Optional[Union[List[Tuple[str, bytes]], MutableMapping[str, bytes]]]) Optional[Union[List[Tuple[str, bytes]], MutableMapping[str, bytes]]][source]
Return type:

_UnionGenericAlias[_GenericAlias[_GenericAlias[str, bytes]], _GenericAlias[str, bytes], None]

on_dumps_value_prepare_headers(value: Union[bytes, _ModelT, Any], headers: Optional[Union[List[Tuple[str, bytes]], MutableMapping[str, bytes]]]) Optional[Union[List[Tuple[str, bytes]], MutableMapping[str, bytes]]][source]
Return type:

_UnionGenericAlias[_GenericAlias[_GenericAlias[str, bytes]], _GenericAlias[str, bytes], None]

async decode(app: AppT, message: Message, *, propagate: bool = False) EventT[source]

Decode message from topic (compiled function not cached).

Return type:

EventT

compile(app: ~faust.types.app.AppT, *, on_key_decode_error: ~typing.Callable[[Exception, ~faust.types.tuples.Message], ~typing.Awaitable[None]] = <function _noop_decode_error>, on_value_decode_error: ~typing.Callable[[Exception, ~faust.types.tuples.Message], ~typing.Awaitable[None]] = <function _noop_decode_error>, default_propagate: bool = False) Callable[[...], Awaitable[EventT]][source]

Compile function used to decode event.

Return type:

_CallableGenericAlias[…, _GenericAlias[EventT]]

class faust.Stream(channel: AsyncIterator[T_co], *, app: AppT, processors: Optional[Iterable[Callable[[T], Union[T, Awaitable[T]]]]] = None, combined: Optional[List[JoinableT]] = None, on_start: Optional[Callable] = None, join_strategy: Optional[JoinT] = None, beacon: Optional[NodeT] = None, concurrency_index: Optional[int] = None, prev: Optional[StreamT] = None, active_partitions: Optional[Set[TP]] = None, enable_acks: bool = True, prefix: str = '', loop: Optional[AbstractEventLoop] = None)[source]

A stream: async iterator processing events in channels/topics.

logger: logging.Logger = <Logger faust.streams (WARNING)>
mundane_level = 'debug'

The log level for mundane info such as starting, stopping, etc. Set this to "debug" for less information.

events_total: int = 0

Number of events processed by this instance so far.

get_active_stream() StreamT[source]

Return the currently active stream.

A stream can be derived using Stream.group_by etc, so if this stream was used to create another derived stream, this function will return the stream being actively consumed from. E.g. in the example:

>>> @app.agent()
... async def agent(a):
...     a = a
...     b = a.group_by(Withdrawal.account_id)
...     c = b.through('backup_topic')
...     async for value in c:
...         ...

The return value of a.get_active_stream() would be c.

Notes

The chain of streams that leads to the active stream is decided by the _next attribute. To get to the active stream we just traverse this linked-list:

>>> def get_active_stream(self):
...     node = self
...     while node._next:
...         node = node._next
...     return node
Return type:

StreamT

get_root_stream() StreamT[source]

Get the root stream that this stream was derived from.

Return type:

StreamT

add_processor(processor: Callable[[T], Union[T, Awaitable[T]]]) None[source]

Add processor callback executed whenever a new event is received.

Processor functions can be async or non-async, must accept a single argument, and should return the value, mutated or not.

For example a processor handling a stream of numbers may modify the value:

def double(value: int) -> int:
    return value * 2

stream.add_processor(double)
Return type:

None

info() Mapping[str, Any][source]

Return stream settings as a dictionary.

Return type:

_GenericAlias[str, Any]

clone(**kwargs: Any) StreamT[source]

Create a clone of this stream.

Notes

If the cloned stream is supposed to supersede this stream, like in group_by/through/etc., you should use _chain() instead so stream._next = cloned_stream is set and get_active_stream() returns the cloned stream.

Return type:

StreamT

noack() StreamT[source]

Create new stream where acks are manual.

Return type:

StreamT

async items() AsyncIterator[Tuple[Optional[Union[bytes, _ModelT, Any]], T_co]][source]

Iterate over the stream as key, value pairs.

Examples

@app.agent(topic)
async def mytask(stream):
    async for key, value in stream.items():
        print(key, value)
Return type:

_GenericAlias[_GenericAlias[_UnionGenericAlias[bytes, _ModelT, Any, None], +T_co]]

async events() AsyncIterable[EventT][source]

Iterate over the stream as events exclusively.

This means the stream must be iterating over a channel, or at least an iterable of event objects.

Return type:

_GenericAlias[EventT]

async take(max_: int, within: Union[timedelta, float, str]) AsyncIterable[Sequence[T_co]][source]

Buffer n values at a time and yield a list of buffered values.

Parameters:
  • max_ (int) – Max number of messages to receive. When more than this number of messages are received within the specified number of seconds then we flush the buffer immediately.

  • within (_UnionGenericAlias[timedelta, float, str]) – Timeout for when we give up waiting for another value, and process the values we have. Warning: If there’s no timeout (i.e. timeout=None), the agent is likely to stall and block buffered events for an unreasonable length of time(!).

Return type:

_GenericAlias[_GenericAlias[+T_co]]

async take_events(max_: int, within: Union[timedelta, float, str]) AsyncIterable[Sequence[EventT]][source]

Buffer n events at a time and yield a list of buffered events.

Parameters:
  • max_ (int) – Max number of messages to receive. When more than this number of messages are received within the specified number of seconds then we flush the buffer immediately.

  • within (_UnionGenericAlias[timedelta, float, str]) – Timeout for when we give up waiting for another value, and process the values we have. Warning: If there’s no timeout (i.e. timeout=None), the agent is likely to stall and block buffered events for an unreasonable length of time(!).

Return type:

_GenericAlias[_GenericAlias[EventT]]

async take_with_timestamp(max_: int, within: Union[timedelta, float, str], timestamp_field_name: str) AsyncIterable[Sequence[T_co]][source]
Buffer n values at a time and yield a list of buffered values with the timestamp when the message was added to Kafka.

Parameters:
  • max_ (int) – Max number of messages to receive. When more than this number of messages are received within the specified number of seconds then we flush the buffer immediately.

  • within (_UnionGenericAlias[timedelta, float, str]) – Timeout for when we give up waiting for another value, and process the values we have. Warning: If there’s no timeout (i.e. timeout=None), the agent is likely to stall and block buffered events for an unreasonable length of time(!).

  • timestamp_field_name (str) – the name of the field containing kafka timestamp, that is going to be added to the value

Return type:

_GenericAlias[_GenericAlias[+T_co]]

enumerate(start: int = 0) AsyncIterable[Tuple[int, T_co]][source]

Enumerate values received on this stream.

Unlike Python’s built-in enumerate, this works with async generators.

Return type:

_GenericAlias[_GenericAlias[int, +T_co]]

async noack_take(max_: int, within: Union[timedelta, float, str]) AsyncIterable[Sequence[T_co]][source]

Buffer n values at a time and yield a list of buffered values.

Parameters:
  • max_ (int) – Max number of messages to receive. When more than this number of messages are received within the specified number of seconds then we flush the buffer immediately.

  • within (_UnionGenericAlias[timedelta, float, str]) – Timeout for when we give up waiting for another value, and process the values we have. Warning: If there’s no timeout (i.e. timeout=None), the agent is likely to stall and block buffered events for an unreasonable length of time(!).

Return type:

_GenericAlias[_GenericAlias[+T_co]]

through(channel: Union[str, ChannelT]) StreamT[source]

Forward values in this stream to channel.

Send messages received on this stream to another channel, and return a new stream that consumes from that channel.

Notes

The messages are forwarded after any processors have been applied.

Example

topic = app.topic('foo')

@app.agent(topic)
async def mytask(stream):
    async for value in stream.through(app.topic('bar')):
        # value was first received in topic 'foo',
        # then forwarded and consumed from topic 'bar'
        print(value)
Return type:

StreamT

echo(*channels: Union[str, ChannelT]) StreamT[source]

Forward values to one or more channels.

Unlike through(), we don’t consume from these channels.

Return type:

StreamT

group_by(key: Union[FieldDescriptorT, Callable[[T], Optional[Union[bytes, _ModelT, Any]]]], *, name: Optional[str] = None, topic: Optional[TopicT] = None, partitions: Optional[int] = None) StreamT[source]

Create new stream that repartitions the stream using a new key.

Parameters:
  • key (_UnionGenericAlias[FieldDescriptorT, _CallableGenericAlias[~T, _UnionGenericAlias[bytes, _ModelT, Any, None]]]) –

    The key argument decides how the new key is generated. It can be a field descriptor, a callable, or an async callable.

    Note: The name argument must be provided if the key argument is a callable.

  • name (_UnionGenericAlias[str, None]) – Suffix to use for repartitioned topics. This argument is required if key is a callable.

Examples

Using a field descriptor to use a field in the event as the new key:

s = withdrawals_topic.stream()
# values in this stream are of type Withdrawal
async for event in s.group_by(Withdrawal.account_id):
    ...

Using an async callable to extract a new key:

s = withdrawals_topic.stream()

async def get_key(withdrawal):
    return await aiohttp.get(
        f'http://e.com/resolve_account/{withdrawal.account_id}')

async for event in s.group_by(get_key):
    ...

Using a regular callable to extract a new key:

s = withdrawals_topic.stream()

def get_key(withdrawal):
    return withdrawal.account_id.upper()

async for event in s.group_by(get_key):
    ...
Return type:

StreamT

filter(fun: Callable[[T], Union[T, Awaitable[T]]]) StreamT[source]

Filter values from stream using callback.

The callback may be a traditional function, lambda function, or an async def function.

This method is useful for filtering events before repartitioning a stream.

Examples

>>> async for v in stream.filter(lambda v: v > 1000).group_by(...):
...     # do something
Return type:

StreamT

derive_topic(name: str, *, schema: Optional[SchemaT] = None, key_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, value_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, prefix: str = '', suffix: str = '') TopicT[source]

Create Topic description derived from the K/V type of this stream.

Parameters:
  • name (str) – Topic name.

  • key_type (_UnionGenericAlias[_GenericAlias[ModelT], _GenericAlias[bytes], _GenericAlias[str], None]) – Specific key type to use for this topic. If not set, the key type of this stream will be used.

  • value_type (_UnionGenericAlias[_GenericAlias[ModelT], _GenericAlias[bytes], _GenericAlias[str], None]) – Specific value type to use for this topic. If not set, the value type of this stream will be used.

Raises:

ValueError – if the stream channel is not a topic.

Return type:

TopicT

async throw(exc: BaseException) None[source]

Send exception to stream iteration.

Return type:

None

combine(*nodes: JoinableT, **kwargs: Any) StreamT[source]

Combine streams and tables into joined stream.

Return type:

StreamT

contribute_to_stream(active: StreamT) None[source]

Add stream as node in joined stream.

Return type:

None

async remove_from_stream(stream: StreamT) None[source]

Remove as node in a joined stream.

Return type:

None

join(*fields: FieldDescriptorT) StreamT[source]

Create stream where events are joined.

Return type:

StreamT

left_join(*fields: FieldDescriptorT) StreamT[source]

Create stream where events are joined by LEFT JOIN.

Return type:

StreamT

inner_join(*fields: FieldDescriptorT) StreamT[source]

Create stream where events are joined by INNER JOIN.

Return type:

StreamT

outer_join(*fields: FieldDescriptorT) StreamT[source]

Create stream where events are joined by OUTER JOIN.

Return type:

StreamT

async on_merge(value: Optional[T] = None) Optional[T][source]

Signal called when an event is to be joined.

Return type:

_UnionGenericAlias[~T, None]

async on_start() None[source]

Signal called when the stream starts.

Return type:

None

async stop() None[source]

Stop this stream.

Return type:

None

async on_stop() None[source]

Signal that the stream is stopping.

Return type:

None

async ack(event: EventT) bool[source]

Ack event.

This will decrease the reference count of the event message by one, and when the reference count reaches zero, the worker will commit the offset so that the message will not be seen by a worker again.

Parameters:

event (EventT) – Event to ack.

Return type:

bool

property label: str

Return description of stream, used in graphs and logs.

Return type:

str

property shortlabel: str

Return short description of stream.

Return type:

str

class faust.StreamT(channel: Optional[AsyncIterator[T_co]] = None, *, app: Optional[_AppT] = None, processors: Optional[Iterable[Callable[[T], Union[T, Awaitable[T]]]]] = None, combined: Optional[List[JoinableT]] = None, on_start: Optional[Callable] = None, join_strategy: Optional[_JoinT] = None, beacon: Optional[NodeT] = None, concurrency_index: Optional[int] = None, prev: Optional[StreamT] = None, active_partitions: Optional[Set[TP]] = None, enable_acks: bool = True, prefix: str = '', loop: Optional[AbstractEventLoop] = None)[source]
app: _AppT
channel: AsyncIterator[T_co]
outbox: Optional[Queue] = None
join_strategy: Optional[_JoinT] = None
task_owner: Optional[Task] = None
current_event: Optional[EventT] = None
active_partitions: Optional[Set[TP]] = None
concurrency_index: Optional[int] = None
enable_acks: bool = True
prefix: str = ''
combined: List[JoinableT]
abstract get_active_stream() StreamT[source]
Return type:

StreamT

abstract add_processor(processor: Callable[[T], Union[T, Awaitable[T]]]) None[source]
Return type:

None

abstract info() Mapping[str, Any][source]
Return type:

_GenericAlias[str, Any]

abstract clone(**kwargs: Any) StreamT[source]
Return type:

StreamT

abstract async items() AsyncIterator[Tuple[Optional[Union[bytes, _ModelT, Any]], T_co]][source]
abstract async events() AsyncIterable[EventT][source]
abstract async take(max_: int, within: Union[timedelta, float, str]) AsyncIterable[Sequence[T_co]][source]
abstract enumerate(start: int = 0) AsyncIterable[Tuple[int, T_co]][source]
Return type:

_GenericAlias[_GenericAlias[int, +T_co]]

abstract through(channel: Union[str, ChannelT]) StreamT[source]
Return type:

StreamT

abstract echo(*channels: Union[str, ChannelT]) StreamT[source]
Return type:

StreamT

abstract group_by(key: Union[FieldDescriptorT, Callable[[T], Optional[Union[bytes, _ModelT, Any]]]], *, name: Optional[str] = None, topic: Optional[TopicT] = None) StreamT[source]
Return type:

StreamT

abstract derive_topic(name: str, *, schema: Optional[_SchemaT] = None, key_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, value_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, prefix: str = '', suffix: str = '') TopicT[source]
Return type:

TopicT

abstract async throw(exc: BaseException) None[source]
Return type:

None

abstract async ack(event: EventT) bool[source]
Return type:

bool

faust.current_event() Optional[EventT][source]

Return the event currently being processed, or None.

Return type:

_UnionGenericAlias[EventT, None]

class faust.GlobalTable(app: AppT, *, name: Optional[str] = None, default: Optional[Callable[[], Any]] = None, store: Optional[Union[str, URL]] = None, schema: Optional[SchemaT] = None, key_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, value_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, partitions: Optional[int] = None, window: Optional[WindowT] = None, changelog_topic: Optional[TopicT] = None, help: Optional[str] = None, on_recover: Optional[Callable[[], Awaitable[None]]] = None, on_changelog_event: Optional[Callable[[EventT], Awaitable[None]]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: Optional[int] = None, extra_topic_configs: Optional[Mapping[str, Any]] = None, recover_callbacks: Optional[Set[Callable[[], Awaitable[None]]]] = None, options: Optional[Mapping[str, Any]] = None, use_partitioner: bool = False, on_window_close: Optional[Callable[[Any, Any], Union[None, Awaitable[None]]]] = None, is_global: bool = False, synchronize_all_active_partitions: bool = False, **kwargs: Any)[source]

Warning

Using a GlobalTable with multiple app instances may cause an app to be stuck in an infinite recovery loop. The current fix for this is to run the table with the following options:

app.GlobalTable(..., partitions=1, recovery_buffer_size=1)
logger: logging.Logger = <Logger faust.tables.globaltable (WARNING)>
class faust.SetTable(app: AppT, *, start_manager: bool = False, manager_topic_name: Optional[str] = None, manager_topic_suffix: Optional[str] = None, **kwargs: Any)[source]

Table that maintains a dictionary of sets.

Manager

alias of SetTableManager

WindowWrapper

alias of SetWindowWrapper

logger: logging.Logger = <Logger faust.tables.sets (WARNING)>
start_manager: bool
manager_topic_suffix: str = '-setmanager'
manager_topic_name: str
manager: SetTableManager
async on_start() None[source]

Call when set table starts.

Return type:

None

class faust.SetGlobalTable(app: AppT, *, start_manager: bool = False, manager_topic_name: Optional[str] = None, manager_topic_suffix: Optional[str] = None, **kwargs: Any)[source]
logger: logging.Logger = <Logger faust.tables.sets (WARNING)>
class faust.Table(app: AppT, *, name: Optional[str] = None, default: Optional[Callable[[], Any]] = None, store: Optional[Union[str, URL]] = None, schema: Optional[SchemaT] = None, key_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, value_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, partitions: Optional[int] = None, window: Optional[WindowT] = None, changelog_topic: Optional[TopicT] = None, help: Optional[str] = None, on_recover: Optional[Callable[[], Awaitable[None]]] = None, on_changelog_event: Optional[Callable[[EventT], Awaitable[None]]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: Optional[int] = None, extra_topic_configs: Optional[Mapping[str, Any]] = None, recover_callbacks: Optional[Set[Callable[[], Awaitable[None]]]] = None, options: Optional[Mapping[str, Any]] = None, use_partitioner: bool = False, on_window_close: Optional[Callable[[Any, Any], Union[None, Awaitable[None]]]] = None, is_global: bool = False, synchronize_all_active_partitions: bool = False, **kwargs: Any)[source]

Table (non-windowed).

class WindowWrapper(table: TableT, *, relative_to: Optional[Union[_FieldDescriptorT, Callable[[Optional[EventT]], Union[float, datetime]], datetime, float]] = None, key_index: bool = False, key_index_table: Optional[TableT] = None)

Windowed table wrapper.

A windowed table does not return concrete values when keys are accessed, instead WindowSet is returned so that the values can be further reduced to the wanted time period.

ValueType

alias of WindowSet

as_ansitable(title: str = '{table.name}', **kwargs: Any) str

Draw table as a terminal ANSI table.

Return type:

str

clone(relative_to: Optional[Union[_FieldDescriptorT, Callable[[Optional[EventT]], Union[float, datetime]], datetime, float]]) WindowWrapperT

Clone this table using a new time-relativity configuration.

Return type:

WindowWrapperT

property get_relative_timestamp: Optional[Callable[[Optional[EventT]], Union[float, datetime]]]

Return the current handler for extracting event timestamp.

Return type:

_UnionGenericAlias[_CallableGenericAlias[_UnionGenericAlias[EventT, None], _UnionGenericAlias[float, datetime]], None]

get_timestamp(event: Optional[EventT] = None) float

Get timestamp from event.

Return type:

float

items(event: Optional[EventT] = None) ItemsView

Return table items view: iterate over (key, value) pairs.

Return type:

_SpecialGenericAlias

key_index: bool = False
key_index_table: Optional[TableT] = None
keys() KeysView

Return table keys view: iterate over keys found in this table.

Return type:

_SpecialGenericAlias

property name: str

Return the name of this table.

Return type:

str

on_del_key(key: Any) None

Call when a key is deleted from this table.

Return type:

None

on_recover(fun: Callable[[], Awaitable[None]]) Callable[[], Awaitable[None]]

Call after table recovery.

Return type:

_CallableGenericAlias[_GenericAlias[None]]

on_set_key(key: Any, value: Any) None

Call when the value for a key in this table is set.

Return type:

None

relative_to(ts: Optional[Union[_FieldDescriptorT, Callable[[Optional[EventT]], Union[float, datetime]], datetime, float]]) WindowWrapperT

Configure the time-relativity of this windowed table.

Return type:

WindowWrapperT

relative_to_field(field: FieldDescriptorT) WindowWrapperT

Configure table to be time-relative to a field in the stream.

This means the window will use the timestamp from the event currently being processed in the stream.

Further it will not use the timestamp of the Kafka message, but a field in the value of the event.

For example a model field:

class Account(faust.Record):
    created: float

table = app.Table('foo').hopping(
    ...,
).relative_to_field(Account.created)
Return type:

WindowWrapperT

relative_to_now() WindowWrapperT

Configure table to be time-relative to the system clock.

Return type:

WindowWrapperT

relative_to_stream() WindowWrapperT

Configure table to be time-relative to the stream.

This means the window will use the timestamp from the event currently being processed in the stream.

Return type:

WindowWrapperT

values(event: Optional[EventT] = None) ValuesView

Return table values view: iterate over values in this table.

Return type:

_SpecialGenericAlias

using_window(window: WindowT, *, key_index: bool = False) WindowWrapperT[source]

Wrap table using a specific window type.

Return type:

WindowWrapperT

hopping(size: Union[timedelta, float, str], step: Union[timedelta, float, str], expires: Optional[Union[timedelta, float, str]] = None, key_index: bool = False) WindowWrapperT[source]

Wrap table in a hopping window.

Return type:

WindowWrapperT

tumbling(size: Union[timedelta, float, str], expires: Optional[Union[timedelta, float, str]] = None, key_index: bool = False) WindowWrapperT[source]

Wrap table in a tumbling window.

Return type:

WindowWrapperT

on_key_get(key: KT) None[source]

Call when the value for a key in this table is retrieved.

Return type:

None

on_key_set(key: KT, value: VT) None[source]

Call when the value for a key in this table is set.

Return type:

None

on_key_del(key: KT) None[source]

Call when a key in this table is removed.

Return type:

None

on_clear() None[source]

Call when the table is cleared.

Return type:

None

as_ansitable(title: str = '{table.name}', **kwargs: Any) str[source]

Draw table as a terminal ANSI table.

Return type:

str

logger: logging.Logger = <Logger faust.tables.table (WARNING)>
class faust.Topic(app: AppT, *, topics: Optional[Sequence[str]] = None, pattern: Optional[Union[str, Pattern]] = None, schema: Optional[SchemaT] = None, key_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, value_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, is_iterator: bool = False, partitions: Optional[int] = None, retention: Optional[Union[timedelta, float, str]] = None, compacting: Optional[bool] = None, deleting: Optional[bool] = None, replicas: Optional[int] = None, acks: bool = True, internal: bool = False, config: Optional[Mapping[str, Any]] = None, queue: Optional[ThrowableQueue] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, maxsize: Optional[int] = None, root: Optional[ChannelT] = None, active_partitions: Optional[Set[TP]] = None, allow_empty: Optional[bool] = None, has_prefix: bool = False, loop: Optional[AbstractEventLoop] = None)[source]

Define new topic description.

Parameters:
  • app (AppT) – App instance used to create this topic description.

  • topics (_UnionGenericAlias[_GenericAlias[str], None]) – List of topic names.

  • partitions (_UnionGenericAlias[int, None]) – Number of partitions for these topics. On declaration, topics are created using this. Note: If a message is produced before the topic is declared, and autoCreateTopics is enabled on the Kafka Server, the number of partitions used will be specified by the server configuration.

  • retention (_UnionGenericAlias[timedelta, float, str, None]) – Number of seconds (as float/timedelta) to keep messages in the topic before they can be expired by the server.

  • pattern (_UnionGenericAlias[str, _SpecialGenericAlias, None]) – Regular expression evaluated to decide what topics to subscribe to. You cannot specify both topics and a pattern.

  • schema (_UnionGenericAlias[SchemaT, None]) – Schema used for serialization/deserialization.

  • key_type (_UnionGenericAlias[_GenericAlias[ModelT], _GenericAlias[bytes], _GenericAlias[str], None]) – How to deserialize keys for messages in this topic. Can be a faust.Model type, str, bytes, or None for “autodetect” (Overrides schema if one is defined).

  • value_type (_UnionGenericAlias[_GenericAlias[ModelT], _GenericAlias[bytes], _GenericAlias[str], None]) – How to deserialize values for messages in this topic. Can be a faust.Model type, str, bytes, or None for “autodetect” (Overrides schema if one is defined).

  • active_partitions (_UnionGenericAlias[_GenericAlias[TP], None]) – Set of faust.types.tuples.TP that this topic should be restricted to.

Raises:

TypeError – if both topics and pattern are provided.

async send(*, key: Optional[Union[bytes, _ModelT, Any]] = None, value: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, schema: Optional[SchemaT] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, callback: Optional[Callable[[FutureMessage], Union[None, Awaitable[None]]]] = None, force: bool = False) Awaitable[RecordMetadata][source]

Send message to topic.

Return type:

_GenericAlias[RecordMetadata]

send_soon(*, key: Optional[Union[bytes, _ModelT, Any]] = None, value: Optional[Union[bytes, _ModelT, Any]] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]] = None, schema: Optional[SchemaT] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, callback: Optional[Callable[[FutureMessage], Union[None, Awaitable[None]]]] = None, force: bool = False, eager_partitioning: bool = False) FutureMessage[source]

Produce message by adding to buffer.

Notes

This method can be used by non-async def functions to produce messages.

Return type:

FutureMessage

async put(event: EventT) None[source]

Put event directly onto the underlying queue of this topic.

This will only affect subscribers to a particular instance, in a particular process.

Return type:

None

property pattern: Optional[Pattern]

Regular expression used by this topic (if any).

Return type:

_UnionGenericAlias[_SpecialGenericAlias, None]

property partitions: Optional[int]

Return the number of configured partitions for this topic.

Notes

This is only active for internal topics, fully owned and managed by Faust itself.

We never touch the configuration of a topic that exists in Kafka, and Kafka will sometimes automatically create topics when they don’t exist. In this case the number of partitions for the automatically created topic will depend on the Kafka server configuration (num.partitions).

Always make sure your topics have the correct number of partitions.

Return type:

_UnionGenericAlias[int, None]

derive(**kwargs: Any) ChannelT[source]

Create topic derived from the configuration of this topic.

Configuration will be copied from this topic, but any parameter overridden as a keyword argument.

See also

derive_topic(): for a list of supported keyword arguments.

Return type:

ChannelT

derive_topic(*, topics: Optional[Sequence[str]] = None, schema: Optional[SchemaT] = None, key_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, value_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, partitions: Optional[int] = None, retention: Optional[Union[timedelta, float, str]] = None, compacting: Optional[bool] = None, deleting: Optional[bool] = None, internal: Optional[bool] = None, config: Optional[Mapping[str, Any]] = None, prefix: str = '', suffix: str = '', **kwargs: Any) TopicT[source]

Create new topic with configuration derived from this topic.

Return type:

TopicT

get_topic_name() str[source]

Return the main topic name of this topic description.

As topic descriptions can have multiple topic names, this will only return when the topic has a singular topic name in the description.

Raises:
  • TypeError – if configured with a regular expression pattern.

  • ValueError – if configured with multiple topic names.

  • TypeError – if not configured with any names or patterns.

Return type:

str

async publish_message(fut: FutureMessage, wait: bool = False) Awaitable[RecordMetadata][source]

Fulfill promise to publish message to topic.

Return type:

_GenericAlias[RecordMetadata]

maybe_declare() None[source]

Declare/create this topic, only if it does not exist.

Return type:

None

async declare() None[source]

Declare/create this topic on the server.

Return type:

None

on_stop_iteration() None[source]

Signal that iteration over this channel was stopped.

Tip

Remember to call super when overriding this method.

Return type:

None

class faust.TopicT(app: _AppT, *, topics: Optional[Sequence[str]] = None, pattern: Optional[Union[str, Pattern]] = None, schema: Optional[_SchemaT] = None, key_type: Optional[_ModelArg] = None, value_type: Optional[_ModelArg] = None, is_iterator: bool = False, partitions: Optional[int] = None, retention: Optional[Union[timedelta, float, str]] = None, compacting: Optional[bool] = None, deleting: Optional[bool] = None, replicas: Optional[int] = None, acks: bool = True, internal: bool = False, config: Optional[Mapping[str, Any]] = None, queue: Optional[ThrowableQueue] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, maxsize: Optional[int] = None, root: Optional[ChannelT] = None, active_partitions: Optional[Set[TP]] = None, allow_empty: bool = False, has_prefix: bool = False, loop: Optional[AbstractEventLoop] = None)[source]
topics: Sequence[str]

Iterable/Sequence of topic names to subscribe to.

retention: Optional[Union[timedelta, float, str]]

Topic retention setting: expiry time in seconds for messages in the topic.

compacting: Optional[bool]

Flag that when enabled means the topic can be “compacted”: if the topic is a log of key/value pairs, the broker can delete old values for the same key.

deleting: Optional[bool]
replicas: Optional[int]

Number of replicas for topic.

config: Optional[Mapping[str, Any]]

Additional configuration as a mapping.

acks: bool

Enable acks for this topic.

internal: bool

Mark topic as internal: it’s owned by us and we are allowed to create or delete the topic as necessary.

has_prefix: bool = False
active_partitions: Optional[Set[TP]]
abstract property pattern: Optional[Pattern]
Return type:

_UnionGenericAlias[_SpecialGenericAlias, None]

abstract property partitions: Optional[int]
Return type:

_UnionGenericAlias[int, None]

abstract derive(**kwargs: Any) ChannelT[source]
Return type:

ChannelT

abstract derive_topic(*, topics: Optional[Sequence[str]] = None, schema: Optional[_SchemaT] = None, key_type: Optional[_ModelArg] = None, value_type: Optional[_ModelArg] = None, partitions: Optional[int] = None, retention: Optional[Union[timedelta, float, str]] = None, compacting: Optional[bool] = None, deleting: Optional[bool] = None, internal: bool = False, config: Optional[Mapping[str, Any]] = None, prefix: str = '', suffix: str = '', **kwargs: Any) TopicT[source]
Return type:

TopicT

class faust.GSSAPICredentials(*, kerberos_service_name: str = 'kafka', kerberos_domain_name: Optional[str] = None, ssl_context: Optional[SSLContext] = None, mechanism: Optional[Union[str, SASLMechanism]] = None)[source]

Describe GSSAPI credentials over SASL.

ssl_context: Optional[SSLContext]
protocol: AuthProtocol = 'SASL_PLAINTEXT'
mechanism: SASLMechanism = 'GSSAPI'
class faust.SASLCredentials(*, username: Optional[str] = None, password: Optional[str] = None, ssl_context: Optional[SSLContext] = None, mechanism: Optional[Union[str, SASLMechanism]] = None)[source]

Describe SASL credentials.

username: Optional[str]
password: Optional[str]
ssl_context: Optional[SSLContext]
protocol: AuthProtocol = 'SASL_PLAINTEXT'
mechanism: SASLMechanism = 'PLAIN'
class faust.SSLCredentials(context: Optional[SSLContext] = None, *, purpose: Optional[Any] = None, cafile: Optional[str] = None, capath: Optional[str] = None, cadata: Optional[str] = None)[source]

Describe SSL credentials/settings.

protocol: AuthProtocol = 'SSL'
context: SSLContext
class faust.OAuthCredentials(*, oauth_cb: AbstractTokenProvider, ssl_context: Optional[SSLContext] = None)[source]

Describe OAuth Bearer credentials over SASL.

mechanism: SASLMechanism = 'OAUTHBEARER'
ssl_context: Optional[SSLContext]
protocol: AuthProtocol = 'SASL_PLAINTEXT'
class faust.Settings(*args: Any, **kwargs: Any)[source]
NODE_HOSTNAME: ClassVar[str] = 'fv-az2026-396'
DEFAULT_BROKER_URL: ClassVar[str] = 'kafka://localhost:9092'
env: Mapping[str, str]

Environment. Defaults to os.environ.

on_init(id: str, **kwargs: Any) None[source]
Return type:

None

getenv(env_name: str) Any[source]
Return type:

Any

relative_to_appdir(path: Path) Path[source]

Prepare app directory path.

If path is absolute the path is returned as-is, but if path is relative it will be assumed to belong under the app directory.

Return type:

Path

data_directory_for_version(version: int) Path[source]

Return the directory path for data belonging to specific version.

Return type:

Path

find_old_versiondirs() Iterable[Path][source]
Return type:

_GenericAlias[Path]

property name: str
Return type:

str

property id: str
Return type:

str

property appdir: Path
Return type:

Path

property MY_SETTING

My custom setting.

To contribute new settings you only have to define a new setting decorated attribute here.

Look at the other settings for examples.

Remember that once you’ve added the setting you must also render the configuration reference:

$ make configref
property autodiscover

Automatic discovery of agents, tasks, timers, views and commands.

Faust has an API to add different asyncio services and other user extensions, such as “Agents”, HTTP web views, command-line commands, and timers to your Faust workers. These can be defined in any module, so to discover them at startup, the worker needs to traverse packages looking for them.

Warning

The autodiscovery functionality uses the https://pypi.org/project/Venusian/ library to scan wanted packages for @app.agent, @app.page, @app.command, @app.task and @app.timer decorators, but to do so, it’s required to traverse the package path and import every module in it.

Importing random modules like this can be dangerous so make sure you follow Python programming best practices. Do not start threads; perform network I/O; do test monkey-patching for mocks or similar, as a side effect of importing a module. If you encounter a case such as this then please find a way to perform your action in a lazy manner.

Warning

If the above warning is something you cannot fix, or if it’s out of your control, then please set autodiscover=False and make sure the worker imports all modules where your decorators are defined.

The value for this argument can be:

bool

If App(autodiscover=True) is set, the autodiscovery will scan the package name described in the origin attribute.

The origin attribute is automatically set when you start a worker using the faust command line program, for example:

faust -A example.simple worker

The -A option specifies the app, but you can also create a shortcut entry point by calling app.main():

if __name__ == '__main__':
    app.main()

Then you can start the faust program by executing for example python myscript.py worker --loglevel=INFO, and it will use the correct application.

Sequence[str]

The argument can also be a list of packages to scan:

app = App(..., autodiscover=['proj_orders', 'proj_accounts'])
Callable[[], Sequence[str]]

The argument can also be a function returning a list of packages to scan:

def get_all_packages_to_scan():
    return ['proj_orders', 'proj_accounts']

app = App(..., autodiscover=get_all_packages_to_scan)
False

If everything you need is in a self-contained module, or you import the stuff you need manually, just set autodiscover to False and don’t worry about it :-)

Django

When using https://pypi.org/project/Django/ and the DJANGO_SETTINGS_MODULE environment variable is set, the Faust app will scan all packages found in the INSTALLED_APPS setting.

If you’re using Django you can use this to scan for agents/pages/commands in all packages defined in INSTALLED_APPS.

Faust will automatically detect that you’re using Django and do the right thing if you do:

app = App(..., autodiscover=True)

It will find agents and other decorators in all of the reusable Django applications. If you want to manually control what packages are traversed, then provide a list:

app = App(..., autodiscover=['package1', 'package2'])

or if you want exactly None packages to be traversed, then provide a False:

app = App(..., autodiscover=False)

which is the default, so you can simply omit the argument.

Tip

For manual control over autodiscovery, you can also call the app.discover() method manually.

property datadir

Application data directory.

The directory in which this instance stores the data used by local tables, etc.

See also

  • The data directory can also be set using the faust --datadir option, from the command-line, so there is usually no reason to provide a default value when creating the app.

property tabledir

Application table data directory.

The directory in which this instance stores local table data. Usually you will want to configure the datadir setting, but if you want to store tables separately you can configure this one.

If the path provided is relative (it has no leading slash), then the path will be considered to be relative to the datadir setting.

property debug

Use in development to expose sensor information endpoint.

Tip

If you want to enable the sensor statistics endpoint in production, without enabling the debug setting, you can do so by adding the following code:

app.web.blueprints.add(
    '/stats/', 'faust.web.apps.stats:blueprint')
property env_prefix

Environment variable prefix.

When configuring Faust by environment variables, this adds a common prefix to all Faust environment value names.

property id_format

Application ID format template.

The format string used to generate the final id value by combining it with the version parameter.

property origin

The reverse path used to find the app.

For example if the app is located in:

from myproj.app import app

Then the origin should be "myproj.app".

The faust worker program will try to automatically set the origin, but if you are having problems with auto generated names then you can set origin manually.

property timezone

Project timezone.

The timezone used for date-related functionality such as cronjobs.

property version

App version.

Version of the app, that when changed will create a new isolated instance of the application. The first version is 1, the second version is 2, and so on.

Source topics will not be affected by a version change.

Faust applications will use two kinds of topics: source topics, and internally managed topics. The source topics are declared by the producer, and we do not have the opportunity to modify any configuration settings, like number of partitions for a source topic; we may only consume from them. To mark a topic as internal, use: app.topic(..., internal=True).

property agent_supervisor

Default agent supervisor type.

An agent may start multiple instances (actors) when the concurrency setting is higher than one (e.g. @app.agent(concurrency=2)).

Multiple instances of the same agent are considered to be in the same supervisor group.

The default supervisor is the mode.OneForOneSupervisor: if an instance in the group crashes, we restart that instance only.

These are the supervisors supported:

property blocking_timeout

Blocking timeout (in seconds).

When specified the worker will start a periodic signal based timer that only triggers when the loop has been blocked for a time exceeding this timeout.

This is the safest way to detect blocking, but could have adverse effects on libraries that do not automatically retry interrupted system calls.

Python itself does retry all interrupted system calls since version 3.5 (see PEP 475), but this might not be the case with C extensions added to the worker by the user.

The blocking detector is a background thread that periodically wakes up to either arm a timer, or cancel an already armed timer. In pseudocode:

while True:
    # cancel previous alarm and arm new alarm
    signal.signal(signal.SIGALRM, on_alarm)
    signal.setitimer(signal.ITIMER_REAL, blocking_timeout)
    # sleep to wakeup just before the timeout
    await asyncio.sleep(blocking_timeout * 0.96)

def on_alarm(signum, frame):
    logger.warning('Blocking detected: ...')

If the sleep does not wake up in time the alarm signal will be sent to the process and a traceback will be logged.

property broker

Broker URL, or a list of alternative broker URLs.

Faust needs the URL of a “transport” to send and receive messages.

Currently, the only supported production transport is kafka://. This uses the https://pypi.org/project/aiokafka/ client under the hood, for consuming and producing messages.

You can specify multiple hosts at the same time by separating them with a semicolon:

kafka://kafka1.example.com:9092;kafka2.example.com:9092

Which in actual code looks like this:

BROKERS = 'kafka://kafka1.example.com:9092;kafka2.example.com:9092'
app = faust.App(
    'id',
    broker=BROKERS,
)

You can also pass a list of URLs:

app = faust.App(
    'id',
    broker=['kafka://kafka1.example.com:9092',
            'kafka://kafka2.example.com:9092'],
)

See also

You can configure the transport used for consuming and producing separately, by setting the broker_consumer and broker_producer settings.

This setting is used as the default.

Available Transports

property broker_consumer

Consumer broker URL.

You can use this setting to configure the transport used for producing and consuming separately.

If not set the value found in broker will be used.

property broker_producer

Producer broker URL.

You can use this setting to configure the transport used for producing and consuming separately.

If not set the value found in broker will be used.

property broker_api_version

Broker API version.

This setting is also the default for consumer_api_version, and producer_api_version.

Negotiate producer protocol version.

The default value - “auto” means use the latest version supported by both client and server.

Any other version set means you are requesting a specific version of the protocol.

Example Kafka uses:

Disable sending headers for all messages produced

Kafka headers support was added in Kafka 0.11, so you can specify broker_api_version="0.10" to remove the headers from messages.

property broker_check_crcs

Broker CRC check.

Automatically check the CRC32 of the records consumed.

property broker_client_id

Broker client ID.

There is rarely any reason to configure this setting.

The client id is used to identify the software used, and is not usually configured by the user.

property broker_commit_every

Broker commit message frequency.

Commit offset every n messages.

See also broker_commit_interval, which is how frequently we commit on a timer when there are few messages being received.

property broker_commit_interval

Broker commit time frequency.

How often we commit messages that have been fully processed (acked).

property broker_commit_livelock_soft_timeout

Commit livelock timeout.

How long it takes before we warn that the Kafka commit offset has not advanced (only when processing messages).

property broker_credentials

Broker authentication mechanism.

Specify the authentication mechanism to use when connecting to the broker.

The default is to not use any authentication.

SASL Authentication

You can enable SASL authentication via plain text:

app = faust.App(
    broker_credentials=faust.SASLCredentials(
        username='x',
        password='y',
    ))

Warning

Do not use literal strings when specifying passwords in production, as they can remain visible in stack traces.

Instead the best practice is to get the password from a configuration file, or from the environment:

BROKER_USERNAME = os.environ.get('BROKER_USERNAME')
BROKER_PASSWORD = os.environ.get('BROKER_PASSWORD')

app = faust.App(
    broker_credentials=faust.SASLCredentials(
        username=BROKER_USERNAME,
        password=BROKER_PASSWORD,
    ))
GSSAPI Authentication

GSSAPI authentication over plain text:

app = faust.App(
    broker_credentials=faust.GSSAPICredentials(
        kerberos_service_name='faust',
        kerberos_domain_name='example.com',
    ),
)

GSSAPI authentication over SSL:

import ssl
ssl_context = ssl.create_default_context(
    purpose=ssl.Purpose.SERVER_AUTH, cafile='ca.pem')
ssl_context.load_cert_chain(
    'client.cert', keyfile='client.key')

app = faust.App(
    broker_credentials=faust.GSSAPICredentials(
        kerberos_service_name='faust',
        kerberos_domain_name='example.com',
        ssl_context=ssl_context,
    ),
)
SSL Authentication

Provide an SSL context for the Kafka broker connections.

This allows Faust to use a secure SSL/TLS connection for the Kafka connections and enabling certificate-based authentication.

import ssl

ssl_context = ssl.create_default_context(
    purpose=ssl.Purpose.SERVER_AUTH, cafile='ca.pem')
ssl_context.load_cert_chain(
    'client.cert', keyfile='client.key')
app = faust.App(..., broker_credentials=ssl_context)
property broker_heartbeat_interval

Broker heartbeat interval.

How often we send heartbeats to the broker, and also how often we expect to receive heartbeats from the broker.

If any of these time out, you should increase this setting.

property broker_max_poll_interval

Broker max poll interval.

The maximum allowed time (in seconds) between calls to consume messages. If this interval is exceeded the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member. If API methods block waiting for messages, that time does not count against this timeout.

See KIP-62 for technical details.

property broker_max_poll_records

Broker max poll records.

The maximum number of records returned in a single call to poll(). If you find that your application needs more time to process messages you may want to adjust broker_max_poll_records to tune the number of records that must be handled on every loop iteration.

property broker_rebalance_timeout

Broker rebalance timeout.

How long to wait for a node to finish rebalancing before the broker will consider it dysfunctional and remove it from the cluster.

Increase this if you experience the cluster being in a state of constantly rebalancing, but make sure you also increase the broker_heartbeat_interval at the same time.

Note

The session timeout must not be greater than the broker_request_timeout.

property broker_request_timeout

Kafka client request timeout.

Note

The request timeout must not be less than the broker_session_timeout.

property broker_session_timeout

Broker session timeout.

How long to wait for a node to finish rebalancing before the broker will consider it dysfunctional and remove it from the cluster.

Increase this if you experience the cluster being in a state of constantly rebalancing, but make sure you also increase the broker_heartbeat_interval at the same time.

Note

The session timeout must not be greater than the broker_request_timeout.

property ssl_context

SSL configuration.

See credentials.

property consumer_api_version

Consumer API version.

Configures the broker API version to use for consumers. See broker_api_version for more information.

property consumer_max_fetch_size

Consumer max fetch size.

The maximum amount of data per-partition the server will return. This size must be at least as large as the maximum message size.

Note: This is PER PARTITION, so a limit of 1Mb when your workers consume from 10 topics having 100 partitions each, means a fetch request can be up to a gigabyte (10 * 100 * 1Mb). This limit being too generous may cause rebalancing issues if the amount of time required to flush pending data stuck in socket buffers exceeds the rebalancing timeout.

You must keep this limit low enough to account for many partitions being assigned to a single node.

property consumer_auto_offset_reset

Consumer auto offset reset.

Where the consumer should start reading messages from when there is no initial offset, or the stored offset no longer exists, e.g. when starting a new consumer for the first time.

Options include ‘earliest’, ‘latest’, ‘none’.

property consumer_group_instance_id

Consumer group instance id.

The group_instance_id for static partition assignment.

If not set, default assignment strategy is used. Otherwise, each consumer instance has to have a unique id.

property consumer_metadata_max_age_ms

Consumer metadata max age milliseconds

The period of time in milliseconds after which we force a refresh of metadata even if we haven’t seen any partition leadership changes to proactively discover any new brokers or partitions.

Default: 300000

property consumer_connections_max_idle_ms

Consumer connections max idle milliseconds.

Close idle connections after the number of milliseconds specified by this config.

Default: 540000 (9 minutes).

property key_serializer

Default key serializer.

Serializer used for keys by default when no serializer is specified, or a model is not being used.

This can be the name of a serializer/codec, or an actual faust.serializers.codecs.Codec instance.

See also

  • The Codecs section in the model guide – for more information about codecs.

property value_serializer

Default value serializer.

Serializer used for values by default when no serializer is specified, or a model is not being used.

This can be a string (the name of a serializer/codec), or an actual faust.serializers.codecs.Codec instance.

See also

  • The Codecs section in the model guide – for more information about codecs.

property logging_config

Logging dictionary configuration.

Optional dictionary for logging configuration, as supported by logging.config.dictConfig().

property loghandlers

List of custom logging handlers.

Specify a list of custom log handlers to use in worker instances.

property producer_acks

Producer Acks.

The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are common:

  • 0: Producer will not wait for any acknowledgment from the server at all. The message will immediately be considered sent (Not recommended).

  • 1: The broker leader will write the record to its local log but will respond without awaiting full acknowledgment from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost.

  • -1: The broker leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee.

property producer_api_version

Producer API version.

Configures the broker API version to use for producers. See broker_api_version for more information.

property producer_compression_type

Producer compression type.

The compression type for all data generated by the producer. Valid values are gzip, snappy, lz4, or None.

property producer_linger

Producer batch linger configuration.

Minimum time to batch before sending out messages from the producer.

Should rarely have to change this.

property producer_max_batch_size

Producer max batch size.

Max size of each producer batch, in bytes.

property producer_max_request_size

Producer maximum request size.

Maximum size of a request in bytes in the producer.

Should rarely have to change this.

property producer_partitioner

Producer partitioning strategy.

The Kafka producer can be configured with a custom partitioner to change how keys are partitioned when producing to topics.

The default partitioner for Kafka is implemented as follows, and can be used as a template for your own partitioner:

import random
from typing import List
from kafka.partitioner.hashed import murmur2

def partition(key: bytes,
            all_partitions: List[int],
            available: List[int]) -> int:
    '''Default partitioner.

    Hashes key to partition using murmur2 hashing
    (from java client) If key is None, selects partition
    randomly from available, or from all partitions if none
    are currently available

    Arguments:
        key: partitioning key
        all_partitions: list of all partitions sorted by
                        partition ID.
        available: list of available partitions
                   in no particular order
    Returns:
        int: one of the values from ``all_partitions``
             or ``available``.
    '''
    if key is None:
        source = available if available else all_partitions
        return random.choice(source)
    index: int = murmur2(key)
    index &= 0x7fffffff
    index %= len(all_partitions)
    return all_partitions[index]
property producer_request_timeout

Producer request timeout.

Timeout for producer operations. This is set high by default, as this is also the time when producer batches expire and will no longer be retried.

property producer_threaded

Thread separate producer for send_soon.

If True, spin up a different producer in a different thread to be used for messages buffered up for producing via send_soon function.

property producer_metadata_max_age_ms

Producer metadata max age milliseconds

The period of time in milliseconds after which we force a refresh of metadata even if we haven’t seen any partition leadership changes to proactively discover any new brokers or partitions.

Default: 300000

property producer_connections_max_idle_ms

Producer connections max idle milliseconds.

Close idle connections after the number of milliseconds specified by this config.

Default: 540000 (9 minutes).

property recovery_consistency_check

Check Kafka and local offsets for consistency.

If True, assert that Kafka highwater offsets >= local offset in the rocksdb state store.

property store_check_exists

Execute exists on the underlying store.

If True, executes exists on the underlying store. If False, the client has to catch KeyError.

property crash_app_on_aerospike_exception

Crashes the app on an aerospike exception.

If True, crashes the app and prevents the commit offset from progressing. If False, the client has to catch the error and implement a dead letter queue.

property aerospike_retries_on_exception

Number of retries to aerospike on a runtime error from the aerospike client.

Set this to the number of retries using the aerospike client on a runtime Exception thrown by the client

property aerospike_sleep_seconds_between_retries_on_exception

Seconds to sleep between retries to aerospike on a runtime error from the aerospike client.

Set this to the sleep in seconds between retries using the aerospike client on a runtime Exception thrown by the client

property reply_create_topic

Automatically create reply topics.

Set this to True if you plan on using the RPC with agents.

This will create the internal topic used for RPC replies on that instance at startup.

property reply_expires

RPC reply expiry time in seconds.

The expiry time (in seconds float, or timedelta), for how long replies will stay in the instances local reply topic before being removed.

property reply_to

Reply to address.

The name of the reply topic used by this instance. If not set one will be automatically generated when the app is created.

property reply_to_prefix

Reply address topic name prefix.

The prefix used when generating reply topic names.

property processing_guarantee

The processing guarantee that should be used.

Possible values are “at_least_once” (default) and “exactly_once”.

Note that if exactly-once processing is enabled consumers are configured with isolation.level="read_committed" and producers are configured with retries=Integer.MAX_VALUE and enable.idempotence=true per default.

Note that by default exactly-once processing requires a cluster of at least three brokers, which is the recommended setting for production. For development you can change this by adjusting the broker setting transaction.state.log.replication.factor to the number of brokers you want to use.

property stream_buffer_maxsize

Stream buffer maximum size.

This setting controls back pressure to streams and agents reading from streams.

If set to 4096 (default) this means that an agent can only keep at most 4096 unprocessed items in the stream buffer.

Essentially this will limit the number of messages a stream can “prefetch”.

Higher numbers give better throughput, but take care if your agent sends messages or updates tables (which sends changelog messages).

In that case, if the buffer size is large, the broker_commit_interval or broker_commit_every settings must be set to commit frequently, avoiding back pressure from building up.

A buffer size of 131_072 may let you process over 30,000 events a second as a baseline, but be careful with a buffer size that large when you also send messages or update tables.

property stream_processing_timeout

Stream processing timeout.

Timeout (in seconds) for processing events in the stream. If processing of a single event exceeds this time we log an error, but do not stop processing.

If you are seeing a warning like this you should either

  1. increase this timeout to allow agents to spend more time on a single event, or

  2. add a timeout to the operation in the agent, so stream processing always completes before the timeout.

The latter is preferred for network operations such as web requests. If a network service you depend on is temporarily offline you should consider doing retries (send to separate topic):

main_topic = app.topic('main')
deadletter_topic = app.topic('main_deadletter')

async def send_request(value, timeout: Optional[float] = None) -> None:
    await app.http_client.get('http://foo.com', timeout=timeout)

@app.agent(main_topic)
async def main(stream):
    async for value in stream:
        try:
            await send_request(value, timeout=5)
        except asyncio.TimeoutError:
            await deadletter_topic.send(value)

@app.agent(deadletter_topic)
async def main_deadletter(stream):
    async for value in stream:
        # wait for 30 seconds before retrying.
        await stream.sleep(30)
        await send_request(value)
property stream_publish_on_commit

Stream delay producing until commit time.

If enabled we buffer up sending messages until the source topic offset related to that processing is committed. This means when we do commit, we may have buffered up a LOT of messages so commit needs to happen frequently (make sure to decrease broker_commit_every).

property stream_recovery_delay

Stream recovery delay.

Number of seconds to sleep before continuing after rebalance. We wait for a bit to allow for more nodes to join/leave before starting to recover tables and then processing streams. This is to minimize the chance of errors and rebalancing loops.

property stream_wait_empty

Stream wait empty.

This setting controls whether the worker should wait for the currently processing task in an agent to complete before rebalancing or shutting down.

On rebalance/shut down we clear the stream buffers. Those events will be reprocessed after the rebalance anyway, but we may have already started processing one event in every agent, and if we rebalance we will process that event again.

By default we will wait for the currently active tasks, but if your streams are idempotent you can disable it using this setting.

property store

Table storage backend URL.

The backend used for table storage.

Tables are stored in-memory by default, but you should not use the memory:// store in production.

In production, a persistent table store, such as rocksdb:// is preferred.

property table_cleanup_interval

Table cleanup interval.

How often we cleanup tables to remove expired entries.

property table_key_index_size

Table key index size.

Tables keep a cache of key to partition number to speed up table lookups.

This setting configures the maximum size of that cache.

property table_standby_replicas

Table standby replicas.

The number of standby replicas for each table.

property topic_allow_declare

Allow creating new topics.

This setting disables the creation of internal topics.

Faust will only create topics that it considers to be fully owned and managed, such as intermediate repartition topics, table changelog topics etc.

Some Kafka managers do not allow services to create topics; in that case you should set this to False.

property topic_disable_leader

Disable leader election topic.

This setting disables the creation of the leader election topic.

If you’re not using the on_leader=True argument to the task/timer/etc. decorators, then use this setting to disable creation of the topic.

property topic_partitions

Topic partitions.

Default number of partitions for new topics.

Note

This defines the maximum number of workers we could distribute the workload of the application (also sometimes referred to as the sharding factor of the application).

property topic_replication_factor

Topic replication factor.

The default replication factor for topics created by the application.

Note

Generally this should be the same as the configured replication factor for your Kafka cluster.

property cache

Cache backend URL.

Optional backend used for Memcached-style caching. URL can be:

  • redis://host

  • rediscluster://host, or

  • memory://.

property web

Web server driver to use.

property web_bind

Web network interface binding mask.

The IP network address mask that decides what interfaces the web server will bind to.

By default this will bind to all interfaces.

This option is usually set by faust worker --web-bind, not by passing it as a keyword argument to app.

property web_cors_options

Cross Origin Resource Sharing options.

Enable Cross-Origin Resource Sharing options for all web views in the internal web server.

This should be specified as a dictionary of URLs to ResourceOptions:

app = App(..., web_cors_options={
    'http://foo.example.com': ResourceOptions(
        allow_credentials=True,
        allow_methods='*',
    )
})

Individual views may override the CORS options used as arguments to @app.page and blueprint.route.

property web_enabled

Enable/disable internal web server.

Enable web server and other web components.

This option can also be set using faust worker --without-web.

property web_host

Web server host name.

Hostname used to access this web server, used for generating the canonical_url setting.

This option is usually set by faust worker --web-host, not by passing it as a keyword argument to app.

property web_in_thread

Run the web server in a separate thread.

Use this if you have a large value for stream_buffer_maxsize and want the web server to be responsive when the worker is otherwise busy processing streams.

Note

Running the web server in a separate thread means web views and agents will not share the same event loop.

property web_port

Web server port.

A port number between 1024 and 65535 to use for the web server.

This option is usually set by faust worker --web-port, not by passing it as a keyword argument to app.

property web_ssl_context

Web server SSL configuration.

See credentials.

property web_transport

Network transport used for the web server.

Default is to use TCP, but this setting also enables you to use Unix domain sockets. To use domain sockets specify a URL including the path to the file you want to create like this:

unix:///tmp/server.sock

This will create a new domain socket available in /tmp/server.sock.

property canonical_url

Node specific canonical URL.

You shouldn’t have to set this manually.

The canonical URL defines how to reach the web server on a running worker node, and is usually set by combining the web_host and web_port settings.

property worker_redirect_stdouts

Redirecting standard outputs.

Enable to have the worker redirect output to sys.stdout and sys.stderr to the Python logging system.

Enabled by default.

property worker_redirect_stdouts_level

Level used when redirecting standard outputs.

The logging level to use when redirecting STDOUT/STDERR to logging.

property Agent

Agent class type.

The Agent class to use for agents, or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

class MyAgent(faust.Agent):
    ...

app = App(..., Agent=MyAgent)

Example using the string path to a class:

app = App(..., Agent='myproj.agents.Agent')
property ConsumerScheduler

Consumer scheduler class.

A strategy which dictates the priority of topics and partitions for incoming records. The default strategy does first round-robin over topics and then round-robin over partitions.

Example using a class:

class MySchedulingStrategy(DefaultSchedulingStrategy):
    ...

app = App(..., ConsumerScheduler=MySchedulingStrategy)

Example using the string path to a class:

app = App(..., ConsumerScheduler='myproj.MySchedulingStrategy')
property Event

Event class type.

The Event class to use for creating new event objects, or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

class MyBaseEvent(faust.Event):
    ...

app = App(..., Event=MyBaseEvent)

Example using the string path to a class:

app = App(..., Event='myproj.events.Event')
property Schema

Schema class type.

The Schema class to use as the default schema type when no schema is specified, or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

class MyBaseSchema(faust.Schema):
    ...

app = App(..., Schema=MyBaseSchema)

Example using the string path to a class:

app = App(..., Schema='myproj.schemas.Schema')
property Stream

Stream class type.

The Stream class to use for streams, or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

class MyBaseStream(faust.Stream):
    ...

app = App(..., Stream=MyBaseStream)

Example using the string path to a class:

app = App(..., Stream='myproj.streams.Stream')
property Table

Table class type.

The Table class to use for tables, or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

class MyBaseTable(faust.Table):
    ...

app = App(..., Table=MyBaseTable)

Example using the string path to a class:

app = App(..., Table='myproj.tables.Table')
property SetTable

SetTable extension table.

The SetTable class to use for table-of-set tables, or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

class MySetTable(faust.SetTable):
    ...

app = App(..., SetTable=MySetTable)

Example using the string path to a class:

app = App(..., SetTable='myproj.tables.MySetTable')
property GlobalTable

GlobalTable class type.

The GlobalTable class to use for tables, or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

class MyBaseGlobalTable(faust.GlobalTable):
    ...

app = App(..., GlobalTable=MyBaseGlobalTable)

Example using the string path to a class:

app = App(..., GlobalTable='myproj.tables.GlobalTable')
property SetGlobalTable

SetGlobalTable class type.

The SetGlobalTable class to use for tables, or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

class MyBaseSetGlobalTable(faust.SetGlobalTable):
    ...

app = App(..., SetGlobalTable=MyBaseSetGlobalTable)

Example using the string path to a class:

app = App(..., SetGlobalTable='myproj.tables.SetGlobalTable')
property TableManager

Table manager class type.

The TableManager used for managing tables, or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

from faust.tables import TableManager

class MyTableManager(TableManager):
    ...

app = App(..., TableManager=MyTableManager)

Example using the string path to a class:

app = App(..., TableManager='myproj.tables.TableManager')
property Serializers

Serializer registry class type.

The Registry class used for serializing/deserializing messages; or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

from faust.serializers import Registry

class MyRegistry(Registry):
    ...

app = App(..., Serializers=MyRegistry)

Example using the string path to a class:

app = App(..., Serializers='myproj.serializers.Registry')
property Worker

Worker class type.

The Worker class used for starting a worker for this app; or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

import faust

class MyWorker(faust.Worker):
    ...

app = faust.App(..., Worker=MyWorker)

Example using the string path to a class:

app = faust.App(..., Worker='myproj.workers.Worker')
property PartitionAssignor

Partition assignor class type.

The PartitionAssignor class used for assigning topic partitions to worker instances; or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

from faust.assignor import PartitionAssignor

class MyPartitionAssignor(PartitionAssignor):
    ...

app = App(..., PartitionAssignor=MyPartitionAssignor)

Example using the string path to a class:

app = App(..., PartitionAssignor='myproj.assignor.PartitionAssignor')
property LeaderAssignor

Leader assignor class type.

The LeaderAssignor class used for assigning a master Faust instance for the app; or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

from faust.assignor import LeaderAssignor

class MyLeaderAssignor(LeaderAssignor):
    ...

app = App(..., LeaderAssignor=MyLeaderAssignor)

Example using the string path to a class:

app = App(..., LeaderAssignor='myproj.assignor.LeaderAssignor')
property Router

Router class type.

The Router class used for routing requests to a worker instance having the partition for a specific key (e.g. table key); or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

from faust.router import Router

class MyRouter(Router):
    ...

app = App(..., Router=MyRouter)

Example using the string path to a class:

app = App(..., Router='myproj.routers.Router')
property Topic

Topic class type.

The Topic class used for defining new topics; or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

import faust

class MyTopic(faust.Topic):
    ...

app = faust.App(..., Topic=MyTopic)

Example using the string path to a class:

app = faust.App(..., Topic='myproj.topics.Topic')
property HttpClient

Http client class type

The aiohttp.client.ClientSession class used as a HTTP client; or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

import faust
from aiohttp.client import ClientSession

class HttpClient(ClientSession):
    ...

app = faust.App(..., HttpClient=HttpClient)

Example using the string path to a class:

app = faust.App(..., HttpClient='myproj.http.HttpClient')
property Monitor

Monitor sensor class type.

The Monitor class as the main sensor gathering statistics for the application; or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

import faust
from faust.sensors import Monitor

class MyMonitor(Monitor):
    ...

app = faust.App(..., Monitor=MyMonitor)

Example using the string path to a class:

app = faust.App(..., Monitor='myproj.monitors.Monitor')
property stream_ack_cancelled_tasks

Deprecated setting; it has no effect.

property stream_ack_exceptions

Deprecated setting; it has no effect.

SETTINGS: ClassVar[SettingIndexMapping] = {'Agent': <faust.types.settings.params._Symbol object>, 'ConsumerScheduler': <faust.types.settings.params._Symbol object>, 'Event': <faust.types.settings.params._Symbol object>, 'GlobalTable': <faust.types.settings.params._Symbol object>, 'HttpClient': <faust.types.settings.params._Symbol object>, 'LeaderAssignor': <faust.types.settings.params._Symbol object>, 'Monitor': <faust.types.settings.params._Symbol object>, 'PartitionAssignor': <faust.types.settings.params._Symbol object>, 'Router': <faust.types.settings.params._Symbol object>, 'Schema': <faust.types.settings.params._Symbol object>, 'Serializers': <faust.types.settings.params._Symbol object>, 'SetGlobalTable': <faust.types.settings.params._Symbol object>, 'SetTable': <faust.types.settings.params._Symbol object>, 'Stream': <faust.types.settings.params._Symbol object>, 'Table': <faust.types.settings.params._Symbol object>, 'TableManager': <faust.types.settings.params._Symbol object>, 'Topic': <faust.types.settings.params._Symbol object>, 'Worker': <faust.types.settings.params._Symbol object>, 'aerospike_retries_on_exception': <faust.types.settings.params.Int object>, 'aerospike_sleep_seconds_between_retries_on_exception': <faust.types.settings.params.Int object>, 'agent_supervisor': <faust.types.settings.params._Symbol object>, 'autodiscover': <faust.types.settings.params.Param object>, 'blocking_timeout': <faust.types.settings.params.Seconds object>, 'broker': <faust.types.settings.params.BrokerList object>, 'broker_api_version': <faust.types.settings.params.Str object>, 'broker_check_crcs': <faust.types.settings.params.Bool object>, 'broker_client_id': <faust.types.settings.params.Str object>, 'broker_commit_every': <faust.types.settings.params.UnsignedInt object>, 'broker_commit_interval': <faust.types.settings.params.Seconds object>, 'broker_commit_livelock_soft_timeout': <faust.types.settings.params.Seconds object>, 'broker_consumer': 
<faust.types.settings.params.BrokerList object>, 'broker_credentials': <faust.types.settings.params.Credentials object>, 'broker_heartbeat_interval': <faust.types.settings.params.Seconds object>, 'broker_max_poll_interval': <faust.types.settings.params.Seconds object>, 'broker_max_poll_records': <faust.types.settings.params.UnsignedInt object>, 'broker_producer': <faust.types.settings.params.BrokerList object>, 'broker_rebalance_timeout': <faust.types.settings.params.Seconds object>, 'broker_request_timeout': <faust.types.settings.params.Seconds object>, 'broker_session_timeout': <faust.types.settings.params.Seconds object>, 'cache': <faust.types.settings.params.URL object>, 'canonical_url': <faust.types.settings.params.URL object>, 'consumer_api_version': <faust.types.settings.params.Str object>, 'consumer_auto_offset_reset': <faust.types.settings.params.Str object>, 'consumer_connections_max_idle_ms': <faust.types.settings.params.Int object>, 'consumer_group_instance_id': <faust.types.settings.params.Str object>, 'consumer_max_fetch_size': <faust.types.settings.params.UnsignedInt object>, 'consumer_metadata_max_age_ms': <faust.types.settings.params.Int object>, 'crash_app_on_aerospike_exception': <faust.types.settings.params.Bool object>, 'datadir': <faust.types.settings.params.Path object>, 'debug': <faust.types.settings.params.Bool object>, 'env_prefix': <faust.types.settings.params.Str object>, 'id_format': <faust.types.settings.params.Str object>, 'key_serializer': <faust.types.settings.params.Codec object>, 'logging_config': <faust.types.settings.params.Dict object>, 'loghandlers': <faust.types.settings.params.LogHandlers object>, 'origin': <faust.types.settings.params.Str object>, 'processing_guarantee': <faust.types.settings.params.Enum.<locals>.EnumParam object>, 'producer_acks': <faust.types.settings.params.Int object>, 'producer_api_version': <faust.types.settings.params.Str object>, 'producer_compression_type': <faust.types.settings.params.Str object>, 
'producer_connections_max_idle_ms': <faust.types.settings.params.Int object>, 'producer_linger': <faust.types.settings.params.Seconds object>, 'producer_linger_ms': <faust.types.settings.params.UnsignedInt object>, 'producer_max_batch_size': <faust.types.settings.params.UnsignedInt object>, 'producer_max_request_size': <faust.types.settings.params.UnsignedInt object>, 'producer_metadata_max_age_ms': <faust.types.settings.params.Int object>, 'producer_partitioner': <faust.types.settings.params._Symbol object>, 'producer_request_timeout': <faust.types.settings.params.Seconds object>, 'producer_threaded': <faust.types.settings.params.Bool object>, 'recovery_consistency_check': <faust.types.settings.params.Bool object>, 'reply_create_topic': <faust.types.settings.params.Bool object>, 'reply_expires': <faust.types.settings.params.Seconds object>, 'reply_to': <faust.types.settings.params.Str object>, 'reply_to_prefix': <faust.types.settings.params.Str object>, 'ssl_context': <faust.types.settings.params.SSLContext object>, 'store': <faust.types.settings.params.URL object>, 'store_check_exists': <faust.types.settings.params.Bool object>, 'stream_ack_cancelled_tasks': <faust.types.settings.params.Bool object>, 'stream_ack_exceptions': <faust.types.settings.params.Bool object>, 'stream_buffer_maxsize': <faust.types.settings.params.UnsignedInt object>, 'stream_processing_timeout': <faust.types.settings.params.Seconds object>, 'stream_publish_on_commit': <faust.types.settings.params.Bool object>, 'stream_recovery_delay': <faust.types.settings.params.Seconds object>, 'stream_wait_empty': <faust.types.settings.params.Bool object>, 'table_cleanup_interval': <faust.types.settings.params.Seconds object>, 'table_key_index_size': <faust.types.settings.params.UnsignedInt object>, 'table_standby_replicas': <faust.types.settings.params.UnsignedInt object>, 'tabledir': <faust.types.settings.params.Path object>, 'timezone': <faust.types.settings.params.Timezone object>, 
'topic_allow_declare': <faust.types.settings.params.Bool object>, 'topic_disable_leader': <faust.types.settings.params.Bool object>, 'topic_partitions': <faust.types.settings.params.UnsignedInt object>, 'topic_replication_factor': <faust.types.settings.params.UnsignedInt object>, 'url': <faust.types.settings.params.URL object>, 'value_serializer': <faust.types.settings.params.Codec object>, 'version': <faust.types.settings.params.Int object>, 'web': <faust.types.settings.params.URL object>, 'web_bind': <faust.types.settings.params.Str object>, 'web_cors_options': <faust.types.settings.params.Dict object>, 'web_enabled': <faust.types.settings.params.Bool object>, 'web_host': <faust.types.settings.params.Str object>, 'web_in_thread': <faust.types.settings.params.Bool object>, 'web_port': <faust.types.settings.params.Port object>, 'web_ssl_context': <faust.types.settings.params.SSLContext object>, 'web_transport': <faust.types.settings.params.URL object>, 'worker_redirect_stdouts': <faust.types.settings.params.Bool object>, 'worker_redirect_stdouts_level': <faust.types.settings.params.Severity object>}

Index of all settings by name.

SETTINGS_BY_SECTION: ClassVar[SettingSectionIndexMapping] = defaultdict(<class 'list'>, {<Section: SectionType.COMMON>: [<faust.types.settings.params.Param object>, <faust.types.settings.params.Path object>, <faust.types.settings.params.Path object>, <faust.types.settings.params.Bool object>, <faust.types.settings.params.Str object>, <faust.types.settings.params.Str object>, <faust.types.settings.params.Str object>, <faust.types.settings.params.Timezone object>, <faust.types.settings.params.Int object>, <faust.types.settings.params.Seconds object>, <faust.types.settings.params.BrokerList object>, <faust.types.settings.params.Credentials object>, <faust.types.settings.params.SSLContext object>, <faust.types.settings.params.Dict object>, <faust.types.settings.params.LogHandlers object>, <faust.types.settings.params.Enum.<locals>.EnumParam object>, <faust.types.settings.params.URL object>, <faust.types.settings.params.URL object>, <faust.types.settings.params.URL object>], <Section: SectionType.AGENT>: [<faust.types.settings.params._Symbol object>], <Section: SectionType.BROKER>: [<faust.types.settings.params.BrokerList object>, <faust.types.settings.params.BrokerList object>, <faust.types.settings.params.Str object>, <faust.types.settings.params.Bool object>, <faust.types.settings.params.Str object>, <faust.types.settings.params.UnsignedInt object>, <faust.types.settings.params.Seconds object>, <faust.types.settings.params.Seconds object>, <faust.types.settings.params.Seconds object>, <faust.types.settings.params.Seconds object>, <faust.types.settings.params.UnsignedInt object>, <faust.types.settings.params.Seconds object>, <faust.types.settings.params.Seconds object>, <faust.types.settings.params.Seconds object>], <Section: SectionType.CONSUMER>: [<faust.types.settings.params.Str object>, <faust.types.settings.params.UnsignedInt object>, <faust.types.settings.params.Str object>, <faust.types.settings.params.Str object>, <faust.types.settings.params.Int object>, 
<faust.types.settings.params.Int object>, <faust.types.settings.params._Symbol object>], <Section: SectionType.SERIALIZATION>: [<faust.types.settings.params.Codec object>, <faust.types.settings.params.Codec object>], <Section: SectionType.PRODUCER>: [<faust.types.settings.params.Int object>, <faust.types.settings.params.Str object>, <faust.types.settings.params.Str object>, <faust.types.settings.params.Seconds object>, <faust.types.settings.params.UnsignedInt object>, <faust.types.settings.params.UnsignedInt object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params.Seconds object>, <faust.types.settings.params.Bool object>, <faust.types.settings.params.Int object>, <faust.types.settings.params.Int object>, <faust.types.settings.params.UnsignedInt object>], <Section: SectionType.STREAM>: [<faust.types.settings.params.Bool object>, <faust.types.settings.params.Bool object>, <faust.types.settings.params.Bool object>, <faust.types.settings.params.Int object>, <faust.types.settings.params.Int object>, <faust.types.settings.params.UnsignedInt object>, <faust.types.settings.params.Seconds object>, <faust.types.settings.params.Bool object>, <faust.types.settings.params.Seconds object>, <faust.types.settings.params.Bool object>, <faust.types.settings.params.Bool object>, <faust.types.settings.params.Bool object>], <Section: SectionType.RPC>: [<faust.types.settings.params.Bool object>, <faust.types.settings.params.Seconds object>, <faust.types.settings.params.Str object>, <faust.types.settings.params.Str object>], <Section: SectionType.TABLE>: [<faust.types.settings.params.Seconds object>, <faust.types.settings.params.UnsignedInt object>, <faust.types.settings.params.UnsignedInt object>], <Section: SectionType.TOPIC>: [<faust.types.settings.params.Bool object>, <faust.types.settings.params.Bool object>, <faust.types.settings.params.UnsignedInt object>, <faust.types.settings.params.UnsignedInt object>], <Section: SectionType.WEB_SERVER>: 
[<faust.types.settings.params.URL object>, <faust.types.settings.params.Str object>, <faust.types.settings.params.Dict object>, <faust.types.settings.params.Bool object>, <faust.types.settings.params.Str object>, <faust.types.settings.params.Bool object>, <faust.types.settings.params.Port object>, <faust.types.settings.params.SSLContext object>, <faust.types.settings.params.URL object>, <faust.types.settings.params.URL object>], <Section: SectionType.WORKER>: [<faust.types.settings.params.Bool object>, <faust.types.settings.params.Severity object>], <Section: SectionType.EXTENSION>: [<faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>]})

Index of all sections and the settings in a section.

property producer_linger_ms

Deprecated setting, please use producer_linger instead.

This used to be provided as milliseconds, the new setting uses seconds.

property url

Backward compatibility alias to broker.

faust.HoppingWindow

alias of _PyHoppingWindow

class faust.TumblingWindow(size: Union[timedelta, float, str], expires: Optional[Union[timedelta, float, str]] = None)[source]

Tumbling window type.

Fixed-size, non-overlapping, gap-less windows.

faust.SlidingWindow

alias of _PySlidingWindow

class faust.Window[source]

Base class for window types.

class faust.Worker(app: ~faust.types.app.AppT, *services: ~mode.types.services.ServiceT, sensors: ~typing.Optional[~typing.Iterable[~faust.types.sensors.SensorT]] = None, debug: bool = False, quiet: bool = False, loglevel: ~typing.Optional[~typing.Union[str, int]] = None, logfile: ~typing.Optional[~typing.Union[str, ~typing.IO]] = None, stdout: ~typing.IO = <_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>, stderr: ~typing.IO = <_io.TextIOWrapper name='<stderr>' mode='w' encoding='utf-8'>, blocking_timeout: ~typing.Optional[float] = None, workdir: ~typing.Optional[~typing.Union[~pathlib.Path, str]] = None, console_port: int = 50101, loop: ~typing.Optional[~asyncio.events.AbstractEventLoop] = None, redirect_stdouts: ~typing.Optional[bool] = None, redirect_stdouts_level: ~typing.Optional[~typing.Union[str, int]] = None, logging_config: ~typing.Optional[~typing.Dict] = None, **kwargs: ~typing.Any)[source]

Worker.

See also

This is a subclass of mode.Worker.

Usage:

You can start a worker using:

  1. the faust worker program.

  2. instantiating Worker programmatically and calling execute_from_commandline():

    >>> worker = Worker(app)
    >>> worker.execute_from_commandline()
    
  3. or, if you already have an event loop, awaiting worker.start(); in that case you are responsible for gracefully shutting down the event loop:

    async def start_worker(worker: Worker) -> None:
        await worker.start()
    
    def manage_loop():
        loop = asyncio.get_event_loop_policy().get_event_loop()
        worker = Worker(app, loop=loop)
        try:
            loop.run_until_complete(start_worker(worker))
        finally:
            worker.stop_and_shutdown_loop()
    
Parameters:
  • app (AppT) – The Faust app to start.

  • *services – Services to start with worker. This includes application instances to start.

  • sensors (Iterable[SensorT]) – List of sensors to include.

  • debug (bool) – Enables debugging mode [disabled by default].

  • quiet (bool) – Do not output anything to console [disabled by default].

  • loglevel (Union[str, int]) – Level to use for logging, can be string (one of: CRIT|ERROR|WARN|INFO|DEBUG), or integer.

  • logfile (Union[str, IO]) – Name of file or a stream to log to.

  • stdout (IO) – Standard out stream.

  • stderr (IO) – Standard err stream.

  • blocking_timeout (float) – When debug is enabled this sets the timeout for detecting that the event loop is blocked.

  • workdir (Union[str, Path]) – Custom working directory for the process that the worker will change into when started. This working directory change is permanent for the process, or until something else changes the working directory again.

  • loop (asyncio.AbstractEventLoop) – Custom event loop object.

logger: logging.Logger = <Logger faust.worker (WARNING)>
app: AppT

The Faust app started by this worker.

sensors: Set[SensorT]

Additional sensors to add to the Faust app.

workdir: Path

Current working directory. Note that if passed as an argument to Worker, the worker will change to this directory when started.

spinner: Optional[Spinner]

Class that displays a terminal progress spinner (see https://pypi.org/project/progress/).

async on_start() None[source]

Signal called every time the worker starts.

Return type:

None

async maybe_start_blockdetection() None[source]

Start blocking detector service if enabled.

Return type:

None

async on_startup_finished() None[source]

Signal called when worker has started.

Return type:

None

on_init_dependencies() Iterable[ServiceT][source]

Return service dependencies that must start with the worker.

Return type:

_GenericAlias[ServiceT]

async on_first_start() None[source]

Signal called the first time the worker starts.

"First time" means this callback is not called again when the worker is restarted because an exception was raised.

Return type:

None

change_workdir(path: Path) None[source]

Change the current working directory (CWD).

Return type:

None

autodiscover() None[source]

Autodiscover modules and files to find @agent decorators, etc.

Return type:

None

async on_execute() None[source]

Signal called when the worker is about to start.

Return type:

None

on_worker_shutdown() None[source]

Signal called before the worker is shutting down.

Return type:

None

on_setup_root_logger(logger: Logger, level: int) None[source]

Signal called when the root logger is being configured.

Return type:

None

faust.uuid() str[source]

Generate random UUID string.

Shortcut to str(uuid4()).

Return type:

str

class faust.Service(*, beacon: Optional[NodeT] = None, loop: Optional[AbstractEventLoop] = None)[source]

An asyncio service that can be started/stopped/restarted.

Keyword Arguments:
abstract: ClassVar[bool] = False

Set to True if this service class is abstract-only, meaning it will only be used as a base class.

class Diag(service: ServiceT)

Service diagnostics.

This can be used to track what your service is doing. For example if your service is a Kafka consumer with a background thread that commits the offset every 30 seconds, you may want to see when this happens:

```python
DIAG_COMMITTING = 'committing'

class Consumer(Service):

    @Service.task
    async def _background_commit(self) -> None:
        while not self.should_stop:
            await self.sleep(30.0)
            self.diag.set_flag(DIAG_COMMITTING)
            try:
                await self._consumer.commit()
            finally:
                self.diag.unset_flag(DIAG_COMMITTING)
```

The above code is setting the flag manually, but you can also use a decorator to accomplish the same thing:

```python
@Service.timer(30.0)
async def _background_commit(self) -> None:
    await self.commit()

@Service.transitions_to(DIAG_COMMITTING)
async def commit(self) -> None:
    await self._consumer.commit()
```

set_flag(flag: str) None
Return type:

None

unset_flag(flag: str) None
Return type:

None

wait_for_shutdown = False

Set to True if .stop must wait for the shutdown flag to be set.

shutdown_timeout: float = 60.0

Time to wait for shutdown flag set before we give up.

restart_count: int = 0

Current number of times this service instance has been restarted.

mundane_level = 'info'

The log level for mundane info such as starting, stopping, etc. Set this to "debug" for less information.

classmethod from_awaitable(coro: Awaitable, *, name: Optional[str] = None, **kwargs: Any) ServiceT[source]
Return type:

ServiceT

classmethod task(fun: Callable[[Any], Awaitable[None]]) ServiceTask[source]

Decorate function to be used as background task.

Example:

```python
class S(Service):

    @Service.task
    async def background_task(self):
        while not self.should_stop:
            await self.sleep(1.0)
            print('Waking up')
```

Return type:

ServiceTask

classmethod timer(interval: Union[timedelta, float, str], *, exec_first: bool = False, name: Optional[str] = None, max_drift_correction: float = 0.1) Callable[[Callable], ServiceTask][source]

Background timer executing every n seconds.

```python
class S(Service):

    @Service.timer(1.0)
    async def background_timer(self):
        print('Waking up')
```

Return type:

_CallableGenericAlias[_CallableType, ServiceTask]

classmethod crontab(cron_format: str, *, timezone: Optional[tzinfo] = None) Callable[[Callable], ServiceTask][source]

Background timer executing periodic task based on Crontab description.

Example:

```python
class S(Service):

    @Service.crontab(cron_format='30 18 * * *',
                     timezone=pytz.timezone('US/Pacific'))
    async def every_6_30_pm_pacific(self):
        print('IT IS 6:30pm')

    @Service.crontab(cron_format='30 18 * * *')
    async def every_6_30_pm(self):
        print('6:30pm UTC')
```

Return type:

_CallableGenericAlias[_CallableType, ServiceTask]

classmethod transitions_to(flag: str) Callable[source]

Decorate function to set and reset diagnostic flag.

Return type:

_CallableType

async transition_with(flag: str, fut: Awaitable, *args: Any, **kwargs: Any) Any[source]
Return type:

Any

add_dependency(service: ServiceT) ServiceT[source]

Add dependency to other service.

The service will be started/stopped with this service.

Return type:

ServiceT

async add_runtime_dependency(service: ServiceT) ServiceT[source]
Return type:

ServiceT

async remove_dependency(service: ServiceT) ServiceT[source]

Stop and remove dependency of this service.

Return type:

ServiceT

async add_async_context(context: AsyncContextManager) Any[source]
Return type:

Any

add_context(context: ContextManager) Any[source]
Return type:

Any

add_future(coro: Awaitable) Future[source]

Add relationship to asyncio.Future.

The future will be joined when this service is stopped.

Return type:

Future

tracebacks() Mapping[str, str][source]
Return type:

_GenericAlias[str, str]

human_tracebacks() str[source]
Return type:

str

on_init() None[source]
Return type:

None

on_init_dependencies() Iterable[ServiceT][source]

Return list of service dependencies for this service.

Return type:

_GenericAlias[ServiceT]

async join_services(services: Sequence[ServiceT]) None[source]
Return type:

None

async sleep(n: Union[timedelta, float, str]) None[source]

Sleep for n seconds, or until service stopped.

Return type:

None

async wait_for_stopped(*coros: Union[Future, Coroutine[Any, None, Any], Awaitable, Event], timeout: Optional[Union[timedelta, float, str]] = None) bool[source]
Return type:

bool

async wait(*coros: Union[Future, Coroutine[Any, None, Any], Awaitable, Event], timeout: Optional[Union[timedelta, float, str]] = None) WaitResult[source]

Wait for coroutines to complete, or until the service stops.

Return type:

WaitResult

async wait_many(coros: Iterable[Union[Future, Coroutine[Any, None, Any], Awaitable, Event]], *, timeout: Optional[Union[timedelta, float, str]] = None) WaitResult[source]
Return type:

WaitResult

async wait_first(*coros: Union[Future, Coroutine[Any, None, Any], Awaitable, Event], timeout: Optional[Union[timedelta, float, str]] = None) WaitResults[source]
Return type:

WaitResults

async start() None[source]
Return type:

None

async maybe_start() bool[source]

Start the service, if it has not already been started.

Return type:

bool

async crash(reason: BaseException) None[source]

Crash the service and all child services.

Return type:

None

async stop() None[source]

Stop the service.

Return type:

None

async restart() None[source]

Restart this service.

Return type:

None

service_reset() None[source]
Return type:

None

async wait_until_stopped() None[source]

Wait until the service is signalled to stop.

Return type:

None

set_shutdown() None[source]

Set the shutdown signal.

Notes

If wait_for_shutdown is set, stopping the service will wait for this flag to be set.

Return type:

None

async itertimer(interval: ~typing.Union[~datetime.timedelta, float, str], *, max_drift_correction: float = 0.1, sleep: ~typing.Optional[~typing.Callable[[...], ~typing.Awaitable]] = None, clock: ~typing.Callable[[], float] = <built-in function perf_counter>, name: str = '') AsyncIterator[float][source]

Sleep interval seconds for every iteration.

This is an async iterator that takes advantage of mode.timers.Timer to monitor drift and timer overlap.

Uses Service.sleep so exits fast when the service is stopped.

Note

Will sleep the full interval seconds before returning from first iteration.

Examples:

```python
async for sleep_time in self.itertimer(1.0):
    print('another second passed, just woke up...')
    await perform_some_http_request()
```

Return type:

_GenericAlias[float]

property started: bool

Return True if the service was started.

Return type:

bool

property crashed: bool
Return type:

bool

property should_stop: bool

Return True if the service must stop.

Return type:

bool

property state: str

Service state - as a human readable string.

Return type:

str

property label: str

Label used for graphs.

Return type:

str

property shortlabel: str

Label used for logging.

Return type:

str

property beacon: NodeT

Beacon used to track services in a dependency graph.

Return type:

NodeT

property crash_reason: Optional[BaseException]
Return type:

_UnionGenericAlias[BaseException, None]

logger: logging.Logger = <Logger mode.services (WARNING)>
class faust.ServiceT(*, beacon: Optional[NodeT] = None, loop: Optional[AbstractEventLoop] = None)[source]

Abstract type for an asynchronous service that can be started/stopped.

See also

mode.Service.

Diag: Type[DiagT]
diag: DiagT
async_exit_stack: AsyncExitStack
exit_stack: ExitStack
shutdown_timeout: float
wait_for_shutdown = False
restart_count: int = 0
supervisor: Optional[SupervisorStrategyT] = None
abstract add_dependency(service: ServiceT) ServiceT[source]
Return type:

ServiceT

abstract async add_runtime_dependency(service: ServiceT) ServiceT[source]
Return type:

ServiceT

abstract async add_async_context(context: AsyncContextManager) Any[source]
Return type:

Any

abstract add_context(context: ContextManager) Any[source]
Return type:

Any

abstract async start() None[source]
Return type:

None

abstract async maybe_start() bool[source]
Return type:

bool

abstract async crash(reason: BaseException) None[source]
Return type:

None

abstract async stop() None[source]
Return type:

None

abstract service_reset() None[source]
Return type:

None

abstract async restart() None[source]
Return type:

None

abstract async wait_until_stopped() None[source]
Return type:

None

abstract set_shutdown() None[source]
Return type:

None

abstract property started: bool
Return type:

bool

abstract property crashed: bool
Return type:

bool

abstract property should_stop: bool
Return type:

bool

abstract property state: str
Return type:

str

abstract property label: str
Return type:

str

abstract property shortlabel: str
Return type:

str

property beacon: NodeT
Return type:

NodeT

abstract property loop: AbstractEventLoop
Return type:

AbstractEventLoop

abstract property crash_reason: Optional[BaseException]
Return type:

_UnionGenericAlias[BaseException, None]