faust.types.settings

class faust.types.settings.Settings(*args: Any, **kwargs: Any)[source]
NODE_HOSTNAME: ClassVar[str] = 'fv-az2021-651'
DEFAULT_BROKER_URL: ClassVar[str] = 'kafka://localhost:9092'
env: Mapping[str, str]

Environment. Defaults to os.environ.

on_init(id: str, **kwargs: Any) None[source]
Return type:

None

getenv(env_name: str) Any[source]
Return type:

Any

relative_to_appdir(path: Path) Path[source]

Prepare app directory path.

If path is absolute the path is returned as-is, but if path is relative it will be assumed to belong under the app directory.

Return type:

Path

data_directory_for_version(version: int) Path[source]

Return the directory path for data belonging to specific version.

Return type:

Path

find_old_versiondirs() Iterable[Path][source]
Return type:

_GenericAlias[Path]

property name: str
Return type:

str

property id: str
Return type:

str

property appdir: Path
Return type:

Path

property MY_SETTING

My custom setting.

To contribute new settings you only have to define a new setting decorated attribute here.

Look at the other settings for examples.

Remember that once you’ve added the setting you must also render the configuration reference:

$ make configref
property autodiscover

Automatic discovery of agents, tasks, timers, views and commands.

Faust has an API to add different asyncio services and other user extensions, such as “Agents”, HTTP web views, command-line commands, and timers to your Faust workers. These can be defined in any module, so to discover them at startup, the worker needs to traverse packages looking for them.

Warning

The autodiscovery functionality uses the https://pypi.org/project/Venusian/ library to scan wanted packages for @app.agent, @app.page, @app.command, @app.task and @app.timer decorators, but to do so, it’s required to traverse the package path and import every module in it.

Importing random modules like this can be dangerous so make sure you follow Python programming best practices. Do not start threads; perform network I/O; do test monkey-patching for mocks or similar, as a side effect of importing a module. If you encounter a case such as this then please find a way to perform your action in a lazy manner.

Warning

If the above warning is something you cannot fix, or if it’s out of your control, then please set autodiscover=False and make sure the worker imports all modules where your decorators are defined.

The value for this argument can be:

bool

If App(autodiscover=True) is set, the autodiscovery will scan the package name described in the origin attribute.

The origin attribute is automatically set when you start a worker using the faust command line program, for example:

faust -A example.simple worker

The -A, option specifies the app, but you can also create a shortcut entry point by calling app.main():

if __name__ == '__main__':
    app.main()

Then you can start the faust program by executing for example python myscript.py worker --loglevel=INFO, and it will use the correct application.

Sequence[str]

The argument can also be a list of packages to scan:

app = App(..., autodiscover=['proj_orders', 'proj_accounts'])
Callable[[], Sequence[str]]

The argument can also be a function returning a list of packages to scan:

def get_all_packages_to_scan():
    return ['proj_orders', 'proj_accounts']

app = App(..., autodiscover=get_all_packages_to_scan)
False

If everything you need is in a self-contained module, or you import the stuff you need manually, just set autodiscover to False and don’t worry about it :-)

Django

When using https://pypi.org/project/Django/ and the DJANGO_SETTINGS_MODULE environment variable is set, the Faust app will scan all packages found in the INSTALLED_APPS setting.

If you’re using Django you can use this to scan for agents/pages/commands in all packages defined in INSTALLED_APPS.

Faust will automatically detect that you’re using Django and do the right thing if you do:

app = App(..., autodiscover=True)

It will find agents and other decorators in all of the reusable Django applications. If you want to manually control what packages are traversed, then provide a list:

app = App(..., autodiscover=['package1', 'package2'])

or if you want exactly None packages to be traversed, then provide a False:

app = App(.., autodiscover=False)

which is the default, so you can simply omit the argument.

Tip

For manual control over autodiscovery, you can also call the app.discover() method manually.

property datadir

Application data directory.

The directory in which this instance stores the data used by local tables, etc.

See also

  • The data directory can also be set using the faust --datadir option, from the command-line, so there is usually no reason to provide a default value when creating the app.

property tabledir

Application table data directory.

The directory in which this instance stores local table data. Usually you will want to configure the datadir setting, but if you want to store tables separately you can configure this one.

If the path provided is relative (it has no leading slash), then the path will be considered to be relative to the datadir setting.

property debug

Use in development to expose sensor information endpoint.

Tip

If you want to enable the sensor statistics endpoint in production, without enabling the debug setting, you can do so by adding the following code:

app.web.blueprints.add(
    '/stats/', 'faust.web.apps.stats:blueprint')
property env_prefix

Environment variable prefix.

When configuring Faust by environent variables, this adds a common prefix to all Faust environment value names.

property id_format

Application ID format template.

The format string used to generate the final id value by combining it with the version parameter.

property origin

The reverse path used to find the app.

For example if the app is located in:

from myproj.app import app

Then the origin should be "myproj.app".

The faust worker program will try to automatically set the origin, but if you are having problems with auto generated names then you can set origin manually.

property timezone

Project timezone.

The timezone used for date-related functionality such as cronjobs.

property version

App version.

Version of the app, that when changed will create a new isolated instance of the application. The first version is 1, the second version is 2, and so on.

Source topics will not be affected by a version change.

Faust applications will use two kinds of topics: source topics, and internally managed topics. The source topics are declared by the producer, and we do not have the opportunity to modify any configuration settings, like number of partitions for a source topic; we may only consume from them. To mark a topic as internal, use: app.topic(..., internal=True).

