The App - Define your Faust project¶
What is an Application?¶
An application is an instance of the library, and provides the core API of Faust.
The application can define stream processors (agents), topics, channels, web views, CLI commands and more.
To create one you need to provide a name for the application (the id), a message broker, and a driver to use for table storage (optional)
>>> import faust
>>> app = faust.App('example', broker='kafka://', store='rocksdb://')
Application Parameters¶
You must provide a name for the app, and you will usually also want to set the broker and store options to configure the broker URL and the storage driver.
Other than that, the remaining options have sensible defaults, so you can safely use Faust without changing them.
Here we set the broker URL to Kafka, and the storage driver to RocksDB:
>>> app = faust.App(
... 'myid',
... broker='kafka://kafka.example.com',
... store='rocksdb://',
... )
kafka://localhost is used if you don't configure a broker URL.
The first part of the URL (kafka://) is called the scheme and specifies the driver that you want to use (it can also be the fully qualified path to a Python class).
The storage driver decides how to keep distributed tables locally, and Faust version 1.0 supports two options:
rocksdb:// – RocksDB, an embedded database (production)
memory:// – In-memory (development)
Using the memory://
store is OK when developing your project and testing
things out, but for large tables, it can take hours to recover after
a restart, so you should never use it in production.
RocksDB recovers tables in seconds or less, is embedded, and doesn't require a server or additional infrastructure. It also stores table data on the file system in such a way that tables can exceed the size of available memory.
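If you want to use the in-memory store during development and RocksDB in production, one option is to select the store from an environment variable at startup. A minimal sketch, assuming a hypothetical FAUST_STORE variable:
import os
import faust

# FAUST_STORE is a hypothetical environment variable used only for this example;
# fall back to the in-memory store when it is not set (e.g. during development).
store = os.environ.get('FAUST_STORE', 'memory://')

app = faust.App('example', broker='kafka://localhost', store=store)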
See also
- Configuration Reference – for a full list of supported configuration settings; these can be passed as keyword arguments when creating the faust.App.
App Instantiation and Configuration¶
Instantiating and configuring a Faust app can be done in two ways, much like a Flask app: you can either create a new App instance with the entire configuration as parameters and then run it, or instantiate the App first, set the parameters, and then run the application. This allows for easier configuration switching, in particular during development and testing.
>>> app = faust.App('myApp')
>>> app.conf.broker='kafka://kafka.example.com'
>>> app.conf.store='rocksdb://'
>>> app.main()
Actions¶
app.topic()
– Create a topic-description¶
Use the topic()
method to create a topic description, used
to tell stream processors what Kafka topic to read from, and how the keys
and values in that topic are serialized:
topic = app.topic('name_of_topic')
@app.agent(topic)
async def process(stream):
async for event in stream:
...
Topic Arguments¶
key_type/value_type: ModelArg

Use the key_type and value_type arguments to specify the models used for key and value serialization:

class MyValueModel(faust.Record):
    name: str
    value: float

topic = app.topic(
    'name_of_topic',
    key_type=bytes,
    value_type=MyValueModel,
)

The default key_type is bytes and treats the key as a binary string. The key can also be specified as a model type (key_type=MyKeyModel).

See also
The Channels & Topics - Data Sources guide – for more about topics and channels.
The Models, Serialization, and Codecs guide – for more about models and serialization.

key_serializer/value_serializer: CodecArg

The codec/serializer type used for keys and values in this topic.

If not specified the default will be taken from the key_serializer and value_serializer settings (see the sketch after this list).

See also
The Codecs section in the Models, Serialization, and Codecs guide – for more information on available codecs, and also how to make your own custom encoders and decoders.
partitions: int

The number of partitions this topic should have. If not specified the default in the topic_partitions setting is used.

Note: if this is an automatically created topic, or an externally managed source topic, then please set this value to None.

retention: Seconds

How long to keep messages in the topic before the broker is allowed to expire them (as seconds or a timedelta).

compacting: bool

Set to True if this should be a compacting topic. The Kafka broker will then periodically compact the topic, only keeping the most recent value for a key.

acks: bool

Enable automatic acknowledgment for this topic. If you disable this then you are responsible for manually acknowledging each event.

internal: bool

If set to True this means we own and are responsible for this topic: we are allowed to create or delete the topic.

maxsize: int

The maximum buffer size used for this channel, with the default taken from the stream_buffer_maxsize setting. When this buffer is exceeded the worker will have to wait for agent/stream consumers to catch up, and if the buffer is frequently full this will negatively affect performance.

Try tweaking the buffer sizes, but also the broker_commit_interval setting, to make sure it commits more frequently with larger buffer sizes.
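Putting several of these arguments together, a minimal sketch (the topic name, model, and values are illustrative only):
from datetime import timedelta

import faust

app = faust.App('topic-args-example', broker='kafka://localhost')

class Order(faust.Record):
    account: str
    amount: float

orders_topic = app.topic(
    'orders',                     # name of the Kafka topic
    key_type=bytes,               # keys are treated as binary strings
    value_type=Order,             # values are deserialized into this model
    value_serializer='json',      # overrides the value_serializer setting
    partitions=8,                 # overrides the topic_partitions setting
    retention=timedelta(days=7),  # how long the broker keeps messages
)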
app.channel()
– Create a local channel¶
Use channel()
to create an in-memory communication channel:
import faust
app = faust.App('channel')
class MyModel(faust.Record):
x: int
channel = app.channel(value_type=MyModel)
@app.agent(channel)
async def process(stream):
async for event in stream:
print(f'Received: {event!r}')
@app.timer(1.0)
async def populate():
await channel.send(value=MyModel(303))
See also
The Channels & Topics - Data Sources guide – for more about topics and channels.
The Models, Serialization, and Codecs guide – for more about models and serialization.
Channel Arguments¶
key_type/value_type: ModelArg

Use the key_type and value_type arguments to specify the models used for key and value serialization:

class MyValueModel(faust.Record):
    name: str
    value: float

channel = app.channel(key_type=bytes, value_type=MyValueModel)

key_serializer/value_serializer: CodecArg

The codec/serializer type used for keys and values in this channel.

If not specified the default will be taken from the key_serializer and value_serializer settings.

maxsize: int

This is the maximum number of pending messages in the channel. If this number is exceeded any call to channel.put(value) will block until something consumes another message from the channel (see the sketch after this list).

Defaults to the stream_buffer_maxsize setting.
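For example, a bounded channel can be used to apply backpressure between a producer task and the agent consuming it. A minimal sketch, assuming the names used here:
import faust

app = faust.App('bounded-channel-example', broker='kafka://localhost')

# At most 10 messages may be pending; further sends wait until the
# consuming agent catches up.
channel = app.channel(value_type=str, maxsize=10)

@app.agent(channel)
async def consume(stream):
    async for value in stream:
        print(f'Consumed: {value!r}')

@app.timer(0.1)
async def produce():
    # Blocks here once the channel buffer is full.
    await channel.send(value='ping')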
app.Table()
– Define a new table¶
Use Table()
to define a new distributed dictionary; the only
required argument is a unique and identifying name. Here we also
set a default value so the table acts as a defaultdict
:
transfer_counts = app.Table(
'transfer_counts',
default=int,
key_type=str,
value_type=int,
)
The default argument is passed in as a callable, and in our example
calling int()
returns the number zero, so whenever a key is missing
in the table, it’s initialized with a value of zero:
>>> table['missing']
0
>>> table['also-missing'] += 1
>>> table['also-missing']
1
The table needs to relate every update to an associated source topic event,
so you must be iterating over a stream to modify a table. This is shown in the agent below, where .group_by() is used to repartition the stream by account id, ensuring that every unique account is delivered to the same agent instance, and that the count-per-account is recorded accurately:
@app.agent(transfers_topic)
async def transfer(transfers):
async for transfer in transfers.group_by(Transfer.account):
transfer_counts[transfer.account] += 1
The agent modifying the table cannot process the source topic out of
order, so only agents with concurrency=1
are allowed to update tables.
See also
The Tables and Windowing guide – for more information about tables.
Learn how to create a “windowed table” where aggregate values are placed into configurable time windows, providing you with answers to questions like “what was the value in the last five minutes”, or “what was the value of this count like yesterday”.
Table Arguments¶
name: str

The name of the table. This must be unique, as two tables with the same name in the same application will share changelog topics.

help: str

Brief description of the table's purpose.

default: Callable[[], Any]

User provided function called to get the default value for missing keys.

Without any default, attempting to access a missing key will raise KeyError:

>>> table = app.Table('nodefault', default=None)
>>> table['missing']
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
KeyError: 'missing'

With the default callback set to int, the same missing key will now set the key to 0 and return 0:

>>> table = app.Table('hasdefault', default=int)
>>> table['missing']
0
key_type/value_type: ModelArg

Use the key_type and value_type arguments to specify the models used for serializing/deserializing keys and values in this table:

class MyValueModel(faust.Record):
    name: str
    value: float

table = app.Table('mytable', key_type=bytes, value_type=MyValueModel)

store: str / URL

The name of a storage backend to use, or the URL to one.

Default is taken from the store setting.

partitions: int

The number of partitions for the changelog topic used by this table.

Default is taken from the topic_partitions setting.

changelog_topic: Topic

The changelog topic description to use for this table.

Only for advanced users who know what they're doing.

recovery_buffer_size: int

How often we flush changelog records during recovery. Default is every 1000 changelog messages.

standby_buffer_size: int

How often we flush changelog records while keeping standbys in sync. Default is None (always).

on_changelog_event: Callable[[EventT], Awaitable[None]]

A callback called for every changelog event during recovery and while keeping table standbys in sync (see the sketch below).
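A minimal sketch of a table using the on_changelog_event callback; the table name and the logging are illustrative only:
import faust
from faust.types import EventT

app = faust.App('changelog-callback-example', broker='kafka://localhost')

async def log_changelog_event(event: EventT) -> None:
    # Called for every changelog event applied during recovery
    # and while standbys are kept in sync.
    print(f'Changelog event for key {event.key!r}')

counts = app.Table(
    'counts',
    default=int,
    on_changelog_event=log_changelog_event,
)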
@app.agent()
– Define a new stream processor¶
Use the agent()
decorator to define an asynchronous stream processor:
# examples/agent.py
import faust
app = faust.App('stream-example')
@app.agent()
async def myagent(stream):
"""Example agent."""
async for value in stream:
print(f'MYAGENT RECEIVED -- {value!r}')
yield value
if __name__ == '__main__':
app.main()
Terminology
“agent” – A named group of actors processing a stream.
“actor” – An individual agent instance.
No topic was passed to the agent decorator, so an anonymous topic will be created for it. Use the faust agents program to list the topics used by each agent:
$ python examples/agent.py agents
┌Agents────┬───────────────────────────────────────┬────────────────┐
│ name │ topic │ help │
├──────────┼───────────────────────────────────────┼────────────────┤
│ @myagent │ stream-example-examples.agent.myagent │ Example agent. │
└──────────┴───────────────────────────────────────┴────────────────┘
The autogenerated topic name stream-example-examples.agent.myagent
is generated from the application id
setting, the
application version
setting, and the fully qualified path of the
agent (examples.agent.myagent
).
Start a worker to consume from the topic:
$ python examples/agent.py worker -l info
Next, in a new console, send the agent a value using the faust send
program. The first argument to send is the name of the topic, and the second
argument is the value to send (use --key=k
to specify key). The name of
the topic can also start with the @
character to name an agent instead.
Use @agent
to send a value of "hello"
to the topic of our agent:
$ python examples/agent.py send @myagent hello
Finally, you should see in the worker console that it received our message:
MYAGENT RECEIVED -- b'hello'
See also
The Agents - Self-organizing Stream Processors guide – for more information about agents.
The Channels & Topics - Data Sources guide – for more information about channels and topics.
Agent Arguments¶
name: str

The name of the agent is automatically taken from the decorated function and the module it is defined in.

You can also specify the name manually, but note that this should include the module name, e.g.: name='proj.agents.add'.

channel: Channel

The channel or topic this agent should consume from.

concurrency: int

The number of concurrent actors to start for this agent on every worker instance.

For example, if you have an agent processing RSS feeds, a concurrency of 100 means you can process up to a hundred RSS feeds at the same time on every worker instance that you start.

Adding concurrency to your agent also means it will process events in the topic out of order, and should you rewind the stream that order may differ when processing the events a second time.

Concurrency and tables

Concurrent agents are not allowed to modify tables: an exception is raised if this is attempted.

They are, however, allowed to read from tables.
sink: Iterable[SinkT]

For agents that also yield a value: forward the value to be processed by one or more "sinks" (see the sketch after this list).

A sink can be another agent, a topic, or a callback (async or non-async).

See also
Sinks – for more information on using sinks.

on_error: Callable[[Agent, BaseException], None]

Optional error callback to be called when this agent raises an unexpected exception.

supervisor_strategy: mode.SupervisorStrategyT

A supervisor strategy to decide what happens when the agent raises an exception.

The default supervisor strategy is mode.OneForOneSupervisor – restarting actors one by one as they crash.

Other built-in supervisor strategies include:

- mode.OneForAllSupervisor – if one agent instance of this type raises an exception we will restart all other agent instances of this type.
- mode.CrashingSupervisor – if one agent instance of this type raises an exception we will crash the worker instance.
**kwargs

If the channel argument is not specified the agent will use an automatically named topic.

Any additional keyword arguments are considered to be configuration for this topic, with support for the same arguments as app.topic().
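A minimal sketch combining the concurrency and sink arguments; the topic names and the processing step are illustrative only:
import faust

app = faust.App('agent-args-example', broker='kafka://localhost')

feeds_topic = app.topic('feeds', value_type=str)
results_topic = app.topic('feed-results', value_type=str)

# Values yielded by the agent are forwarded to the results topic (the sink).
# concurrency=10 starts ten actors per worker, so ordering is not preserved.
@app.agent(feeds_topic, sink=[results_topic], concurrency=10)
async def fetch_feed(stream):
    async for url in stream:
        # ... fetch and process the feed here ...
        yield f'processed {url}'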
@app.task()
– Define a new support task.¶
Use the task()
decorator to define an asynchronous task to be started
with the app:
@app.task()
async def mytask():
print('APP STARTED AND OPERATIONAL')
The task will be started when the app starts, by scheduling it as an
asyncio.Task
on the event loop. It will only be started once the app
is fully operational, meaning it has started consuming messages from Kafka.
See also
The Tasks section in the Tasks, Timers, Cron Jobs, Web Views, and CLI Commands – for more information about defining tasks.
@app.timer()
– Define a new periodic task¶
Use the timer()
decorator to define an asynchronous periodic task
that runs every 30.0 seconds:
@app.timer(30.0)
async def my_periodic_task():
print('THIRTY SECONDS PASSED')
The timer will start 30 seconds after the worker instance has started and is in an operational state.
See also
The Timers section in the Tasks, Timers, Cron Jobs, Web Views, and CLI Commands guide – for more information about creating timers.
Timer Arguments¶
on_leader: bool

If enabled this timer will only execute on one of the worker instances at a time – that is, only on the leader of the cluster.

This can be used as a distributed mutex to execute something on one machine at a time (see the sketch below).
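A minimal sketch of a leader-only timer:
@app.timer(60.0, on_leader=True)
async def publish_every_minute():
    # Runs on at most one worker instance in the cluster: the elected leader.
    print('I am the leader, running the periodic job')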
@app.page()
– Define a new Web View¶
Use the page()
decorator to define a new web view from an
async function:
# examples/view.py
import faust
app = faust.App('view-example')
@app.page('/path/to/view/')
async def myview(web, request):
print(f'FOO PARAM: {request.query["foo"]}')
if __name__ == '__main__':
app.main()
Next run a worker instance to start the web server on port 6066 (default):
$ python examples/view.py worker -l info
Then visit your view in the browser by going to http://localhost:6066/path/to/view/:
$ open http://localhost:6066/path/to/view/
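The view above only prints a query parameter; a view will usually also return a response. A minimal sketch, assuming the web.json() helper on the web argument:
@app.page('/count/')
async def count(web, request):
    # web.json() builds an HTTP response with a JSON payload.
    return web.json({'count': 42})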
See also
The Web Views section in the Tasks, Timers, Cron Jobs, Web Views, and CLI Commands guide – to learn more about defining views.
app.main()
– Start the faust command-line program.¶
To have your script extend the faust program, you can call
app.main()
:
# examples/command.py
import faust
app = faust.App('umbrella-command-example')
if __name__ == '__main__':
app.main()
This will use the arguments in sys.argv
and will support the same
arguments as the faust umbrella command.
To see a list of available commands, execute your program:
$ python examples/command.py
To get help for a particular subcommand run:
$ python examples/command.py worker --help
See also
The
main()
method in the API reference.
@app.command()
– Define a new command-line command¶
Use the command()
decorator to define a new subcommand
for the faust command-line program:
# examples/command.py
import faust
app = faust.App('example-subcommand')
@app.command()
async def example():
"""This docstring is used as the command help in --help."""
print('RUNNING EXAMPLE COMMAND')
if __name__ == '__main__':
app.main()
You can now run your subcommand:
$ python examples/command.py example
RUNNING EXAMPLE COMMAND
See also
The CLI Commands section in the Tasks, Timers, Cron Jobs, Web Views, and CLI Commands guide – for more information about defining subcommands.
Including how to specify command-line arguments and parameters to your command.
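As a hedged sketch of what that can look like, assuming the option helper from faust.cli (see the guide above for the exact details):
from faust.cli import option

@app.command(
    option('--n', type=int, default=1, help='Number of times to print.'),
)
async def repeat(self, n: int) -> None:
    """Print a message n times."""
    for _ in range(n):
        print('RUNNING REPEAT COMMAND')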
@app.service()
– Define a new service¶
The service()
decorator adds a custom mode.Service
class
as a dependency of the app.
You can decorate a service class to have it start with the app:
# examples/service.py
import faust
from mode import Service
app = faust.App('service-example')
@app.service
class MyService(Service):
async def on_start(self):
print('MYSERVICE IS STARTING')
async def on_stop(self):
print('MYSERVICE IS STOPPING')
@Service.task
async def _background_task(self):
while not self.should_stop:
print('BACKGROUND TASK WAKE UP')
await self.sleep(1.0)
if __name__ == '__main__':
app.main()
To start the app and see it in action, run a worker:
$ python examples/service.py worker -l info
You can also add services at runtime in application subclasses:
class MyApp(App):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.some_service = self.service(SomeService())
Application Signals¶
You may have experience with signals in other frameworks such as Django and Celery.
The main difference with signals in Faust is that they accept
positional arguments, and that they also come with asynchronous versions
for use with asyncio
.
Signals are an implementation of the Observer design pattern.
App.on_produce_message
¶
New in version 1.6.
- sender: faust.App
- arguments: key, value, partition, timestamp, headers
- synchronous: This is a synchronous signal (do not use async def).
The on_produce_message
signal is a synchronous signal called before
producing messages.
This can be used to attach custom headers to Kafka messages:
from typing import Any, List, Optional, Tuple
from faust.types import AppT
from mode.utils.compat import want_bytes
# Assumption: current_test() is provided by Faust's LiveCheck integration.
from faust.livecheck import current_test
@app.on_produce_message.connect()
def on_produce_attach_trace_headers(
self,
sender: AppT,
key: bytes = None,
value: bytes = None,
partition: Optional[int] = None,
timestamp: Optional[float] = None,
headers: List[Tuple[str, bytes]] = None,
**kwargs: Any) -> None:
test = current_test()
if test is not None:
# Headers at this point is a list of ``(key, value)`` pairs.
# Note2: values in headers must be :class:`bytes`.
headers.extend([
(k, want_bytes(v)) for k, v in test.as_headers().items()
])
App.on_partitions_revoked
¶
The on_partitions_revoked
signal is an asynchronous signal called after every
Kafka rebalance and provides a single argument which is the set of
newly revoked partitions.
Add a callback to be called when partitions are revoked:
from typing import Set
from faust.types import AppT, TP
@app.on_partitions_revoked.connect
async def on_partitions_revoked(app: AppT,
revoked: Set[TP], **kwargs) -> None:
print(f'Partitions are being revoked: {revoked}')
Using app
as an instance when connecting here means we will only be called
for that particular app instance. If you want to be called for all app instances
then you must connect to the signal of the class (App
):
@faust.App.on_partitions_revoked.connect
async def on_partitions_revoked(app: AppT,
revoked: Set[TP], **kwargs) -> None:
...
Signal handlers must always accept **kwargs so that they are backwards compatible when new arguments are added.
Similarly new arguments must be added as keyword arguments to be backwards compatible.
App.on_partitions_assigned
¶
The on_partitions_assigned
signal is an asynchronous signal called after every
Kafka rebalance and provides a single argument which is the set of
assigned partitions.
Add a callback to be called when partitions are assigned:
from typing import Set
from faust.types import AppT, TP
@app.on_partitions_assigned.connect
async def on_partitions_assigned(app: AppT,
assigned: Set[TP], **kwargs) -> None:
print(f'Partitions are being assigned: {assigned}')
App.on_configured
¶
- sender: faust.App
- arguments: app, conf
- synchronous: This is a synchronous signal (do not use async def).
Called while the app reads its configuration: after the configuration has been read, but just before the application configuration is set.
Takes arguments (app, conf), where conf is the faust.Settings object being built and is the instance that app.conf will be set to after this signal returns.
Use the on_configured
signal to configure your app:
import os
import faust
app = faust.App('myapp')
@app.on_configured.connect
def configure(app, conf, **kwargs):
conf.broker = os.environ.get('FAUST_BROKER')
conf.store = os.environ.get('STORE_URL')
App.on_before_configured
¶
Called before the app reads configuration, and before the
App.on_configured
signal is dispatched.
Takes only sender as argument, which is the app being configured:
@app.on_before_configured.connect
def before_configuration(app, **kwargs):
print(f'App {app} is being configured')
App.on_after_configured
¶
Called after app is fully configured and ready for use.
Takes only sender as argument, which is the app that was configured:
@app.on_after_configured.connect
def after_configuration(app, **kwargs):
print(f'App {app} has been configured.')
App.on_worker_init
¶
Called by the faust worker program (or when using app.main()) to apply worker specific customizations.
Takes only sender as argument, which is the app a worker is being started for:
@app.on_worker_init.connect
def on_worker_init(app, **kwargs):
print(f'Worker starting for app {app}')
Starting the App¶
You can start a worker instance for your app from the command-line, or you can start it inline in your Python process. To accommodate the many ways you may want to embed a Faust application, starting the app has several possible entry points:
App entry points:
faust worker
The faust worker program starts a worker instance for an app from the command-line.
You may turn any self-contained module into the faust program by adding this to the end of the file:
if __name__ == '__main__':
    app.main()
For packages you can add a __main__.py module or setuptools entry points to setup.py.

If you have the module name where an app is defined, you can start a worker for it with the faust -A option:

$ faust -A myproj worker -l info

The above will import the app from the myproj module using from myproj import app. If you need to specify a different attribute you can use a fully qualified path:

$ faust -A myproj:faust_app worker -l info
-> faust.cli.worker.worker (CLI interface)

This is the faust worker program defined as a Python click (https://pypi.org/project/click/) command.

It is responsible for:

- Parsing the command-line arguments supported by faust worker.
- Printing the banner box (you will not get that with entry point 3 or 4).
- Starting the faust.Worker (see next step).
-> faust.Worker

This is used for starting a worker from Python when you also want to install process signal handlers, etc. It supports the same options as on the faust worker command-line, but now they are passed in as keyword arguments to faust.Worker.

The Faust worker is a subclass of mode.Worker, which makes sense given that Faust is built out of many different mode (https://pypi.org/project/mode/) services starting in a particular order.

The faust.Worker entry point is responsible for:

- Changing the directory when the workdir argument is set.
- Setting the process title (when setproctitle (https://pypi.org/project/setproctitle/) is installed), for a more helpful entry in ps listings.
- Setting up logging: handlers, formatters and level.
- If --debug is enabled:
  - Starting the aiomonitor (https://pypi.org/project/aiomonitor/) debugging back door.
  - Starting the blocking detector.
- Setting up TERM and INT signal handlers.
- Setting up the USR1 cry handler that logs a traceback.
- Starting the web server.
- Autodiscovery (see autodiscovery).
- Starting the faust.App (see next step).
- Properly shutting down the event loop on exit.
To start a worker from synchronous code, use Worker.execute_from_commandline:

>>> worker = Worker(app)
>>> worker.execute_from_commandline()

or, from an async def function, call await worker.start():

Warning
You will be responsible for gracefully shutting down the event loop.

async def start_worker(worker: Worker) -> None:
    await worker.start()

def manage_loop():
    loop = asyncio.get_event_loop()
    worker = Worker(app, loop=loop)
    try:
        loop.run_until_complete(start_worker(worker))
    finally:
        worker.stop_and_shutdown_loop()
Multiple apps

If you want your worker to start multiple apps, you would have to pass them in with the *services starargs:

worker = Worker(app1, app2, app3, app4)

This way the extra apps will be started together with the main app, and the main app of the worker (worker.app) will end up being the first positional argument (app1).

The problem with starting multiple apps is that each app will start a web server by default.

If you want a web server for every app, you must configure the web port for each:

apps = [app1, app2, app3, app4]
for i, app in enumerate(apps):
    app.conf.web_port = 6066 + i

worker = Worker(*apps)
-> faust.App

The "worker" only concerns itself with the terminal, process signal handlers, logging, debugging mechanisms, etc.; the rest is up to the app.

You can call await app.start() directly to get a side-effect free instance that can be embedded in any environment. It won't even emit logs to the console unless you have configured logging manually, and it won't set up any TERM/INT signal handlers, which means finally blocks won't execute at shutdown.

Start the app directly:

async def start_app(app):
    await app.start()

This will block until the worker shuts down, so if you want to start other parts of your program, you can start this in the background:

def start_in_loop(app):
    loop = asyncio.get_event_loop()
    loop.create_task(app.start())

If your program is written as a set of mode (https://pypi.org/project/mode/) services, you can simply add the app as a dependency to your service:

class MyService(mode.Service):

    def on_init_dependencies(self):
        return [faust_app]
Client-Only Mode¶
The app can also be started in “client-only” mode, which means the app can be used for sending agent RPC requests and retrieving replies, but not start a full Faust worker:
await app.start_client()
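For example, a script can start the app in client-only mode and ask an agent for a reply without running a full worker. A minimal sketch, assuming an agent named myagent defined on this app:
async def client_example() -> None:
    # Start without consumers, tables or the web server.
    await app.start_client()
    try:
        # Send a value to the agent and wait for its reply.
        reply = await myagent.ask('hello')
        print(f'Agent replied: {reply!r}')
    finally:
        await app.stop()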
Projects and Directory Layout¶
Faust is a library; it does not mandate any specific directory layout and integrates with any existing framework or project conventions.
That said, new projects written from scratch using Faust will want some guidance on how to organize, so we include this as a suggestion in the documentation.
Small/Standalone Projects¶
You can create a small Faust service with no supporting directories at all; we refer to this as a "standalone module": a module that contains everything it needs to run a full service.
The Faust distribution comes with several standalone examples, such as examples/word_count.py.
Medium/Large Projects¶
Projects need more organization as they grow larger, so we convert the standalone module into a directory layout:
+ proj/
- setup.py
- MANIFEST.in
- README.rst
- setup.cfg
+ proj/
- __init__.py
- __main__.py
- app.py
+ users/
- __init__.py
- agents.py
- commands.py
- models.py
- views.py
+ orders/
- __init__.py
- agents.py
- models.py
- views.py
Problem: Autodiscovery¶
Now we have many @app.agent/@app.timer/@app.command decorators and models spread across a nested directory. These have to be imported by the program to be registered and used.
Enter the autodiscover
setting:
# proj/app.py
import faust
app = faust.App(
'proj',
version=1,
autodiscover=True,
origin='proj' # imported name for this project (import proj -> "proj")
)
def main() -> None:
app.main()
Setting autodiscover to True means Faust will traverse the directory of the origin module to find agents, timers, tasks, commands, web views, and so on.
If you want more careful control you can specify a list of modules to traverse instead:
app = faust.App(
'proj',
version=1,
autodiscover=['proj.users', 'proj.orders'],
origin='proj'
)
Autodiscovery when using Django
When using autodiscover=True in a Django project,
only the apps listed in INSTALLED_APPS
will be traversed.
See also Django Projects.
Problem: Entry Point¶
The proj/__main__.py
module can act as the entry point for this
project:
# proj/__main__.py
from proj.app import app
app.main()
After creating this module you can now start a worker by doing:
python -m proj worker -l info
Now you’re probably thinking, “I’m too lazy to type python dash em all the time”, but don’t worry: take it one step further by using setuptools to install a command-line program for your project.
Create a setup.py for your project.

This step is not needed if you already have one.

You can read lots about creating your setup.py in the setuptools (https://pypi.org/project/setuptools/) documentation: https://setuptools.readthedocs.io/en/latest/setuptools.html#developer-s-guide

A minimum example that will work well enough:

#!/usr/bin/env python
from setuptools import find_packages, setup

setup(
    name='proj',
    version='1.0.0',
    description='Use Faust to blah blah blah',
    author='Ola Normann',
    author_email='ola.normann@example.com',
    url='http://proj.example.com',
    platforms=['any'],
    license='Proprietary',
    packages=find_packages(exclude=['tests', 'tests.*']),
    include_package_data=True,
    zip_safe=False,
    install_requires=['faust'],
    python_requires='~=3.8',
)
For inspiration you can also look at the setup.py files in the faust (https://pypi.org/project/faust/) and mode (https://pypi.org/project/mode/) source code distributions.
Add the command as a setuptools entry point.
To your setup.py add the following argument:

setup(
    ...,
    entry_points={
        'console_scripts': [
            'proj = proj.app:main',
        ],
    },
)

This essentially defines that the proj program runs the main function imported with from proj.app import main.

Install your package using setup.py or pip.

When developing your project locally you should use setup.py develop to use the source code directory as a Python package:

$ python setup.py develop
You can now run the proj command you added to setup.py in step two:

$ proj worker -l info

Why use develop? You can use python setup.py install, but then you have to run that every time you make modifications to the source files.
Another upside to using setup.py
is that you can distribute your projects
as pip install
-able packages.
Django Projects¶
Django has its own conventions for directory layout, but your Django reusable apps will want some way to import your Faust app.

We believe the best place to define the Faust app in a Django project is in a dedicated reusable app. See the faustapp app in the examples/django directory in the Faust source code distribution.
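A minimal sketch of such a module, assuming a project settings module named proj.settings (the exact bootstrapping your project needs may differ):
# faustapp/app.py (hypothetical layout)
import os

# Django must be configured before agents import any models.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

import django
django.setup()

import faust

app = faust.App(
    'proj',
    broker='kafka://localhost',
    autodiscover=True,  # with Django, only INSTALLED_APPS are traversed
)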
Miscellaneous¶
Why use applications?¶
For special needs, you can inherit from the faust.App
class, and a subclass
will have the ability to change how almost everything works.
Comparing the application to the interface of frameworks like Django, there are clear benefits.
In Django, the global settings module means having multiple configurations is impossible, and with an API organized by modules, you sometimes end up with many import statements and many modules to keep track of. Further, you often end up monkey patching to change how something works.
The application keeps the library flexible to changes, and allows for many applications to coexist in the same process space.
Reference¶
See faust.App
in the API reference for a full list of methods
and attributes supported.