Tasks, Timers, Cron Jobs, Web Views, and CLI Commands¶
Tasks¶
Your application will have agents that process events in streams, but
can also start asyncio.Task
-s that do other things,
like periodic timers, views for the embedded web server, or additional
command-line commands.
Decorating an async function with the @app.task decorator will tell the worker to start that function as soon as the worker is fully operational:
@app.task
async def on_started():
print('APP STARTED')
If you add the above to the module that defines your app and start the worker, you should see the message printed in the output of the worker.
A task is a one-off task; if you want to do something at periodic intervals, you can use a timer.
Timers¶
A timer is a task that executes every n
seconds:
@app.timer(interval=60.0)
async def every_minute():
print('WAKE UP')
After starting the worker, and it’s operational, the above timer will print something every minute.
Cron Jobs¶
A Cron job is a task that executes according to a Crontab format, usually at fixed times:
@app.crontab('0 20 * * *')
async def every_day_at_8_pm():
print('WAKE UP ONCE A DAY')
After starting the worker, and it’s operational, the above Cron job will print something every day at 8pm.
crontab
takes 1 mandatory argument cron_format
and 2 optional arguments:
tz
, represents the timezone. Defaults to None which gives behaves as UTC.on_leader
, boolean defaults to False, only run on leader?
@app.crontab('0 20 * * *', tz=pytz.timezone('US/Pacific'), on_leader=True)
async def every_day_at_8_pm_pacific():
print('WAKE UP AT 8:00pm PACIFIC TIME ONLY ON THE LEADER WORKER')
Web Views¶
The Faust worker will also expose a web server on every instance, that by default runs on port 6066. You can access this in your web browser after starting a worker instance on your local machine:
$ faust -A myapp worker -l info
Just point your browser to the local port to see statistics about your running instance:
http://localhost:6066
You can define additional views for the web server (called pages). The server will use the https://pypi.org/project/aiohttp/ HTTP server library, but you can also write custom web server drivers.
Add a simple page returning a JSON structure by adding this to your app module:
# this counter exists in-memory only,
# so will be wiped when the worker restarts.
count = [0]
@app.page('/count/')
async def get_count(self, request):
# update the counter
count[0] += 1
# and return it.
return self.json({
'count': count[0],
})
This example view is of limited usefulness. It only provides you with a count of how many times the page is requested, on that particular server, for as long as it’s up, but you can also call actors or access table data in web views.
Restart your Faust worker, and you can visit your new page at:
http://localhost:6066/count/
Your workers may have an arbitrary number of views, and it’s up to you what they provide. Just like other web applications they can communicate with Redis, SQL databases, and so on. Anything you want, really, and it’s executing in an asynchronous event loop.
You can decide to develop your web app directly in the Faust workers, or you may choose to keep your regular web server separate from your Faust workers.
You can create complex systems quickly, just by putting everything in a single Faust app.
HTTP Verbs: GET
/POST
/PUT
/DELETE
¶
Specify a faust.web.View
class when you need to handle HTTP
verbs other than GET
:
from faust.web import Request, Response, View
@app.page('/count/')
class counter(View):
count: int = 0
async def get(self, request: Request) -> Response
return self.json({'count': self.count})
async def post(self, request: Request) -> Response:
n: int = request.query['n']
self.count += 1
return self.json({'count': self.count})
async def delete(self, request: Request) -> Response:
self.count = 0
Exposing Tables¶
A frequent requirement is the ability to expose table values in a web view, and while this is likely to be built-in to Faust in the future, you will have to implement this manually for now.
Tables are partitioned by key, and data for any specific key will exist
on a particular worker instance. You can use the @app.table_route
decorator to reroute the request to the worker holding that partition.
We define our table, and an agent reading from the stream to populate the table:
import faust
app = faust.App(
'word-counts',
broker='kafka://localhost:9092',
store='rocksdb://',
topic_partitions=8,
)
posts_topic = app.topic('posts', value_type=str)
word_counts = app.Table('word_counts', default=int,
help='Keep count of words (str to int).')
class Word(faust.Record):
word: str
@app.agent(posts_topic)
async def shuffle_words(posts):
async for post in posts:
for word in post.split():
await count_words.send(key=word, value=Word(word=word))
@app.agent()
async def count_words(words):
"""Count words from blog post article body."""
async for word in words:
word_counts[word.word] += 1
After that we define the view, using the @app.table_route
decorator to
reroute the request to the correct worker instance:
@app.page('/count/{word}/')
@app.table_route(table=word_counts, match_info='word')
async def get_count(web, request, word):
return web.json({
word: word_counts[word],
})
In the above example we used part of the URL to find the given word, but you may also want to get this from query parameters.
Table route based on key in query parameter:
@app.page('/count/')
@app.table_route(table=word_counts, query_param='word')
async def get_count(web, request):
word = request.query['word']
return web.json({
word: word_counts[word],
})
CLI Commands¶
As you may already know, you can make your project into an executable,
that can start Faust workers, list agents, models and more,
just by calling app.main()
.
Even if you don’t do that, the faust program is always available and you can point it to any app:
$ faust -A myapp worker -l info
The myapp
argument should point to a Python module/package having
an app
attribute. If the attribute has a different name, please specify
a fully qualified path:
$ faust -A myproj.apps:faust_app worker -l info
Do --help
to get a list of subcommands supported by the app:
$ faust -A myapp --help
To turn your script into the faust command, with the
-A
option already set, add this to the end of the module:
if __name__ == '__main__':
app.main()
If saved as simple.py
you can now execute it as if it was
the faust program:
$ python simple.py worker -l info
Custom CLI Commands¶
To add a custom command to your app, see the examples/simple.py
example in the Faust distribution, where we added a produce
command
used to send example data into the stream processors:
from faust.cli import option
# the full example is in examples/simple.py in the Faust distribution.
# this only shows the command part of this code.
@app.command(
option('--max-latency',
type=float, default=PRODUCE_LATENCY,
help='Add delay of (at most) n seconds between publishing.'),
option('--max-messages',
type=int, default=None,
help='Send at most N messages or 0 for infinity.'),
)
async def produce(self, max_latency: float, max_messages: int):
"""Produce example Withdrawal events."""
num_countries = 5
countries = [f'country_{i}' for i in range(num_countries)]
country_dist = [0.9] + ([0.10 / num_countries] * (num_countries - 1))
num_users = 500
users = [f'user_{i}' for i in range(num_users)]
self.say('Done setting up. SENDING!')
for i in range(max_messages) if max_messages is not None else count():
withdrawal = Withdrawal(
user=random.choice(users),
amount=random.uniform(0, 25_000),
country=random.choices(countries, country_dist)[0],
date=datetime.utcnow().replace(tzinfo=timezone.utc),
)
await withdrawals_topic.send(key=withdrawal.user, value=withdrawal)
if not i % 10000:
self.say(f'+SEND {i}')
if max_latency:
await asyncio.sleep(random.uniform(0, max_latency))
The @app.command
decorator accepts both click.option
and
click.argument
, so you can specify command-line options, as
well as command-line positional arguments.
Daemon Commands¶
The daemon
flag can be set to mark the command as a background service
that won’t exit until the user hits Control-c, or the process is
terminated by another signal:
@app.command(
option('--foo', type=float, default=1.33),
daemon=True,
)
async def my_daemon(self, foo: float):
print('STARTING DAEMON')
...
# set up some stuff
# we can return here but the program will not shut down
# until the user hits :kbd:`Control-c`, or the process is terminated
# by signal
return