property agent_supervisor

Default agent supervisor type.

An agent may start multiple instances (actors) when the concurrency setting is higher than one (e.g. @app.agent(concurrency=2)).

Multiple instances of the same agent are considered to be in the same supervisor group.

The default supervisor is the mode.OneForOneSupervisor: if an instance in the group crashes, we restart that instance only.

These are the supervisors supported:

property blocking_timeout

Blocking timeout (in seconds).

When specified the worker will start a periodic signal based timer that only triggers when the loop has been blocked for a time exceeding this timeout.

This is the most safe way to detect blocking, but could have adverse effects on libraries that do not automatically retry interrupted system calls.

Python itself does retry all interrupted system calls since version 3.5 (see PEP 475), but this might not be the case with C extensions added to the worker by the user.

The blocking detector is a background thread that periodically wakes up to either arm a timer, or cancel an already armed timer. In pseudocode:

while True:
    # cancel previous alarm and arm new alarm
    signal.signal(signal.SIGALRM, on_alarm)
    signal.setitimer(signal.ITIMER_REAL, blocking_timeout)
    # sleep to wakeup just before the timeout
    await asyncio.sleep(blocking_timeout * 0.96)

def on_alarm(signum, frame):
    logger.warning('Blocking detected: ...')

If the sleep does not wake up in time the alarm signal will be sent to the process and a traceback will be logged.

property broker

Broker URL, or a list of alternative broker URLs.

Faust needs the URL of a “transport” to send and receive messages.

Currently, the only supported production transport is kafka://. This uses the https://pypi.org/project/aiokafka/ client under the hood, for consuming and producing messages.

You can specify multiple hosts at the same time by separating them using the semi-comma:

kafka://kafka1.example.com:9092;kafka2.example.com:9092

Which in actual code looks like this:

BROKERS = 'kafka://kafka1.example.com:9092;kafka2.example.com:9092'
app = faust.App(
    'id',
    broker=BROKERS,
)

You can also pass a list of URLs:

app = faust.App(
    'id',
    broker=['kafka://kafka1.example.com:9092',
            'kafka://kafka2.example.com:9092'],
)

See also

You can configure the transport used for consuming and producing separately, by setting the broker_consumer and broker_producer settings.

This setting is used as the default.

Available Transports

property broker_consumer

Consumer broker URL.

You can use this setting to configure the transport used for producing and consuming separately.

If not set the value found in broker will be used.

property broker_producer

Producer broker URL.

You can use this setting to configure the transport used for producing and consuming separately.

If not set the value found in broker will be used.

property broker_api_version

Broker API version,.

This setting is also the default for consumer_api_version, and producer_api_version.

Negotiate producer protocol version.

The default value - “auto” means use the latest version supported by both client and server.

Any other version set means you are requesting a specific version of the protocol.

Example Kafka uses:

Disable sending headers for all messages produced

Kafka headers support was added in Kafka 0.11, so you can specify broker_api_version="0.10" to remove the headers from messages.

property broker_check_crcs

Broker CRC check.

Automatically check the CRC32 of the records consumed.

property broker_client_id

Broker client ID.

There is rarely any reason to configure this setting.

The client id is used to identify the software used, and is not usually configured by the user.

property broker_commit_every

Broker commit message frequency.

Commit offset every n messages.

See also broker_commit_interval, which is how frequently we commit on a timer when there are few messages being received.

property broker_commit_interval

Broker commit time frequency.

How often we commit messages that have been fully processed (acked).

property broker_commit_livelock_soft_timeout

Commit livelock timeout.

How long time it takes before we warn that the Kafka commit offset has not advanced (only when processing messages).

property broker_credentials

Broker authentication mechanism.

Specify the authentication mechanism to use when connecting to the broker.

The default is to not use any authentication.

SASL Authentication

You can enable SASL authentication via plain text:

app = faust.App(
    broker_credentials=faust.SASLCredentials(
        username='x',
        password='y',
    ))

Warning

Do not use literal strings when specifying passwords in production, as they can remain visible in stack traces.

Instead the best practice is to get the password from a configuration file, or from the environment:

BROKER_USERNAME = os.environ.get('BROKER_USERNAME')
BROKER_PASSWORD = os.environ.get('BROKER_PASSWORD')

app = faust.App(
    broker_credentials=faust.SASLCredentials(
        username=BROKER_USERNAME,
        password=BROKER_PASSWORD,
    ))
GSSAPI Authentication

GSSAPI authentication over plain text:

app = faust.App(
    broker_credentials=faust.GSSAPICredentials(
        kerberos_service_name='faust',
        kerberos_domain_name='example.com',
    ),
)

GSSAPI authentication over SSL:

import ssl
ssl_context = ssl.create_default_context(
    purpose=ssl.Purpose.SERVER_AUTH, cafile='ca.pem')
ssl_context.load_cert_chain(
    'client.cert', keyfile='client.key')

app = faust.App(
    broker_credentials=faust.GSSAPICredentials(
        kerberos_service_name='faust',
        kerberos_domain_name='example.com',
        ssl_context=ssl_context,
    ),
)
SSL Authentication

Provide an SSL context for the Kafka broker connections.

This allows Faust to use a secure SSL/TLS connection for the Kafka connections and enabling certificate-based authentication.

import ssl

ssl_context = ssl.create_default_context(
    purpose=ssl.Purpose.SERVER_AUTH, cafile='ca.pem')
