Tables and Windowing¶
Tables¶
Basics¶
A table is a distributed in-memory dictionary, backed by a Kafka changelog topic used for persistence and fault-tolerance. We can replay the changelog upon network failure and node restarts, allowing us to rebuild the state of the table as it was before the fault.
To create a table use app.Table:
table = app.Table('totals', default=int)
You cannot modify a table outside of a stream operation; this means that you can only mutate the table from within an async for event in stream: block.
We require this to align the table’s partitions with the stream’s, and to
ensure the source topic partitions are correctly rebalanced to a different
worker upon failure, along with any necessary table partitions.
Modifying a table outside of a stream will raise an error:
table = app.Table('totals', default=int)
# cannot modify table, as we are not iterating over stream
table['foo'] += 30
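For contrast, here is a minimal sketch of the correct pattern: the table is only mutated inside an agent's async for block, so every update is tied to a source-topic event (the totals_topic topic and its string keys are assumptions for illustration):
totals_topic = app.topic('totals_topic', key_type=str, value_type=int)

@app.agent(totals_topic)
async def update_totals(amounts):
    # Mutating the table inside the stream iteration keeps the table
    # partitions aligned with the source topic partitions.
    async for key, amount in amounts.items():
        table[key] += amount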
This source-topic-event to table-modification-event requirement also ensures that producing to the changelog and committing messages from the source happen simultaneously.
Warning
An abruptly terminated Faust worker can allow some changelog entries to go through before the source topic offsets have been committed.
Duplicate messages may result in double-counting and other data consistency issues, but since version 1.5 of Faust you can enable a setting for strict processing guarantees.
See the processing_guarantee setting for more information.
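As a sketch, the setting is passed when creating the app; 'exactly_once' is the strict mode and the default is 'at_least_once' (the app name and broker URL below are assumptions):
import faust

# Enable strict processing guarantees (available since Faust 1.5).
app = faust.App(
    'word-counts',
    broker='kafka://localhost:9092',
    processing_guarantee='exactly_once',
)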
Co-partitioning Tables and Streams¶
When managing stream partitions and their corresponding changelog partitions, “co-partitioning” ensures the correct distribution of stateful processing among available clients, but one requirement is that tables and streams must share shards.
To shard the table differently, you must first repartition the stream using group_by.
Repartition a stream:
withdrawals_topic = app.topic('withdrawals', value_type=Withdrawal)

country_to_total = app.Table(
    'country_to_total', default=int).tumbling(10.0, expires=10.0)

@app.agent(withdrawals_topic)
async def process_withdrawal(withdrawals):
    async for withdrawal in withdrawals.group_by(Withdrawal.country):
        country_to_total[withdrawal.country] += withdrawal.amount
If the stream and table are not co-partitioned, a table shard could end up on a different worker than the one processing its corresponding stream partition.
Warning
For this reason, table changelog topics must have the same number of partitions as the source topic.
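As a sketch of how this can be made explicit, the source topic and the table (and therefore its changelog topic) can be declared with the same partition count; the count of 8 below is purely an illustrative assumption:
# Both the source topic and the table are declared with the same
# number of partitions, so the changelog topic matches the source.
withdrawals_topic = app.topic(
    'withdrawals', value_type=Withdrawal, partitions=8)

user_to_total = app.Table(
    'user_to_total', default=int, partitions=8)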
Table Sharding¶
Table shards in Kafka must be organized using a disjoint distribution of keys, so that all computation for a given subset of keys always happens in the same worker process.
The following is an example of incorrect usage where subsets of keys are likely to be processed by different worker processes:
withdrawals_topic = app.topic('withdrawals', key_type=str,
                              value_type=Withdrawal)

user_to_total = app.Table('user_to_total', default=int)

country_to_total = app.Table(
    'country_to_total', default=int).tumbling(10.0, expires=10.0)

@app.agent(withdrawals_topic)
async def process_withdrawal(withdrawals):
    async for withdrawal in withdrawals:
        user_to_total[withdrawal.user] += withdrawal.amount
        country_to_total[withdrawal.country] += withdrawal.amount
Here the stream withdrawals is (implicitly) partitioned by the user ID used as the message key. So the country_to_total table, instead of being partitioned by country name, is partitioned by the user ID. In practice, this means that data for a country may reside on multiple partitions, and worker instances end up with incomplete data.
To fix that, rewrite your program to use two distinct agents, repartitioning the stream by country when populating the country table:
withdrawals_topic = app.topic('withdrawals', value_type=Withdrawal)

user_to_total = app.Table('user_to_total', default=int)

country_to_total = app.Table(
    'country_to_total', default=int).tumbling(10.0, expires=10.0)

@app.agent(withdrawals_topic)
async def find_large_user_withdrawals(withdrawals):
    async for withdrawal in withdrawals:
        user_to_total[withdrawal.user] += withdrawal.amount

@app.agent(withdrawals_topic)
async def find_large_country_withdrawals(withdrawals):
    async for withdrawal in withdrawals.group_by(Withdrawal.country):
        country_to_total[withdrawal.country] += withdrawal.amount
The Changelog¶
Every modification to a table has a corresponding changelog update; the changelog is used to recover data after a failure.
We store the changelog in Kafka as a topic and use log compaction to only keep the most recent value for a key in the log. Kafka periodically compacts the table, to ensure the log does not grow beyond the number of keys in the table.
Note
In production the RocksDB store allows for almost instantaneous recovery of tables: a worker only needs to retrieve updates missed since last time the instance was up.
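As a hedged sketch, the store is selected through the app's store setting (the app name and broker URL below are assumptions):
import faust

# Use RocksDB as the table store so a restarted worker only needs to
# replay changelog entries missed while it was down.
app = faust.App(
    'withdrawal-totals',
    broker='kafka://localhost:9092',
    store='rocksdb://',
)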
If you change the value for a key in the table, make sure you write the new value back to the table afterwards: to publish a changelog message to Kafka for fault tolerance, the table key must be set explicitly. So even when modifying a value by reference, you still need to set the key to that value again for the change to reach the changelog, as shown below:
user_withdrawals = app.Table('user_withdrawals', default=list)

topic = app.topic('withdrawals', value_type=Withdrawal)

async for event in topic.stream():
    # get value for key in table
    withdrawals = user_withdrawals[event.account]
    # modify the value
    withdrawals.append(event.amount)
    # write it back to the table (also updating changelog):
    user_withdrawals[event.account] = withdrawals
If you forget to do so, like in the following example, the program will work but will have inconsistent data if a recovery is needed for any reason:
user_withdrawals = app.Table('user_withdrawals', default=list)

topic = app.topic('withdrawals', value_type=Withdrawal)

async for event in topic.stream():
    withdrawals = user_withdrawals[event.account]
    withdrawals.append(event.amount)
    # OOPS! Did not update the table with the new value
Due to this changelog, both table keys and values must be serializable.
See also
The Models, Serialization, and Codecs guide for more information about models and serialization.
Note
Faust creates an internal changelog topic for each table. The Faust application should be the only client producing to the changelog topics.
Windowing¶
Windowing allows us to process streams while preserving state over defined windows of time. A windowed table preserves key-value pairs according to the configured “Windowing Policy.”
We support the following policies:
- class TumblingWindow¶
This class creates fixed-sized, non-overlapping and contiguous time intervals to preserve key-value pairs, e.g. Tumbling(10) will create non-overlapping 10-second windows:
window 1: ----------
window 2:           ----------
window 3:                     ----------
window 4:                               ----------
window 5:                                         ----------
This class is exposed as a method on the object returned by app.Table(). It takes a mandatory parameter size, representing the window (time interval) duration, and an optional parameter expires, representing the duration for which we want to store the data (key-value pairs) allocated to each window.
- class HoppingWindow¶
This class creates fixed-sized, overlapping time intervals to preserve key-value pairs, e.g. Hopping(10, 5) will create overlapping 10-second windows, with a new window created every 5 seconds:
window 1: ----------
window 2:      ----------
window 3:           ----------
window 4:                ----------
window 5:                     ----------
window 6:                          ----------
This class is exposed as a method on the object returned by app.Table(). It takes two mandatory parameters:
- size, representing the window (time interval) duration.
- step, representing the time interval used to create new windows.
It also takes an optional parameter expires, representing the duration for which we want to store the data (key-value pairs) allocated to each window.
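For illustration, a hopping table might be declared like this (the table name and durations are assumptions):
from datetime import timedelta

# 10-second windows created every 5 seconds, with window data
# kept for 10 minutes.
page_views = app.Table('page_views_hopping', default=int).hopping(
    10, 5, expires=timedelta(minutes=10),
)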
How To¶
You can define a windowed table like this:
from datetime import timedelta
views = app.Table('views', default=int).tumbling(
timedelta(minutes=1),
expires=timedelta(hours=1),
)
Since a key can exist in multiple windows, the windowed table returns a special wrapper for table[k], called a WindowSet.
Here’s an example of a windowed table in use:
page_views_topic = app.topic('page_views', value_type=str)

@app.agent(page_views_topic)
async def aggregate_page_views(pages):
    # values in this stream are URLs as strings.
    async for page_url in pages:
        # increment the count in every window this page URL falls into.
        views[page_url] += 1

        if views[page_url].now() >= 10000:
            # Page is trending for current processing time window
            print('Trending now')

        if views[page_url].current() >= 10000:
            # Page would be trending in the current event's time window
            print('Trending when event happened')

        if views[page_url].value() >= 10000:
            # Page would be trending in the current event's time window
            # according to the relative time set when creating the
            # table.
            print('Trending when event happened')

        if views[page_url].delta(timedelta(minutes=30)) > views[page_url].now():
            print('Less popular compared to 30 minutes back')
In this table, table[k].now() returns the most recent value for the current processing window, overriding the relative_to option used to create the window.
table[k].current() returns the most recent value relative to the time of the currently processing event, overriding the relative_to option used to create the window.
table[k].value() returns the most recent value relative to the time of the currently processing event, and is the default behavior.
You can also make the current value relative to the current local time, relative to a different field in the event (if it has a custom timestamp field), or of another event.
The default behavior is “relative to current stream”:
views = app.Table('views', default=int).tumbling(...).relative_to_stream()
Where .relative_to_stream()
means values are selected based on the window
of the current event in the currently processing stream.
You can also use .relative_to_now(): this means the window of the current local time is used instead:
views = app.Table('views', default=int).tumbling(...).relative_to_now()
If the current event has a custom timestamp field that you want to use,
relative_to_field(field_descriptor)
is suited for that task:
views = app.Table('views', default=int) \
.tumbling(...) \
.relative_to_field(Account.date_created)
You can override this default behavior when accessing data in the table:
@app.agent(topic)
async def process(stream):
    async for event in stream:
        # Get the latest value for the key, based on the table's
        # default relative-to option.
        print(table[key].value())

        # You can bypass the default relative-to option, and
        # get the value closest to the event timestamp
        print(table[key].current())

        # You can bypass the default relative-to option, and
        # get the value closest to the current local time
        print(table[key].now())

        # Or get the value for a delta, e.g. 30 seconds ago, relative
        # to the event timestamp
        print(table[key].delta(30))
Note
We always retrieve window data based on timestamps. With tumbling windows there is just one window at a time, so for a given timestamp there is just one corresponding window. This is not the case for hopping windows, in which a timestamp could be located in more than one window.
At this point, when accessing data from a hopping table, we always access the latest window for a given timestamp and we have no way of modifying this behavior.
Iterating over keys/values/items in a windowed table.¶
Note
Tables are distributed across workers, so when iterating over table contents you will only see the partitions assigned to the current worker.
Iterating over all the keys in a table will require you to visit all workers, which is highly impractical in a production system.
For this reason table iteration is mostly used in debugging and observing your system.
To iterate over the keys/items/values in a windowed table you may add the key_index option to enable support for it:
windowed_table = app.Table(
    'name',
    default=int,
).hopping(10, 5, expires=timedelta(minutes=10), key_index=True)
Adding the key index means we keep a second table as an index of the keys present in the table. Whenever a new key is added we add the key to the key index; similarly, whenever a key is deleted we also delete it from the index.
This enables fast iteration over the keys, items and values in the windowed table, with the caveat that those keys may not exist in all windows.
The table iterator views (.keys()/.items()/.values()) will be time-relative to the stream by default, unless you have changed the time-relativity using the .relative_to_now or relative_to_timestamp modifiers:
# Show keys present relative to time of current event in stream:
print(list(windowed_table.keys()))
# Show items present relative to time of current event in stream:
print(list(windowed_table.items()))
# Show values present relative to time of current event in stream:
print(list(windowed_table.values()))
You can also manually specify the time-relativity:
# Change time-relativity to current wall-clock time,
# and show a list of items present in that window.
print(list(windowed_table.relative_to_now().items()))
# Get items present 30 seconds ago:
print(list(windowed_table.relative_to_now().items().delta(30.0)))
“Out of Order” Events¶
Kafka maintains the order of messages published to it, but when using custom timestamp fields, relative ordering is not guaranteed.
For example, a producer can lose network connectivity while sending a batch of messages and be forced to retry sending them later; the messages in the topic will then not be in timestamp order.
Windowed tables in Faust correctly handle such "out of order" events, at least until the message is as old as the table expiry configuration.
Note
We handle out of order events by storing separate aggregates for each window in the last expires seconds. The space complexity for this is O(w * K), where w is the number of windows in the last expires seconds and K is the number of keys in the table.
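As an illustrative calculation (the numbers here are assumptions, not defaults): with a hopping window of step 5 seconds and expires set to 300 seconds, w = 300 / 5 = 60 windows are retained per key, so a table holding K = 10,000 keys stores roughly 60 * 10,000 = 600,000 window aggregates.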
Table Serialization¶
A table is a mapping with keys and values, serialized using JSON by default.
If you want to use a different serialization mechanism you must provide the key_type and value_type arguments, which can be bytes or a model type (faust.Record):
table = app.Table(
    'name',
    key_type=Withdrawal,
    value_type=Withdrawal,
)
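The Withdrawal model used throughout this page is not defined here; a minimal sketch of what it might look like, with field names inferred from the examples above (your actual model may differ):
import faust

class Withdrawal(faust.Record, serializer='json'):
    # Fields inferred from the examples on this page.
    user: str
    country: str
    amount: float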