The App - Define your Faust project

What is an Application?

An application is an instance of the library, and provides the core API of Faust.

The application can define stream processors (agents), topics, channels, web views, CLI commands and more.

To create one you need to provide a name for the application (the id), a message broker, and optionally a driver to use for table storage:

>>> import faust
>>> app = faust.App('example', broker='kafka://', store='rocksdb://')

Application Parameters

You must provide a name for the app, and you will usually also want to set the broker and store options, which configure the broker URL and the storage driver.

The remaining settings have sensible defaults, so you can safely use Faust without changing them.

Here we set the broker URL to Kafka, and the storage driver to RocksDB:

>>> app = faust.App(
...     'myid',
...     broker='kafka://',
...     store='rocksdb://',
... )

kafka://localhost is used if you don’t configure a broker URL. The first part of the URL (kafka://), is called the scheme and specifies the driver that you want to use (it can also be the fully qualified path to a Python class).
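As a small illustration of the scheme part of a URL, the standard library can split a broker or store URL into its components. This is only a sketch of the URL anatomy; the helper name scheme_of is hypothetical, and how Faust itself maps a scheme to a driver is not shown here.

```python
from urllib.parse import urlsplit


def scheme_of(url: str) -> str:
    """Return the scheme (the part before ://) of a broker or store URL."""
    return urlsplit(url).scheme


broker_scheme = scheme_of('kafka://localhost')
store_scheme = scheme_of('rocksdb://')
print(broker_scheme, store_scheme)
```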

The storage driver decides how to keep distributed tables locally, and Faust version 1.0 supports two options:


  • RocksDB: an embedded database (production)

  • In-memory (development)

Using the memory:// store is OK when developing your project and testing things out, but for large tables, it can take hours to recover after a restart, so you should never use it in production.

RocksDB recovers tables in seconds or less, is embedded, and doesn’t require a server or additional infrastructure. It also stores table data on the file system in such a way that tables can exceed the size of available memory.

See also

Configuration Reference: for a full list of supported configuration settings – these can be passed as keyword arguments when creating the faust.App.

App Instantiation and Configuration

Instantiating and configuring a Faust app can be done in one step or, much like a Flask app, in two. You can create a new App instance with the entire configuration passed as parameters and then run it, or you can instantiate the App first, set the parameters afterwards, and then run the application. The second form makes it easier to switch configurations, in particular during development and testing.

>>> app = faust.App('myApp')
>>> app.main()


app.topic() – Create a topic-description

Use the topic() method to create a topic description, used to tell stream processors what Kafka topic to read from, and how the keys and values in that topic are serialized:

topic = app.topic('name_of_topic')

@app.agent(topic)
async def process(stream):
    async for event in stream:
        ...

Topic Arguments

  • key_type/value_type: ModelArg

    Use the key_type and value_type arguments to specify the models used for key and value serialization:

    class MyValueModel(faust.Record):
        name: str
        value: float

    topic = app.topic('name_of_topic', value_type=MyValueModel)

    The default key_type is bytes and treats the key as a binary string. The key can also be specified as a model type (key_type=MyKeyModel).

  • key_serializer/value_serializer: CodecArg

    The codec/serializer type used for keys and values in this topic.

    If not specified the default will be taken from the key_serializer and value_serializer settings.

  • partitions: int

    The number of partitions this topic should have. If not specified the default in the topic_partitions setting is used.

    Note: if this is an automatically created topic, or an externally managed source topic, then please set this value to None.

  • retention: Seconds

    Number of seconds (as float/timedelta) to keep messages in the topic before they can be expired by the server.

  • compacting: bool

    Set to True if this should be a compacting topic. The Kafka broker will then periodically compact the topic, only keeping the most recent value for a key.

  • acks: bool

    Enable automatic acknowledgment for this topic. If you disable this then you are responsible for manually acknowledging each event.

  • internal: bool

    If set to True this means we own and are responsible for this topic: we are allowed to create or delete the topic.

  • maxsize: int

    The maximum buffer size used for this channel, with default taken from the stream_buffer_maxsize setting. When this buffer is exceeded the worker will have to wait for agent/stream consumers to catch up, and if the buffer is frequently full this will negatively affect performance.

    Try tweaking the buffer sizes, but also the broker_commit_interval setting to make sure it commits more frequently with larger buffer sizes.

app.channel() – Create a local channel

Use channel() to create an in-memory communication channel:

import faust

app = faust.App('channel')

class MyModel(faust.Record):
    x: int

channel = app.channel(value_type=MyModel)

@app.agent(channel)
async def process(stream):
    async for event in stream:
        print(f'Received: {event!r}')

async def populate():
    await channel.send(value=MyModel(303))

Channel Arguments

  • key_type/value_type: ModelArg

    Use the key_type and value_type arguments to specify the models used for key and value serialization:

    class MyValueModel(faust.Record):
        name: str
        value: float
    channel = app.channel(value_type=MyValueModel)

  • key_serializer/value_serializer: CodecArg

    The codec/serializer type used for keys and values in this channel.

    If not specified the default will be taken from the key_serializer and value_serializer settings.

  • maxsize: int

    This is the maximum number of pending messages in the channel. If this number is exceeded any call to channel.put(value) will block until something consumes another message from the channel.

    Defaults to the stream_buffer_maxsize setting.
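The blocking behaviour of a bounded channel can be sketched with a plain asyncio.Queue. This is an analogy using the standard library only, not Faust's channel implementation: the queue stands in for a channel with maxsize=1, and the producer has to wait until the slow consumer takes an item.

```python
import asyncio


async def demo() -> list:
    log = []
    queue = asyncio.Queue(maxsize=1)   # stands in for a channel with maxsize=1

    async def producer():
        for i in range(3):
            await queue.put(i)         # waits here while the queue is full
            log.append(f'put {i}')

    async def consumer():
        for _ in range(3):
            await asyncio.sleep(0.01)  # simulate a slow consumer
            item = await queue.get()
            log.append(f'got {item}')

    await asyncio.gather(producer(), consumer())
    return log


events = asyncio.run(demo())
print(events)
```

The second put only completes after the first item has been consumed, which is the back pressure the maxsize argument describes.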

app.Table() – Define a new table

Use Table() to define a new distributed dictionary; the only required argument is a unique and identifying name. Here we also set a default value so the table acts as a defaultdict:

transfer_counts = app.Table('transfer_counts', default=int)

The default argument is passed in as a callable, and in our example calling int() returns the number zero, so whenever a key is missing in the table, it’s initialized with a value of zero:

>>> table['missing']
0

>>> table['also-missing'] += 1
>>> table['also-missing']
1
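The same default behaviour can be tried without a running app by using collections.defaultdict, which the text compares the table to. This is a local stand-in for illustration only; a real table is distributed and backed by a changelog topic.

```python
from collections import defaultdict

# default=int: calling int() with no arguments returns 0.
table = defaultdict(int)

first = table['missing']        # key is created with the default value
table['also-missing'] += 1      # default 0, then incremented
second = table['also-missing']
print(first, second)
```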
The table needs to relate every update to an associated source topic event, so you must be iterating over a stream to modify a table. For example, this agent also uses .group_by() to repartition the stream by account id, which ensures that every unique account is delivered to the same agent instance and that the count per account is recorded accurately:

async def transfer(transfers):
    async for transfer in transfers.group_by(Transfer.account):
        transfer_counts[transfer.account] += 1

The agent modifying the table cannot process the source topic out of order, so only agents with concurrency=1 are allowed to update tables.
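Why repartitioning by account id keeps the counts accurate can be sketched with a toy partitioner. The function partition_for and the use of CRC32 are assumptions made for illustration; they are not the partitioner Kafka or Faust actually uses. The point is only that equal keys always land in the same partition, so a single consumer sees every event for a given account.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Map a key to a partition; equal keys always map to the same one."""
    return zlib.crc32(key.encode()) % num_partitions


transfers = ['alice', 'bob', 'alice', 'carol', 'alice']

# One counter dictionary per partition, as if each partition had its own
# agent instance updating its own share of the table.
counts_per_partition = {}
for account in transfers:
    partition = partition_for(account, 4)
    counts = counts_per_partition.setdefault(partition, {})
    counts[account] = counts.get(account, 0) + 1

alice_partition = partition_for('alice', 4)
print(counts_per_partition[alice_partition]['alice'])
```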

See also

  • The Tables and Windowing guide – for more information about tables.

    Learn how to create a “windowed table” where aggregate values are placed into configurable time windows, providing you with answers to questions like “what was the value in the last five minutes”, or “what was the value of this count like yesterday”.

Table Arguments

  • name: str

    The name of the table. This must be unique, as two tables with the same name in the same application will share changelog topics.

  • help: str

    Brief description of table purpose.

  • default: Callable[[], Any]

    User provided function called to get default value for missing keys.

    Without any default, an attempt to access a missing key will raise KeyError:

    >>> table = app.Table('nodefault', default=None)
    >>> table['missing']
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
    KeyError: 'missing'

    With the default callback set to int, the same missing key will now set the key to 0 and return 0:

    >>> table = app.Table('hasdefault', default=int)
    >>> table['missing']
    0

  • key_type/value_type: ModelArg

    Use the key_type and value_type arguments to specify the models used for serializing/deserializing keys and values in this table.

    class MyValueModel(faust.Record):
        name: str
        value: float
    table = app.Table(key_type=bytes, value_type=MyValueModel)
  • store: str or URL

    The name of a storage backend to use, or the URL to one.

    Default is taken from the store setting.

  • partitions: int

    The number of partitions for the changelog topic used by this table.

    Default is taken from the topic_partitions setting.

  • changelog_topic: Topic

    The changelog topic description to use for this table.

    Only for advanced users who know what they’re doing.

  • recovery_buffer_size: int

    How often we flush changelog records during recovery. Default is every 1000 changelog messages.

  • standby_buffer_size: int

    How often we flush changelog records during recovery. Default is None (always).

  • on_changelog_event: Callable[[EventT], Awaitable[None]]

    A callback called for every changelog event during recovery and while keeping table standbys in sync.

@app.agent() – Define a new stream processor

Use the agent() decorator to define an asynchronous stream processor:

# examples/
import faust

app = faust.App('stream-example')

@app.agent()
async def myagent(stream):
    """Example agent."""
    async for value in stream:
        print(f'MYAGENT RECEIVED -- {value!r}')
        yield value

if __name__ == '__main__':
    app.main()


  • “agent” – A named group of actors processing a stream.

  • “actor” – An individual agent instance.

No topic was passed to the agent decorator, so an anonymous topic will be created for it. Use the faust agents program to list the topics used by each agent:

$ python examples/ agents
│ name     │ topic                                 │ help           │
│ @myagent │ stream-example-examples.agent.myagent │ Example agent. │

The autogenerated topic name stream-example-examples.agent.myagent is generated from the application id setting, the application version setting, and the fully qualified path of the agent (examples.agent.myagent).
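The naming rule can be sketched as a small function. The helper agent_topic_name is hypothetical, and the handling of the version is an assumption: the example name above contains no version part, so the sketch only adds a suffix for versions above 1, in a "-v2" style that is also assumed rather than taken from this page.

```python
def agent_topic_name(app_id: str, agent_path: str, version: int = 1) -> str:
    """Build a topic name from app id, app version and agent path."""
    # Assumption: version 1 adds nothing to the id, later versions add "-vN".
    prefix = app_id if version <= 1 else f'{app_id}-v{version}'
    return f'{prefix}-{agent_path}'


name = agent_topic_name('stream-example', 'examples.agent.myagent')
print(name)
```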

Start a worker to consume from the topic:

$ python examples/ worker -l info

Next, in a new console, send the agent a value using the faust send program. The first argument to send is the name of the topic, and the second argument is the value to send (use --key=k to specify key). The name of the topic can also start with the @ character to name an agent instead.

Use @agent to send a value of "hello" to the topic of our agent:

$ python examples/ send @myagent hello

Finally, you should see in the worker console that it received our message:


Agent Arguments

  • name: str

    The name of the agent is automatically taken from the decorated function and the module it is defined in.

    You can also specify the name manually, but note that this should include the module name, e.g.: name='proj.agents.add'.

  • channel: Channel

    The channel or topic this agent should consume from.

  • concurrency: int

    The number of concurrent actors to start for this agent on every worker instance.

    For example, if you have an agent processing RSS feeds, a concurrency of 100 means you can process up to a hundred RSS feeds at the same time on every worker instance that you start.

    Adding concurrency to your agent also means it will process events in the topic out of order, and should you rewind the stream that order may differ when processing the events a second time.

    Concurrency and tables

    Concurrent agents are not allowed to modify tables: an exception is raised if this is attempted.

    They are, however, allowed to read from tables.

  • sink: Iterable[SinkT]

    For agents that also yield a value: forward the value to be processed by one or more “sinks”.

    A sink can be another agent, a topic, or a callback (async or non-async).

    See also

    Sinks – for more information on using sinks.

  • on_error: Callable[[Agent, BaseException], None]

    Optional error callback to be called when this agent raises an unexpected exception.

  • supervisor_strategy: mode.SupervisorStrategyT

    A supervisor strategy to decide what happens when the agent raises an exception.

    The default supervisor strategy is mode.OneForOneSupervisor – restarting actors one by one as they crash.

    Other built-in supervisor strategies include:

    • mode.OneForAllSupervisor

      If one agent instance of this type raises an exception we will restart all other agent instances of this type.

    • mode.CrashingSupervisor

      If one agent instance of this type raises an exception we will crash the worker instance.

  • **kwargs

    If the channel argument is not specified the agent will use an automatically named topic.

    Any additional keyword arguments are considered to be configuration for this topic, with support for the same arguments as app.topic().
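The effect of the concurrency argument on ordering can be sketched with plain asyncio. This is not Faust code: the actors here are simple coroutines pulling from a shared queue, and the processing times are made up so that the first event is the slowest.

```python
import asyncio


async def run(concurrency: int) -> list:
    queue = asyncio.Queue()
    # (event id, simulated processing time in seconds)
    for event in [(0, 0.05), (1, 0.01), (2, 0.01)]:
        queue.put_nowait(event)
    done = []

    async def actor():
        while not queue.empty():
            event_id, cost = queue.get_nowait()
            await asyncio.sleep(cost)   # simulate work on this event
            done.append(event_id)       # record completion order

    await asyncio.gather(*(actor() for _ in range(concurrency)))
    return done


sequential = asyncio.run(run(concurrency=1))
concurrent = asyncio.run(run(concurrency=2))
print(sequential, concurrent)
```

With a single actor the events finish in topic order; with two actors the slow first event finishes last, which is why concurrent agents cannot safely update tables.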

@app.task() – Define a new support task.

Use the task() decorator to define an asynchronous task to be started with the app:

@app.task()
async def mytask():
    ...

The task will be started when the app starts, by scheduling it as an asyncio.Task on the event loop. It will only be started once the app is fully operational, meaning it has started consuming messages from Kafka.

@app.timer() – Define a new periodic task

Use the timer() decorator to define an asynchronous periodic task that runs every 30.0 seconds:

@app.timer(30.0)
async def my_periodic_task():
    ...

The timer will start 30 seconds after the worker instance has started and is in an operational state.
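The timing described here, where the first run happens one full interval after start, can be sketched with asyncio alone. The helper run_every and its times bound are hypothetical and exist only so the example terminates; this is not how Faust implements timers.

```python
import asyncio


async def run_every(interval: float, func, times: int) -> None:
    """Call func every `interval` seconds, `times` times in total."""
    for _ in range(times):
        await asyncio.sleep(interval)   # sleep first: no run at time zero
        await func()


ticks = []


async def my_periodic_task():
    ticks.append(len(ticks) + 1)


# A short interval keeps the demo fast; the guide's example uses 30.0.
asyncio.run(run_every(0.01, my_periodic_task, times=3))
print(ticks)
```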

Timer Arguments

  • on_leader: bool

    If enabled this timer will only execute on one of the worker instances at a time – that is only on the leader of the cluster.

    This can be used as a distributed mutex to execute something on one machine at a time.

@app.page() – Define a new Web View

Use the page() decorator to define a new web view from an async function:

# examples/
import faust

app = faust.App('view-example')

@app.page('/path/to/view/')
async def myview(web, request):
    print(f'FOO PARAM: {request.query["foo"]}')

if __name__ == '__main__':
    app.main()

Next run a worker instance to start the web server on port 6066 (default):

$ python examples/ worker -l info

Then visit your view in the browser by going to http://localhost:6066/path/to/view/:

$ open http://localhost:6066/path/to/view/

app.main() – Start the faust command-line program.

To have your script extend the faust program, you can call app.main():

# examples/
import faust

app = faust.App('umbrella-command-example')

if __name__ == '__main__':
    app.main()

This will use the arguments in sys.argv and will support the same arguments as the faust umbrella command.

To see a list of available commands, execute your program:

$ python examples/

To get help for a particular subcommand run:

$ python examples/ worker --help

See also

  • The main() method in the API reference.

@app.command() – Define a new command-line command

Use the command() decorator to define a new subcommand for the faust command-line program:

# examples/
import faust

app = faust.App('example-subcommand')

@app.command()
async def example():
    """This docstring is used as the command help in --help."""

if __name__ == '__main__':
    app.main()

You can now run your subcommand:

$ python examples/ example

@app.service() – Define a new service

The service() decorator adds a custom mode.Service class as a dependency of the app.

You can decorate a service class to have it start with the app:

# examples/
import faust
from mode import Service

app = faust.App('service-example')

@app.service
class MyService(Service):

    async def on_start(self):
        print('MYSERVICE IS STARTING')

    async def on_stop(self):
        print('MYSERVICE IS STOPPING')

    @Service.task
    async def _background_task(self):
        while not self.should_stop:
            print('BACKGROUND TASK WAKE UP')
            await self.sleep(1.0)

if __name__ == '__main__':
    app.main()

To start the app and see it in action, run a worker:

python examples/ worker -l info

You can also add services at runtime in application subclasses:

class MyApp(App):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.some_service = self.service(SomeService())

Application Signals

You may have experience with signals in other frameworks such as Django and Celery.

The main difference with signals in Faust is that they accept positional arguments, and that they also come with asynchronous versions for use with asyncio.

Signals are an implementation of the Observer design pattern.


App.on_produce_message

New in version 1.6.

Arguments: key, value, partition, timestamp, headers

This is a synchronous signal (do not use async def).

The on_produce_message signal is a synchronous signal called before producing messages.

This can be used to attach custom headers to Kafka messages:

from typing import Any, List, Optional, Tuple
from faust.types import AppT
from mode.utils.compat import want_bytes

@app.on_produce_message.connect
def on_produce_attach_trace_headers(
        sender: AppT,
        key: bytes = None,
        value: bytes = None,
        partition: Optional[int] = None,
        timestamp: Optional[float] = None,
        headers: List[Tuple[str, bytes]] = None,
        **kwargs: Any) -> None:
    # current_test() is not defined in this snippet; it is assumed to
    # return the active test object, or None.
    test = current_test()
    if test is not None:
        # Headers at this point is a list of ``(key, value)`` pairs.
        # Note2: values in headers must be :class:`bytes`.
        headers.extend(
            (k, want_bytes(v)) for k, v in test.as_headers().items()
        )






App.on_partitions_revoked

The on_partitions_revoked signal is an asynchronous signal called after every Kafka rebalance and provides a single argument which is the set of newly revoked partitions.

Add a callback to be called when partitions are revoked:

from typing import Set
from faust.types import AppT, TP

@app.on_partitions_revoked.connect
async def on_partitions_revoked(app: AppT,
                                revoked: Set[TP], **kwargs) -> None:
    print(f'Partitions are being revoked: {revoked}')

Using app as an instance when connecting here means we will only be called for that particular app instance. If you want to be called for all app instances then you must connect to the signal of the class (App):

import faust

@faust.App.on_partitions_revoked.connect
async def on_partitions_revoked(app: AppT,
                                revoked: Set[TP], **kwargs) -> None:
    ...

Signal handlers must always accept **kwargs so that they stay backwards compatible when new arguments are added.

Similarly new arguments must be added as keyword arguments to be backwards compatible.
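The reason for the **kwargs rule can be sketched with a tiny observer-pattern signal. The Signal class below is a hypothetical stand-in written for this example, not the signal class Faust uses; the reason keyword is likewise invented to play the role of an argument added in a later release.

```python
class Signal:
    """Minimal observer-pattern signal: handlers connect, senders notify."""

    def __init__(self):
        self._receivers = []

    def connect(self, fun):
        # Usable as a decorator: registers the handler and returns it.
        self._receivers.append(fun)
        return fun

    def send(self, sender, *args, **kwargs):
        # Call every connected handler and collect the return values.
        return [fun(sender, *args, **kwargs) for fun in self._receivers]


on_revoked = Signal()


@on_revoked.connect
def handler(sender, revoked, **kwargs):
    # Extra keyword arguments are accepted and ignored.
    return sorted(revoked)


old_style = on_revoked.send('app', {2, 1})
new_style = on_revoked.send('app', {2, 1}, reason='rebalance')
print(old_style, new_style)
```

Because the handler accepts **kwargs, the second call with the extra keyword argument works without any change to the handler.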






App.on_partitions_assigned

The on_partitions_assigned signal is an asynchronous signal called after every Kafka rebalance and provides a single argument which is the set of assigned partitions.

Add a callback to be called when partitions are assigned:

from typing import Set
from faust.types import AppT, TP

@app.on_partitions_assigned.connect
async def on_partitions_assigned(app: AppT,
                                 assigned: Set[TP], **kwargs) -> None:
    print(f'Partitions are being assigned: {assigned}')







App.on_configured

This is a synchronous signal (do not use async def).

Called as the app reads configuration, just before the application configuration is set, but after the configuration is read.

Takes arguments: (app, conf), where conf is the faust.Settings object being built and is the instance that app.conf will be set to after this signal returns.

Use the on_configured signal to configure your app:

import os
import faust

app = faust.App('myapp')

@app.on_configured.connect
def configure(app, conf, **kwargs):
    conf.broker = os.environ.get('FAUST_BROKER')
    conf.store = os.environ.get('STORE_URL')
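One detail worth noting is that os.environ.get() returns None when a variable is unset. A variant that keeps the existing value in that case can be sketched as follows. The conf object here is a SimpleNamespace standing in for the settings object, and the environ parameter is an addition made so the example runs without touching the real environment; both are assumptions for illustration.

```python
import os
from types import SimpleNamespace


def configure(conf, environ=os.environ):
    """Override settings from the environment, keeping existing values."""
    conf.broker = environ.get('FAUST_BROKER', conf.broker)
    conf.store = environ.get('STORE_URL', conf.store)
    return conf


# Stand-in for the settings object being built.
conf = SimpleNamespace(broker='kafka://localhost', store='memory://')

# Only FAUST_BROKER is set here, so the store keeps its previous value.
configure(conf, environ={'FAUST_BROKER': 'kafka://broker1:9092'})
print(conf.broker, conf.store)
```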







App.on_before_configured

This is a synchronous signal (do not use async def).

Called before the app reads configuration, and before the App.on_configured signal is dispatched.

Takes only sender as argument, which is the app being configured:

@app.on_before_configured.connect
def before_configuration(app, **kwargs):
    print(f'App {app} is being configured')







App.on_after_configured

This is a synchronous signal (do not use async def).

Called after app is fully configured and ready for use.

Takes only sender as argument, which is the app that was configured:

@app.on_after_configured.connect
def after_configuration(app, **kwargs):
    print(f'App {app} has been configured.')







App.on_worker_init

This is a synchronous signal (do not use async def).

Called by the faust worker program (or when using app.main()) to apply worker specific customizations.

Takes only sender as argument, which is the app a worker is being started for:

@app.on_worker_init.connect
def on_worker_init(app, **kwargs):
    print(f'Worker starting for app {app}')

Starting the App

You can start a worker instance for your app from the command-line, or you can start it inline in your Python process. To accommodate the many ways you may want to embed a Faust application, starting the app has several possible entry points:

App entry points:

  1. faust worker

    The faust worker program starts a worker instance for an app from the command-line.

    You may turn any self-contained module into the faust program by adding this to the end of the file:

    if __name__ == '__main__':
        app.main()

    For packages you can add a module or setuptools entry points to

    If you have the module name where an app is defined, you can start a worker for it with the faust -A option:

    $ faust -A myproj worker -l info

    The above will import the app from the myproj module using from myproj import app. If you need to specify a different attribute you can use a fully qualified path:

    $ faust -A myproj:faust_app worker -l info
  2. -> faust.cli.worker.worker (CLI interface)

    This is the faust worker program defined as a Python click command.

    It is responsible for:

    • Parsing the command-line arguments supported by faust worker.

    • Printing the banner box (you will not get that with entry point 3 or 4).

    • Starting the faust.Worker (see next step).

  3. -> faust.Worker

    This is used for starting a worker from Python when you also want to install process signal handlers, etc. It supports the same options as on the faust worker command-line, but now they are passed in as keyword arguments to faust.Worker.

    The Faust worker is a subclass of mode.Worker, which makes sense given that Faust is built out of many different mode services starting in a particular order.

    The faust.Worker entry point is responsible for:

    • Changing the directory when the workdir argument is set.

    • Setting the process title (when setproctitle is installed), for more helpful entry in ps listings.

    • Setting up logging: handlers, formatters and level.

    • If --debug is enabled:

      • Starting the aiomonitor debugging back door.

      • Starting the blocking detector.

    • Setting up TERM and INT signal handlers.

    • Setting up the USR1 cry handler that logs a traceback.

    • Starting the web server.

    • Autodiscovery (see autodiscovery).

    • Starting the faust.App (see next step).

    • Properly shutting down the event loop on exit.

    To start a worker,

    1. from synchronous code, use Worker.execute_from_commandline:

      >>> worker = Worker(app)
      >>> worker.execute_from_commandline()
    2. or from an async def function call await worker.start():


      You will be responsible for gracefully shutting down the event loop.

      async def start_worker(worker: Worker) -> None:
          await worker.start()

      def manage_loop():
          loop = asyncio.get_event_loop()
          worker = Worker(app, loop=loop)
          # Shutting down gracefully afterwards is up to you.
          loop.run_until_complete(start_worker(worker))

    Multiple apps

    If you want your worker to start multiple apps, you would have to pass them in with the *services starargs:

    worker = Worker(app1, app2, app3, app4)

    This way the extra apps will be started together with the main app, and the main app of the worker will end up being the first positional argument (app1).

    The problem with starting multiple apps is that each app will start a web server by default.

    If you want a web server for every app, you must configure the web port for each:

    apps = [app1, app2, app3, app4]
    for i, app in enumerate(apps):
        app.conf.web_port = 6066 + i
    worker = Worker(*apps)
  4. -> faust.App

    The “worker” only concerns itself with the terminal, process signal handlers, logging, debugging mechanisms, etc., the rest is up to the app.

    You can call await app.start() directly to get a side-effect free instance that can be embedded in any environment. It won’t even emit logs to the console unless you have configured logging manually, and it won’t set up any TERM/INT signal handlers, which means finally blocks won’t execute at shutdown.

    Start app directly:

    async def start_app(app):
        await app.start()

    This will block until the worker shuts down, so if you want to start other parts of your program, you can start this in the background:

    def start_in_loop(app):
        loop = asyncio.get_event_loop()
        loop.create_task(app.start())

    If your program is written as a set of Mode services, you can simply add the app as a dependency to your service:

    class MyService(mode.Service):
        def on_init_dependencies(self):
            return [faust_app]
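The lookup performed by the -A option in entry point 1 (module name, optionally followed by a colon and an attribute name, with app as the default attribute) can be sketched with importlib. The function find_app is a hypothetical helper written for this example, not the code the faust program runs; standard library modules are used as targets so the sketch is runnable anywhere.

```python
import importlib


def find_app(spec: str, default_attr: str = 'app'):
    """Resolve 'module' or 'module:attribute' to an object."""
    module_name, _, attr = spec.partition(':')
    module = importlib.import_module(module_name)
    # Without an explicit attribute, fall back to the default name.
    return getattr(module, attr or default_attr)


# 'os:sep' plays the role of 'myproj:faust_app'.
sep = find_app('os:sep')

# 'os' with default_attr='path' plays the role of plain 'myproj'.
path_module = find_app('os', default_attr='path')
print(sep, path_module.__name__)
```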

Client-Only Mode

The app can also be started in “client-only” mode, which means the app can be used for sending agent RPC requests and retrieving replies, but does not start a full Faust worker:

await app.start_client()

Projects and Directory Layout

Faust is a library; it does not mandate any specific directory layout and integrates with any existing framework or project conventions.

That said, new projects written from scratch using Faust will want some guidance on how to organize, so we include this as a suggestion in the documentation.

Small/Standalone Projects

You can create a small Faust service with no supporting directories at all. We refer to this as a “standalone module”: a module that contains everything it needs to run a full service.

The Faust distribution comes with several standalone examples, such as examples/

Medium/Large Projects

Projects need more organization as they grow larger, so we convert the standalone module into a directory layout:

+ proj/
    - README.rst
    - setup.cfg

    + proj/

        + users/

        + orders/

Problem: Autodiscovery

Now we have many @app.agent/@app.timer/@app.command decorators, and models spread across a nested directory. These have to be imported by the program to be registered and used.

Enter the autodiscover setting:

# proj/
import faust

app = faust.App(
    'proj',
    autodiscover=True,
    origin='proj'   # imported name for this project (import proj -> "proj")
)

def main() -> None:
    app.main()

Setting autodiscover to True means the app will traverse the directory of the origin module to find agents, timers, tasks, commands, web views, and so on.

If you want more careful control you can specify a list of modules to traverse instead:

app = faust.App(
    'proj',
    autodiscover=['proj.users', 'proj.orders'],
)
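The idea behind autodiscovery, importing every module under a package so that decorators in those modules run and register themselves, can be sketched with pkgutil. The function discover is a hypothetical helper, not Faust's implementation, and the standard library json package is used as the target only so that the example is runnable.

```python
import importlib
import pkgutil


def discover(package_name: str) -> list:
    """Import every module below a package and return their names."""
    package = importlib.import_module(package_name)
    found = []
    for info in pkgutil.walk_packages(package.__path__,
                                      prefix=package_name + '.'):
        # Importing the module is what would execute any decorators in it.
        importlib.import_module(info.name)
        found.append(info.name)
    return sorted(found)


modules = discover('json')
print(modules)
```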

Autodiscovery when using Django

When using autodiscover=True in a Django project, only the apps listed in INSTALLED_APPS will be traversed.

See also Django Projects.

Problem: Entry Point

The proj/ module can act as the entry point for this project:

# proj/
from import app

After creating this module you can now start a worker by doing:

python -m proj worker -l info

Now you’re probably thinking, “I’m too lazy to type python dash em all the time”, but don’t worry: take it one step further by using setuptools to install a command-line program for your project.

  1. Create a for your project.

    This step is not needed if you already have one.

    You can read lots about creating your in the setuptools documentation here:

    A minimum example that will work well enough:

    #!/usr/bin/env python
    from setuptools import find_packages, setup
        description='Use Faust to blah blah blah',
        author='Ola Normann',
        packages=find_packages(exclude=['tests', 'tests.*']),

    For inspiration you can also look to the files in the faust and mode source code distributions.

  2. Add the command as a setuptools entry point.

    To your add the following argument:

            'console_scripts': [
                'proj =',

    This essentially defines that the proj program runs from import main

  3. Install your package using or pip.

    When developing your project locally you should use develop to use the source code directory as a Python package:

    $ python develop

    You can now run the proj command you added to in step two:

    $ proj worker -l info

    Why use develop? You can use python install, but then you have to run that every time you make modifications to the source files.

Another upside to using is that you can distribute your projects as pip install-able packages.

Django Projects

Django has its own conventions for directory layout, but your Django reusable apps will want some way to import your Faust app.

We believe the best place to define the Faust app in a Django project, is in a dedicated reusable app. See the faustapp app in the examples/django directory in the Faust source code distribution.


Why use applications?

For special needs, you can inherit from the faust.App class, and a subclass will have the ability to change how almost everything works.

Comparing the application to the interface of frameworks like Django, there are clear benefits.

In Django, the global settings module means having multiple configurations is impossible, and with an API organized by modules, you sometimes end up with lots of import statements and have to keep track of many modules. Further, you often end up monkey patching to change how something works.

The application keeps the library flexible to changes, and allows for many applications to coexist in the same process space.


See faust.App in the API reference for a full list of methods and attributes supported.