ssl_context.load_cert_chain(
    'client.cert', keyfile='client.key')
app = faust.App(..., broker_credentials=ssl_context)
property broker_heartbeat_interval

Broker heartbeat interval.

How often we send heartbeats to the broker, and also how often we expect to receive heartbeats from the broker.

If any of these time out, you should increase this setting.

property broker_max_poll_interval

Broker max poll interval.

The maximum allowed time (in seconds) between calls to consume messages If this interval is exceeded the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member. If API methods block waiting for messages, that time does not count against this timeout.

See KIP-62 for technical details.

property broker_max_poll_records

Broker max poll records.

The maximum number of records returned in a single call to poll(). If you find that your application needs more time to process messages you may want to adjust broker_max_poll_records to tune the number of records that must be handled on every loop iteration.

property broker_rebalance_timeout

Broker rebalance timeout.

How long to wait for a node to finish rebalancing before the broker will consider it dysfunctional and remove it from the cluster.

Increase this if you experience the cluster being in a state of constantly rebalancing, but make sure you also increase the broker_heartbeat_interval at the same time.

Note

The session timeout must not be greater than the broker_request_timeout.

property broker_request_timeout

Kafka client request timeout.

Note

The request timeout must not be less than the broker_session_timeout.

property broker_session_timeout

Broker session timeout.

How long to wait for a node to finish rebalancing before the broker will consider it dysfunctional and remove it from the cluster.

Increase this if you experience the cluster being in a state of constantly rebalancing, but make sure you also increase the broker_heartbeat_interval at the same time.

Note

The session timeout must not be greater than the broker_request_timeout.

property ssl_context

SSL configuration.

See credentials.

property consumer_api_version

Consumer API version.

Configures the broker API version to use for consumers. See broker_api_version for more information.

property consumer_max_fetch_size

Consumer max fetch size.

The maximum amount of data per-partition the server will return. This size must be at least as large as the maximum message size.

Note: This is PER PARTITION, so a limit of 1Mb when your workers consume from 10 topics having 100 partitions each, means a fetch request can be up to a gigabyte (10 * 100 * 1Mb), This limit being too generous may cause rebalancing issues: if the amount of time required to flush pending data stuck in socket buffers exceed the rebalancing timeout.

You must keep this limit low enough to account for many partitions being assigned to a single node.

property consumer_auto_offset_reset

Consumer auto offset reset.

Where the consumer should start reading messages from when there is no initial offset, or the stored offset no longer exists, e.g. when starting a new consumer for the first time.

Options include ‘earliest’, ‘latest’, ‘none’.

property consumer_group_instance_id

Consumer group instance id.

The group_instance_id for static partition assignment.

If not set, default assignment strategy is used. Otherwise, each consumer instance has to have a unique id.

property consumer_metadata_max_age_ms

Consumer metadata max age milliseconds

The period of time in milliseconds after which we force a refresh of metadata even if we haven’t seen any partition leadership changes to proactively discover any new brokers or partitions.

Default: 300000

property consumer_connections_max_idle_ms

Consumer connections max idle milliseconds.

Close idle connections after the number of milliseconds specified by this config.

Default: 540000 (9 minutes).

property key_serializer

Default key serializer.

Serializer used for keys by default when no serializer is specified, or a model is not being used.

This can be the name of a serializer/codec, or an actual faust.serializers.codecs.Codec instance.

See also

  • The Codecs section in the model guide – for more information about codecs.

property value_serializer

Default value serializer.

Serializer used for values by default when no serializer is specified, or a model is not being used.

This can be string, the name of a serializer/codec, or an actual faust.serializers.codecs.Codec instance.

See also

  • The Codecs section in the model guide – for more information about codecs.

property logging_config

Logging dictionary configuration.

Optional dictionary for logging configuration, as supported by logging.config.dictConfig().

property loghandlers

List of custom logging handlers.

Specify a list of custom log handlers to use in worker instances.

property producer_acks

Producer Acks.

The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are common:

  • 0: Producer will not wait for any acknowledgment from

    the server at all. The message will immediately be considered sent (Not recommended).

  • 1: The broker leader will write the record to its local

    log but will respond without awaiting full acknowledgment from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost.

  • -1: The broker leader will wait for the full set of in-sync

    replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee.

property producer_api_version

Producer API version.

Configures the broker API version to use for producers. See broker_api_version for more information.

property producer_compression_type

Producer compression type.

The compression type for all data generated by the producer. Valid values are gzip, snappy, lz4, or None.

property producer_linger

Producer batch linger configuration.

Minimum time to batch before sending out messages from the producer.

Should rarely have to change this.

property producer_max_batch_size

Producer max batch size.

Max size of each producer batch, in bytes.

property producer_max_request_size

Producer maximum request size.

Maximum size of a request in bytes in the producer.

Should rarely have to change this.

property producer_partitioner

Producer partitioning strategy.

The Kafka producer can be configured with a custom partitioner to change how keys are partitioned when producing to topics.

The default partitioner for Kafka is implemented as follows, and can be used as a template for your own partitioner:

import random
from typing import List
from kafka.partitioner.hashed import murmur2

def partition(key: bytes,
            all_partitions: List[int],
            available: List[int]) -> int:
    '''Default partitioner.

    Hashes key to partition using murmur2 hashing
    (from java client) If key is None, selects partition
    randomly from available, or from all partitions if none
    are currently available

    Arguments:
        key: partitioning key
        all_partitions: list of all partitions sorted by
                        partition ID.
        available: list of available partitions
                   in no particular order
    Returns:
        int: one of the values from ``all_partitions``
             or ``available``.
    '''
    if key is None:
        source = available if available else all_paritions
        return random.choice(source)
    index: int = murmur2(key)
    index &= 0x7fffffff
    index %= len(all_partitions)
    return all_partitions[index]
