RocksDB storage.

class faust.stores.rocksdb.Options

Dummy Options.

class faust.stores.rocksdb.DB

Dummy DB.

class faust.stores.rocksdb.PartitionDB(partition: int, db: DB)

Tuple of (partition, rocksdb.DB).

partition: int

Alias for field number 0

db: DB

Alias for field number 1

class faust.stores.rocksdb.RocksDBOptions(max_open_files: Optional[int] = None, write_buffer_size: Optional[int] = None, max_write_buffer_number: Optional[int] = None, target_file_size_base: Optional[int] = None, block_cache_size: Optional[int] = None, block_cache_compressed_size: Optional[int] = None, bloom_filter_size: Optional[int] = None, use_rocksdict: Optional[bool] = None, ttl: Optional[int] = None, **kwargs: Any)

Options required to open a RocksDB database.

max_open_files: Optional[int] = 58983
write_buffer_size: int = 67108864
max_write_buffer_number: int = 3
target_file_size_base: int = 67108864
block_cache_size: int = 2147483648
block_cache_compressed_size: int = 524288000
bloom_filter_size: int = 3
use_rocksdict: bool = False
extra_options: Mapping
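
The default sizes listed above are raw byte counts. As a quick sanity check, the sketch below converts them to binary units. The `DEFAULTS` dict and the `human_size` helper are illustrative names and are not part of Faust.

```python
# Default byte sizes copied from the RocksDBOptions attributes listed above.
DEFAULTS = {
    "write_buffer_size": 67108864,
    "target_file_size_base": 67108864,
    "block_cache_size": 2147483648,
    "block_cache_compressed_size": 524288000,
}

UNITS = ("B", "KiB", "MiB", "GiB")


def human_size(num_bytes: int) -> str:
    """Format a byte count using binary units, capping at GiB."""
    size = float(num_bytes)
    for unit in UNITS[:-1]:
        if size < 1024:
            return f"{size:g} {unit}"
        size /= 1024
    return f"{size:g} {UNITS[-1]}"


for name, value in DEFAULTS.items():
    print(f"{name}: {human_size(value)}")
```

In other words, the write buffer and target file size default to 64 MiB, the block cache to 2 GiB, and the compressed block cache to 500 MiB.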
open(path: Path, *, read_only: bool = False) -> DB

Open RocksDB database using this configuration.

Return type:

DB

as_options() -> Options

Return rocksdb.Options object using this configuration.

Return type:

Options

class faust.stores.rocksdb.Store(url: Union[str, URL], app: AppT, table: CollectionT, *, key_index_size: Optional[int] = None, options: Optional[Mapping[str, Any]] = None, read_only: Optional[bool] = False, driver: Optional[str] = None, **kwargs: Any)

RocksDB table storage.


You can pass ‘read_only’ as an option to a Table class to allow a RocksDB store to be used by multiple apps:

app.App(..., store="rocksdb://")
app.GlobalTable(..., options={'read_only': True})

You can also switch between RocksDB drivers this way:

app.GlobalTable(..., options={'driver': 'rocksdict'})
app.GlobalTable(..., options={'driver': 'python-rocksdb'})

If you wish to remove the WAL files after a certain amount of time, you can set a TTL this way:

app.GlobalTable(..., options={'ttl': 60 * 60 * 24})  # 1 day

Note that the TTL is in seconds.
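
Because the TTL is expressed in seconds, it can be clearer to derive it from a `timedelta` than to multiply constants by hand. The following is a minimal sketch; `ttl_options` is a hypothetical helper, not a Faust API, and it only builds the plain dict shown in the example above.

```python
from datetime import timedelta
from typing import Any, Dict


def ttl_options(ttl: timedelta, **extra: Any) -> Dict[str, Any]:
    """Build a table ``options`` mapping with the TTL in whole seconds.

    Hypothetical helper for illustration; any extra keyword arguments
    are copied into the mapping unchanged.
    """
    return {"ttl": int(ttl.total_seconds()), **extra}


options = ttl_options(timedelta(days=1))
print(options)
```

The resulting mapping could then be passed as `app.GlobalTable(..., options=options)`.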


Note that rocksdict uses RocksDB 8, whereas python-rocksdb uses RocksDB 6. Once you switch to rocksdict, you won’t be able to return to using python-rocksdb.

offset_key = b'__faust\x00offset__'
rocksdb_options: RocksDBOptions

Used to configure the RocksDB settings for table stores.

key_index_size: int

Decides the size of the K=>TopicPartition index (10_000).

db_lock: Lock
rebalance_ack: bool
async backup_partition(tp: Union[TP, int], flush: bool = True, purge: bool = False, keep: int = 1) -> None

Backup partition from this store.

This will be saved in a separate directory in the data directory called ‘{table-name}-backups’.

This is only available in python-rocksdb.

  • tp (Union[TP, int]) – Partition to back up

  • flush (bool) – Flush the memtable before backing up the state of the table.

  • purge (bool) – Purge old backups in the process

  • keep (int) – How many backups to keep after purging

This is only supported in newer versions of python-rocksdb which can read the RocksDB database using multi-process read access. See https://github.com/facebook/rocksdb/wiki/How-to-backup-RocksDB for details.

Example usage:

table = app.GlobalTable(..., partitions=1)
# backup_partition() is a coroutine, so it must be awaited from async code.
await table.data.backup_partition(0, flush=True, purge=True, keep=1)
Return type:

None

restore_backup(tp: Union[TP, int], latest: bool = True, backup_id: int = 0) -> None

Restore partition backup from this store.

  • tp (Union[TP, int]) – Partition to restore

  • latest (bool) – Restore the latest backup, set as False to restore a specific ID

  • backup_id (int) – Backup to restore

An example of how the method can be accessed:

table = app.GlobalTable(..., partitions=1)
table.data.restore_backup(0)
Return type:

None

persisted_offset(tp: TP) -> Optional[int]

Return the last persisted offset.

See set_persisted_offset().

Return type:

Optional[int]

set_persisted_offset(tp: TP, offset: int) -> None

Set the last persisted offset for this table.

This will remember the last offset that we wrote to RocksDB, so that on rebalance/recovery we can seek past this point to only read the events that occurred recently while we were not an active replica.
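
The idea of seeking past the persisted offset can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not Faust's implementation: `ToyPartitionStore` and `recover` are hypothetical names, the changelog is a plain list of `(offset, key, value)` tuples, and encoding the offset as a decimal string is an assumption of this toy. Only the key value is taken from `offset_key` above.

```python
from typing import Dict, List, Optional, Tuple

OFFSET_KEY = b"__faust\x00offset__"  # same value as Store.offset_key


class ToyPartitionStore:
    """In-memory stand-in for one partition's database (hypothetical)."""

    def __init__(self) -> None:
        self.data: Dict[bytes, bytes] = {}

    def persisted_offset(self) -> Optional[int]:
        raw = self.data.get(OFFSET_KEY)
        return int(raw.decode()) if raw is not None else None

    def set_persisted_offset(self, offset: int) -> None:
        # Assumption of this toy: the offset is stored as a decimal string.
        self.data[OFFSET_KEY] = str(offset).encode()


def recover(store: ToyPartitionStore,
            changelog: List[Tuple[int, bytes, bytes]]) -> int:
    """Apply only entries newer than the persisted offset.

    Returns the number of entries that were applied.
    """
    last = store.persisted_offset()
    applied = 0
    for offset, key, value in changelog:
        if last is not None and offset <= last:
            continue  # already written before the restart
        store.data[key] = value
        store.set_persisted_offset(offset)
        applied += 1
    return applied


store = ToyPartitionStore()
changelog = [(0, b"a", b"1"), (1, b"b", b"2"), (2, b"a", b"3")]
first_run = recover(store, changelog[:2])  # initial run sees two events
second_run = recover(store, changelog)     # "restart": full log is replayed
print(first_run, second_run, store.persisted_offset())
```

On the second call only the event at offset 2 is applied, because offsets 0 and 1 are at or below the persisted offset.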

Return type:

None

async need_active_standby_for(tp: TP) -> bool

Decide if an active standby is needed for this topic partition.

Other workers may be running on the same local machine and already populating the same database file, in which case this worker can decide not to actively read standby messages.

Currently, it is recommended that you use separate data directories for multiple workers on the same machine.

For example if you have a 4 CPU core machine, you can run four worker instances on that machine, but using separate data directories:

$ myproj --datadir=/var/faust/w1 worker -l info --web-port=6066
$ myproj --datadir=/var/faust/w2 worker -l info --web-port=6067
$ myproj --datadir=/var/faust/w3 worker -l info --web-port=6068
$ myproj --datadir=/var/faust/w4 worker -l info --web-port=6069
Return type:

bool

apply_changelog_batch(batch: Iterable[EventT], to_key: Callable[[Any], Any], to_value: Callable[[Any], Any]) -> None

Write batch of changelog events to local RocksDB storage.

  • batch (Iterable[EventT]) – Iterable of changelog events (faust.Event)

  • to_key (Callable[[Any], Any]) – A callable you can use to deserialize the key of a changelog event.

  • to_value (Callable[[Any], Any]) – A callable you can use to deserialize the value of a changelog event.
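
To make the roles of the three parameters concrete, here is a simplified in-memory model of applying a changelog batch. It is a sketch, not Faust's code: `ToyEvent` and `apply_batch` are hypothetical, a plain dict stands in for RocksDB, and treating a `None` value as a deletion is an assumption borrowed from the usual Kafka tombstone convention.

```python
from typing import Any, Callable, Dict, Iterable, NamedTuple, Optional


class ToyEvent(NamedTuple):
    """Hypothetical stand-in for a changelog event (not faust.Event)."""
    partition: int
    offset: int
    key: bytes
    value: Optional[bytes]  # None marks a deletion in this toy model


def apply_batch(
    table: Dict[Any, Any],
    offsets: Dict[int, int],
    batch: Iterable[ToyEvent],
    to_key: Callable[[Any], Any],
    to_value: Callable[[Any], Any],
) -> None:
    """Apply each event to `table` and track the highest offset per partition."""
    for event in batch:
        key = to_key(event.key)
        if event.value is None:
            table.pop(key, None)
        else:
            table[key] = to_value(event.value)
        previous = offsets.get(event.partition, -1)
        offsets[event.partition] = max(previous, event.offset)


table: Dict[Any, Any] = {}
offsets: Dict[int, int] = {}
batch = [
    ToyEvent(0, 0, b"a", b"1"),
    ToyEvent(0, 1, b"b", b"2"),
    ToyEvent(0, 2, b"a", None),  # deletes key "a"
]
apply_batch(
    table,
    offsets,
    batch,
    to_key=lambda raw: raw.decode(),
    to_value=lambda raw: int(raw.decode()),
)
print(table, offsets)
```

After the batch, only key "b" remains and the highest offset seen for partition 0 is 2.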

Return type:

None

logger: logging.Logger = <Logger faust.stores.rocksdb (WARNING)>
async on_rebalance(assigned: Set[TP], revoked: Set[TP], newly_assigned: Set[TP], generation_id: int = 0) -> None

Rebalance occurred.

  • assigned (Set[TP]) – Set of all assigned topic partitions.

  • revoked (Set[TP]) – Set of newly revoked topic partitions.

  • newly_assigned (Set[TP]) – Set of newly assigned topic partitions, for which we were not assigned the last time.

  • generation_id (int) – The metadata generation identifier for the rebalance.
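
The relationship between the three sets can be sketched with plain set arithmetic. The `TP` class below is a local stand-in for a topic partition, and `diff_assignment` is a hypothetical helper used only to show how `revoked` and `newly_assigned` relate to the previous and current assignments.

```python
from typing import NamedTuple, Set, Tuple


class TP(NamedTuple):
    """Local stand-in for a topic partition (topic name, partition number)."""
    topic: str
    partition: int


def diff_assignment(previous: Set[TP],
                    assigned: Set[TP]) -> Tuple[Set[TP], Set[TP]]:
    """Return (revoked, newly_assigned) for a change of assignment."""
    revoked = previous - assigned         # held before, no longer assigned
    newly_assigned = assigned - previous  # assigned now, not held before
    return revoked, newly_assigned


previous = {TP("orders", 0), TP("orders", 1)}
assigned = {TP("orders", 1), TP("orders", 2)}
revoked, newly_assigned = diff_assignment(previous, assigned)
print(sorted(revoked), sorted(newly_assigned))
```

Here partition 0 is revoked, partition 2 is newly assigned, and partition 1 stays assigned without appearing in either difference.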

Return type:

None

async stop() -> None

Stop the service.

Return type:

None

revoke_partitions(table: CollectionT, tps: Set[TP]) -> None

De-assign partitions used on this worker instance.

  • table (CollectionT) – The table that we store data for.

  • tps (Set[TP]) – Set of topic partitions that we should no longer be serving data for.

Return type:

None

async assign_partitions(table: CollectionT, tps: Set[TP], generation_id: int = 0) -> None

Assign partitions to this worker instance.

  • table (CollectionT) – The table that we store data for.

  • tps (Set[TP]) – Set of topic partitions we have been assigned.

Return type:

None

reset_state() -> None

Remove all data stored in this table.


Only local data will be removed; table changelog partitions in Kafka will not be affected.

Return type:

None

partition_path(partition: int) -> Path

Return pathlib.Path to db file of specific partition.

Return type:

Path

property path: Path

Path to directory where tables are stored.

See also

tabledir (default value for this path).

Return type:

Path

property basename: Path

Return the name of this table, used as filename prefix.

Return type:

Path