property producer_request_timeout

Producer request timeout.

Timeout for producer operations. This is set high by default, as this is also the time when producer batches expire and will no longer be retried.

property producer_threaded

Thread separate producer for send_soon.

If True, spin up a different producer in a different thread to be used for messages buffered up for producing via send_soon function.

property producer_metadata_max_age_ms

Producer metadata max age milliseconds

The period of time in milliseconds after which we force a refresh of metadata even if we haven’t seen any partition leadership changes to proactively discover any new brokers or partitions.

Default: 300000

property producer_connections_max_idle_ms

Producer connections max idle milliseconds.

Close idle connections after the number of milliseconds specified by this config.

Default: 540000 (9 minutes).

property recovery_consistency_check

Check Kafka and local offsets for consistency.

If True, assert that Kafka highwater offsets >= local offset in the rocksdb state storee

property store_check_exists

Execute exists on the underlying store.

If True, executes exists on the underlying store. If False client has to catch KeyError

property crash_app_on_aerospike_exception

Crashes the app on an aerospike Exceptions.

If True, crashes the app and prevents the commit offset on progressing. If False client has to catch the Error and implement a dead letter queue

property aerospike_retries_on_exception

Number of retries to aerospike on a runtime error from the aerospike client.

Set this to the number of retries using the aerospike client on a runtime Exception thrown by the client

property aerospike_sleep_seconds_between_retries_on_exception

Seconds to sleep between retries to aerospike on a runtime error from the aerospike client.

Set this to the sleep in seconds between retries using the aerospike client on a runtime Exception thrown by the client

property reply_create_topic

Automatically create reply topics.

Set this to True if you plan on using the RPC with agents.

This will create the internal topic used for RPC replies on that instance at startup.

property reply_expires

RPC reply expiry time in seconds.

The expiry time (in seconds float, or timedelta), for how long replies will stay in the instances local reply topic before being removed.

property reply_to

Reply to address.

The name of the reply topic used by this instance. If not set one will be automatically generated when the app is created.

property reply_to_prefix

Reply address topic name prefix.

The prefix used when generating reply topic names.

property processing_guarantee

The processing guarantee that should be used.

Possible values are “at_least_once” (default) and “exactly_once”.

Note that if exactly-once processing is enabled consumers are configured with isolation.level="read_committed" and producers are configured with retries=Integer.MAX_VALUE and enable.idempotence=true per default.

Note that by default exactly-once processing requires a cluster of at least three brokers what is the recommended setting for production. For development you can change this, by adjusting broker setting transaction.state.log.replication.factor to the number of brokers you want to use.

property stream_buffer_maxsize

Stream buffer maximum size.

This setting control back pressure to streams and agents reading from streams.

If set to 4096 (default) this means that an agent can only keep at most 4096 unprocessed items in the stream buffer.

Essentially this will limit the number of messages a stream can “prefetch”.

Higher numbers gives better throughput, but do note that if your agent sends messages or update tables (which sends changelog messages).

This means that if the buffer size is large, the broker_commit_interval or broker_commit_every settings must be set to commit frequently, avoiding back pressure from building up.

A buffer size of 131_072 may let you process over 30,000 events a second as a baseline, but be careful with a buffer size that large when you also send messages or update tables.

property stream_processing_timeout

Stream processing timeout.

Timeout (in seconds) for processing events in the stream. If processing of a single event exceeds this time we log an error, but do not stop processing.

If you are seeing a warning like this you should either

  1. increase this timeout to allow agents to spend more time

    on a single event, or

  2. add a timeout to the operation in the agent, so stream processing

    always completes before the timeout.

The latter is preferred for network operations such as web requests. If a network service you depend on is temporarily offline you should consider doing retries (send to separate topic):

main_topic = app.topic('main')
deadletter_topic = app.topic('main_deadletter')

async def send_request(value, timeout: Optional[float] = None) -> None:
    await app.http_client.get('http://foo.com', timeout=timeout)

@app.agent(main_topic)
async def main(stream):
    async for value in stream:
    try:
        await send_request(value, timeout=5)
    except asyncio.TimeoutError:
        await deadletter_topic.send(value)

@app.agent(deadletter_topic)
    async def main_deadletter(stream):
        async for value in stream:
        # wait for 30 seconds before retrying.
        await stream.sleep(30)
        await send_request(value)
property stream_publish_on_commit

Stream delay producing until commit time.

If enabled we buffer up sending messages until the source topic offset related to that processing is committed. This means when we do commit, we may have buffered up a LOT of messages so commit needs to happen frequently (make sure to decrease broker_commit_every).

property stream_recovery_delay

Stream recovery delayl

Number of seconds to sleep before continuing after rebalance. We wait for a bit to allow for more nodes to join/leave before starting recovery tables and then processing streams. This to minimize the chance of errors rebalancing loops.

property stream_wait_empty

Stream wait empty.

This setting controls whether the worker should wait for the currently processing task in an agent to complete before rebalancing or shutting down.

On rebalance/shut down we clear the stream buffers. Those events will be reprocessed after the rebalance anyway, but we may have already started processing one event in every agent, and if we rebalance we will process that event again.

By default we will wait for the currently active tasks, but if your streams are idempotent you can disable it using this setting.

property store

Table storage backend URL.

The backend used for table storage.

Tables are stored in-memory by default, but you should not use the memory:// store in production.

In production, a persistent table store, such as rocksdb:// is preferred.

property table_cleanup_interval

Table cleanup interval.

How often we cleanup tables to remove expired entries.

property table_key_index_size

Table key index size.

Tables keep a cache of key to partition number to speed up table lookups.

This setting configures the maximum size of that cache.

property table_standby_replicas

Table standby replicas.

The number of standby replicas for each table.

property topic_allow_declare

Allow creating new topics.

This setting disables the creation of internal topics.

Faust will only create topics that it considers to be fully owned and managed, such as intermediate repartition topics, table changelog topics etc.

Some Kafka managers does not allow services to create topics, in that case you should set this to False.

property topic_disable_leader

Disable leader election topic.

This setting disables the creation of the leader election topic.

If you’re not using the on_leader=True argument to task/timer/etc., decorators then use this setting to disable creation of the topic.

property topic_partitions

Topic partitions.

Default number of partitions for new topics.

Note

This defines the maximum number of workers we could distribute the workload of the application (also sometimes referred as the sharding factor of the application).

property topic_replication_factor

Topic replication factor.

The default replication factor for topics created by the application.

Note

Generally this should be the same as the configured replication factor for your Kafka cluster.

property cache

Cache backend URL.

Optional backend used for Memcached-style caching. URL can be:

  • redis://host

  • rediscluster://host, or

  • memory://.

property web

Web server driver to use.

property web_bind

Web network interface binding mask.

The IP network address mask that decides what interfaces the web server will bind to.

By default this will bind to all interfaces.

This option is usually set by faust worker --web-bind, not by passing it as a keyword argument to app.

property web_cors_options

Cross Origin Resource Sharing options.

Enable Cross-Origin Resource Sharing options for all web views in the internal web server.

This should be specified as a dictionary of URLs to ResourceOptions:

app = App(..., web_cors_options={
    'http://foo.example.com': ResourceOptions(
        allow_credentials=True,
        allow_methods='*'k,
    )
})

Individual views may override the CORS options used as arguments to to @app.page and blueprint.route.

property web_enabled

Enable/disable internal web server.

Enable web server and other web components.

This option can also be set using faust worker --without-web.

property web_host

Web server host name.

Hostname used to access this web server, used for generating the canonical_url setting.

This option is usually set by faust worker --web-host, not by passing it as a keyword argument to app.

property web_in_thread

Run the web server in a separate thread.

Use this if you have a large value for stream_buffer_maxsize and want the web server to be responsive when the worker is otherwise busy processing streams.

Note

Running the web server in a separate thread means web views and agents will not share the same event loop.

property web_port

Web server port.

A port number between 1024 and 65535 to use for the web server.

This option is usually set by faust worker --web-port, not by passing it as a keyword argument to app.

property web_ssl_context

Web server SSL configuration.

See credentials.

property web_transport

Network transport used for the web server.

Default is to use TCP, but this setting also enables you to use Unix domainN sockets. To use domain sockets specify an URL including the path to the file you want to create like this:

unix:///tmp/server.sock

This will create a new domain socket available in /tmp/server.sock.

property canonical_url

Node specific canonical URL.

You shouldn’t have to set this manually.

The canonical URL defines how to reach the web server on a running worker node, and is usually set by combining the web_host and web_port settings.

property worker_redirect_stdouts

Redirecting standard outputs.

Enable to have the worker redirect output to sys.stdout and sys.stderr to the Python logging system.

Enabled by default.

property worker_redirect_stdouts_level

Level used when redirecting standard outputs.

The logging level to use when redirect STDOUT/STDERR to logging.

property Agent

Agent class type.

The Agent class to use for agents, or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

class MyAgent(faust.Agent):
    ...

app = App(..., Agent=MyAgent)

Example using the string path to a class:

app = App(..., Agent='myproj.agents.Agent')
property ConsumerScheduler

Consumer scheduler class.

A strategy which dictates the priority of topics and partitions for incoming records. The default strategy does first round-robin over topics and then round-robin over partitions.

Example using a class:

class MySchedulingStrategy(DefaultSchedulingStrategy):
    ...

app = App(..., ConsumerScheduler=MySchedulingStrategy)

Example using the string path to a class:

app = App(..., ConsumerScheduler='myproj.MySchedulingStrategy')
property Event

Event class type.

The Event class to use for creating new event objects, or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

class MyBaseEvent(faust.Event):
    ...

app = App(..., Event=MyBaseEvent)

Example using the string path to a class:

app = App(..., Event='myproj.events.Event')
property Schema

Schema class type.

The Schema class to use as the default schema type when no schema specified. or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

class MyBaseSchema(faust.Schema):
    ...

app = App(..., Schema=MyBaseSchema)

Example using the string path to a class:

app = App(..., Schema='myproj.schemas.Schema')
property Stream

Stream class type.

The Stream class to use for streams, or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

class MyBaseStream(faust.Stream):
    ...

app = App(..., Stream=MyBaseStream)

Example using the string path to a class:

app = App(..., Stream='myproj.streams.Stream')
property Table

Table class type.

The Table class to use for tables, or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

class MyBaseTable(faust.Table):
    ...

app = App(..., Table=MyBaseTable)

Example using the string path to a class:

app = App(..., Table='myproj.tables.Table')
property SetTable

SetTable extension table.

The SetTable class to use for table-of-set tables, or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

class MySetTable(faust.SetTable):
    ...

app = App(..., Table=MySetTable)

Example using the string path to a class:

app = App(..., Table='myproj.tables.MySetTable')
property GlobalTable

GlobalTable class type.

The GlobalTable class to use for tables, or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

class MyBaseGlobalTable(faust.GlobalTable):
    ...

app = App(..., GlobalTable=MyBaseGlobalTable)

Example using the string path to a class:

app = App(..., GlobalTable='myproj.tables.GlobalTable')
property SetGlobalTable

SetGlobalTable class type.

The SetGlobalTable class to use for tables, or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

class MyBaseSetGlobalTable(faust.SetGlobalTable):
    ...

app = App(..., SetGlobalTable=MyBaseGlobalSetTable)

Example using the string path to a class:

app = App(..., SetGlobalTable='myproj.tables.SetGlobalTable')
property TableManager

Table manager class type.

The TableManager used for managing tables, or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

from faust.tables import TableManager

class MyTableManager(TableManager):
    ...

app = App(..., TableManager=MyTableManager)

Example using the string path to a class:

app = App(..., TableManager='myproj.tables.TableManager')
property Serializers

Serializer registry class type.

The Registry class used for serializing/deserializing messages; or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

from faust.serialiers import Registry

class MyRegistry(Registry):
    ...

app = App(..., Serializers=MyRegistry)

Example using the string path to a class:

app = App(..., Serializers='myproj.serializers.Registry')
property Worker

Worker class type.

The Worker class used for starting a worker for this app; or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

import faust

class MyWorker(faust.Worker):
    ...

app = faust.App(..., Worker=Worker)

Example using the string path to a class:

app = faust.App(..., Worker='myproj.workers.Worker')
property PartitionAssignor

Partition assignor class type.

The PartitionAssignor class used for assigning topic partitions to worker instances; or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

from faust.assignor import PartitionAssignor

class MyPartitionAssignor(PartitionAssignor):
    ...

app = App(..., PartitionAssignor=PartitionAssignor)

Example using the string path to a class:

app = App(..., Worker='myproj.assignor.PartitionAssignor')
property LeaderAssignor

Leader assignor class type.

The LeaderAssignor class used for assigning a master Faust instance for the app; or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

from faust.assignor import LeaderAssignor

class MyLeaderAssignor(LeaderAssignor):
    ...

app = App(..., LeaderAssignor=LeaderAssignor)

Example using the string path to a class:

app = App(..., Worker='myproj.assignor.LeaderAssignor')
property Router

Router class type.

The Router class used for routing requests to a worker instance having the partition for a specific key (e.g. table key); or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

from faust.router import Router

class MyRouter(Router):
    ...

app = App(..., Router=Router)

Example using the string path to a class:

app = App(..., Router='myproj.routers.Router')
property Topic

Topic class type.

The Topic class used for defining new topics; or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

import faust

class MyTopic(faust.Topic):
    ...

app = faust.App(..., Topic=MyTopic)

Example using the string path to a class:

app = faust.App(..., Topic='myproj.topics.Topic')
property HttpClient

Http client class type

The aiohttp.client.ClientSession class used as a HTTP client; or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

import faust
from aiohttp.client import ClientSession

class HttpClient(ClientSession):
    ...

app = faust.App(..., HttpClient=HttpClient)

Example using the string path to a class:

app = faust.App(..., HttpClient='myproj.http.HttpClient')
property Monitor

Monitor sensor class type.

The Monitor class as the main sensor gathering statistics for the application; or the fully-qualified path to one (supported by symbol_by_name()).

Example using a class:

import faust
from faust.sensors import Monitor

class MyMonitor(Monitor):
    ...

app = faust.App(..., Monitor=MyMonitor)

Example using the string path to a class:

app = faust.App(..., Monitor='myproj.monitors.Monitor')
property stream_ack_cancelled_tasks

Deprecated setting has no effect.

property stream_ack_exceptions

Deprecated setting has no effect.

SETTINGS: ClassVar[SettingIndexMapping] = {'Agent': <faust.types.settings.params._Symbol object>, 'ConsumerScheduler': <faust.types.settings.params._Symbol object>, 'Event': <faust.types.settings.params._Symbol object>, 'GlobalTable': <faust.types.settings.params._Symbol object>, 'HttpClient': <faust.types.settings.params._Symbol object>, 'LeaderAssignor': <faust.types.settings.params._Symbol object>, 'Monitor': <faust.types.settings.params._Symbol object>, 'PartitionAssignor': <faust.types.settings.params._Symbol object>, 'Router': <faust.types.settings.params._Symbol object>, 'Schema': <faust.types.settings.params._Symbol object>, 'Serializers': <faust.types.settings.params._Symbol object>, 'SetGlobalTable': <faust.types.settings.params._Symbol object>, 'SetTable': <faust.types.settings.params._Symbol object>, 'Stream': <faust.types.settings.params._Symbol object>, 'Table': <faust.types.settings.params._Symbol object>, 'TableManager': <faust.types.settings.params._Symbol object>, 'Topic': <faust.types.settings.params._Symbol object>, 'Worker': <faust.types.settings.params._Symbol object>, 'aerospike_retries_on_exception': <faust.types.settings.params.Int object>, 'aerospike_sleep_seconds_between_retries_on_exception': <faust.types.settings.params.Int object>, 'agent_supervisor': <faust.types.settings.params._Symbol object>, 'autodiscover': <faust.types.settings.params.Param object>, 'blocking_timeout': <faust.types.settings.params.Seconds object>, 'broker': <faust.types.settings.params.BrokerList object>, 'broker_api_version': <faust.types.settings.params.Str object>, 'broker_check_crcs': <faust.types.settings.params.Bool object>, 'broker_client_id': <faust.types.settings.params.Str object>, 'broker_commit_every': <faust.types.settings.params.UnsignedInt object>, 'broker_commit_interval': <faust.types.settings.params.Seconds object>, 'broker_commit_livelock_soft_timeout': <faust.types.settings.params.Seconds object>, 'broker_consumer': <faust.types.settings.params.BrokerList object>, 'broker_credentials': <faust.types.settings.params.Credentials object>, 'broker_heartbeat_interval': <faust.types.settings.params.Seconds object>, 'broker_max_poll_interval': <faust.types.settings.params.Seconds object>, 'broker_max_poll_records': <faust.types.settings.params.UnsignedInt object>, 'broker_producer': <faust.types.settings.params.BrokerList object>, 'broker_rebalance_timeout': <faust.types.settings.params.Seconds object>, 'broker_request_timeout': <faust.types.settings.params.Seconds object>, 'broker_session_timeout': <faust.types.settings.params.Seconds object>, 'cache': <faust.types.settings.params.URL object>, 'canonical_url': <faust.types.settings.params.URL object>, 'consumer_api_version': <faust.types.settings.params.Str object>, 'consumer_auto_offset_reset': <faust.types.settings.params.Str object>, 'consumer_connections_max_idle_ms': <faust.types.settings.params.Int object>, 'consumer_group_instance_id': <faust.types.settings.params.Str object>, 'consumer_max_fetch_size': <faust.types.settings.params.UnsignedInt object>, 'consumer_metadata_max_age_ms': <faust.types.settings.params.Int object>, 'crash_app_on_aerospike_exception': <faust.types.settings.params.Bool object>, 'datadir': <faust.types.settings.params.Path object>, 'debug': <faust.types.settings.params.Bool object>, 'env_prefix': <faust.types.settings.params.Str object>, 'id_format': <faust.types.settings.params.Str object>, 'key_serializer': <faust.types.settings.params.Codec object>, 'logging_config': <faust.types.settings.params.Dict object>, 'loghandlers': <faust.types.settings.params.LogHandlers object>, 'origin': <faust.types.settings.params.Str object>, 'processing_guarantee': <faust.types.settings.params.Enum.<locals>.EnumParam object>, 'producer_acks': <faust.types.settings.params.Int object>, 'producer_api_version': <faust.types.settings.params.Str object>, 'producer_compression_type': <faust.types.settings.params.Str object>, 'producer_connections_max_idle_ms': <faust.types.settings.params.Int object>, 'producer_linger': <faust.types.settings.params.Seconds object>, 'producer_linger_ms': <faust.types.settings.params.UnsignedInt object>, 'producer_max_batch_size': <faust.types.settings.params.UnsignedInt object>, 'producer_max_request_size': <faust.types.settings.params.UnsignedInt object>, 'producer_metadata_max_age_ms': <faust.types.settings.params.Int object>, 'producer_partitioner': <faust.types.settings.params._Symbol object>, 'producer_request_timeout': <faust.types.settings.params.Seconds object>, 'producer_threaded': <faust.types.settings.params.Bool object>, 'recovery_consistency_check': <faust.types.settings.params.Bool object>, 'reply_create_topic': <faust.types.settings.params.Bool object>, 'reply_expires': <faust.types.settings.params.Seconds object>, 'reply_to': <faust.types.settings.params.Str object>, 'reply_to_prefix': <faust.types.settings.params.Str object>, 'ssl_context': <faust.types.settings.params.SSLContext object>, 'store': <faust.types.settings.params.URL object>, 'store_check_exists': <faust.types.settings.params.Bool object>, 'stream_ack_cancelled_tasks': <faust.types.settings.params.Bool object>, 'stream_ack_exceptions': <faust.types.settings.params.Bool object>, 'stream_buffer_maxsize': <faust.types.settings.params.UnsignedInt object>, 'stream_processing_timeout': <faust.types.settings.params.Seconds object>, 'stream_publish_on_commit': <faust.types.settings.params.Bool object>, 'stream_recovery_delay': <faust.types.settings.params.Seconds object>, 'stream_wait_empty': <faust.types.settings.params.Bool object>, 'table_cleanup_interval': <faust.types.settings.params.Seconds object>, 'table_key_index_size': <faust.types.settings.params.UnsignedInt object>, 'table_standby_replicas': <faust.types.settings.params.UnsignedInt object>, 'tabledir': <faust.types.settings.params.Path object>, 'timezone': <faust.types.settings.params.Timezone object>, 'topic_allow_declare': <faust.types.settings.params.Bool object>, 'topic_disable_leader': <faust.types.settings.params.Bool object>, 'topic_partitions': <faust.types.settings.params.UnsignedInt object>, 'topic_replication_factor': <faust.types.settings.params.UnsignedInt object>, 'url': <faust.types.settings.params.URL object>, 'value_serializer': <faust.types.settings.params.Codec object>, 'version': <faust.types.settings.params.Int object>, 'web': <faust.types.settings.params.URL object>, 'web_bind': <faust.types.settings.params.Str object>, 'web_cors_options': <faust.types.settings.params.Dict object>, 'web_enabled': <faust.types.settings.params.Bool object>, 'web_host': <faust.types.settings.params.Str object>, 'web_in_thread': <faust.types.settings.params.Bool object>, 'web_port': <faust.types.settings.params.Port object>, 'web_ssl_context': <faust.types.settings.params.SSLContext object>, 'web_transport': <faust.types.settings.params.URL object>, 'worker_redirect_stdouts': <faust.types.settings.params.Bool object>, 'worker_redirect_stdouts_level': <faust.types.settings.params.Severity object>}

Index of all settings by name.

SETTINGS_BY_SECTION: ClassVar[SettingSectionIndexMapping] = defaultdict(<class 'list'>, {<Section: SectionType.COMMON>: [<faust.types.settings.params.Param object>, <faust.types.settings.params.Path object>, <faust.types.settings.params.Path object>, <faust.types.settings.params.Bool object>, <faust.types.settings.params.Str object>, <faust.types.settings.params.Str object>, <faust.types.settings.params.Str object>, <faust.types.settings.params.Timezone object>, <faust.types.settings.params.Int object>, <faust.types.settings.params.Seconds object>, <faust.types.settings.params.BrokerList object>, <faust.types.settings.params.Credentials object>, <faust.types.settings.params.SSLContext object>, <faust.types.settings.params.Dict object>, <faust.types.settings.params.LogHandlers object>, <faust.types.settings.params.Enum.<locals>.EnumParam object>, <faust.types.settings.params.URL object>, <faust.types.settings.params.URL object>, <faust.types.settings.params.URL object>], <Section: SectionType.AGENT>: [<faust.types.settings.params._Symbol object>], <Section: SectionType.BROKER>: [<faust.types.settings.params.BrokerList object>, <faust.types.settings.params.BrokerList object>, <faust.types.settings.params.Str object>, <faust.types.settings.params.Bool object>, <faust.types.settings.params.Str object>, <faust.types.settings.params.UnsignedInt object>, <faust.types.settings.params.Seconds object>, <faust.types.settings.params.Seconds object>, <faust.types.settings.params.Seconds object>, <faust.types.settings.params.Seconds object>, <faust.types.settings.params.UnsignedInt object>, <faust.types.settings.params.Seconds object>, <faust.types.settings.params.Seconds object>, <faust.types.settings.params.Seconds object>], <Section: SectionType.CONSUMER>: [<faust.types.settings.params.Str object>, <faust.types.settings.params.UnsignedInt object>, <faust.types.settings.params.Str object>, <faust.types.settings.params.Str object>, <faust.types.settings.params.Int object>, <faust.types.settings.params.Int object>, <faust.types.settings.params._Symbol object>], <Section: SectionType.SERIALIZATION>: [<faust.types.settings.params.Codec object>, <faust.types.settings.params.Codec object>], <Section: SectionType.PRODUCER>: [<faust.types.settings.params.Int object>, <faust.types.settings.params.Str object>, <faust.types.settings.params.Str object>, <faust.types.settings.params.Seconds object>, <faust.types.settings.params.UnsignedInt object>, <faust.types.settings.params.UnsignedInt object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params.Seconds object>, <faust.types.settings.params.Bool object>, <faust.types.settings.params.Int object>, <faust.types.settings.params.Int object>, <faust.types.settings.params.UnsignedInt object>], <Section: SectionType.STREAM>: [<faust.types.settings.params.Bool object>, <faust.types.settings.params.Bool object>, <faust.types.settings.params.Bool object>, <faust.types.settings.params.Int object>, <faust.types.settings.params.Int object>, <faust.types.settings.params.UnsignedInt object>, <faust.types.settings.params.Seconds object>, <faust.types.settings.params.Bool object>, <faust.types.settings.params.Seconds object>, <faust.types.settings.params.Bool object>, <faust.types.settings.params.Bool object>, <faust.types.settings.params.Bool object>], <Section: SectionType.RPC>: [<faust.types.settings.params.Bool object>, <faust.types.settings.params.Seconds object>, <faust.types.settings.params.Str object>, <faust.types.settings.params.Str object>], <Section: SectionType.TABLE>: [<faust.types.settings.params.Seconds object>, <faust.types.settings.params.UnsignedInt object>, <faust.types.settings.params.UnsignedInt object>], <Section: SectionType.TOPIC>: [<faust.types.settings.params.Bool object>, <faust.types.settings.params.Bool object>, <faust.types.settings.params.UnsignedInt object>, <faust.types.settings.params.UnsignedInt object>], <Section: SectionType.WEB_SERVER>: [<faust.types.settings.params.URL object>, <faust.types.settings.params.Str object>, <faust.types.settings.params.Dict object>, <faust.types.settings.params.Bool object>, <faust.types.settings.params.Str object>, <faust.types.settings.params.Bool object>, <faust.types.settings.params.Port object>, <faust.types.settings.params.SSLContext object>, <faust.types.settings.params.URL object>, <faust.types.settings.params.URL object>], <Section: SectionType.WORKER>: [<faust.types.settings.params.Bool object>, <faust.types.settings.params.Severity object>], <Section: SectionType.EXTENSION>: [<faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>, <faust.types.settings.params._Symbol object>]})

Index of all sections and the settings in a section.

property producer_linger_ms

Deprecated setting, please use producer_linger instead.

This used to be provided as milliseconds, the new setting uses seconds.

property url

Backward compatibility alias to broker.