faust.stores.rocksdb
RocksDB storage.
- class faust.stores.rocksdb.PartitionDB(partition: int, db: DB)
Tuple of (partition, rocksdb.DB).
- class faust.stores.rocksdb.RocksDBOptions(max_open_files: Optional[int] = None, write_buffer_size: Optional[int] = None, max_write_buffer_number: Optional[int] = None, target_file_size_base: Optional[int] = None, block_cache_size: Optional[int] = None, block_cache_compressed_size: Optional[int] = None, bloom_filter_size: Optional[int] = None, **kwargs: Any)
Options required to open a RocksDB database.
- open(path: Path, *, read_only: bool = False) -> DB
Open RocksDB database using this configuration.
- Return type:
DB
- as_options() -> Options
Return rocksdb.Options object using this configuration.
- Return type:
Options
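Every named setting in the constructor defaults to None, which suggests that an omitted setting falls back to a built-in default. The following is a minimal, self-contained sketch of that pattern. It does not import faust; the class name `OptionsSketch`, the default values, and the way extra keyword arguments are passed through are all assumptions made for illustration, not the library's actual behavior.

```python
from typing import Any, Dict, Optional


class OptionsSketch:
    """Stand-in for an options holder; not the faust class itself."""

    # Hypothetical defaults, chosen only for this illustration.
    max_open_files: int = 1000
    write_buffer_size: int = 64 * 1024 * 1024

    def __init__(
        self,
        max_open_files: Optional[int] = None,
        write_buffer_size: Optional[int] = None,
        **kwargs: Any,
    ) -> None:
        # None means "keep the class-level default".
        if max_open_files is not None:
            self.max_open_files = max_open_files
        if write_buffer_size is not None:
            self.write_buffer_size = write_buffer_size
        # Unrecognised settings are kept and passed through unchanged
        # (an assumption about how **kwargs might be used).
        self.extra_options: Dict[str, Any] = kwargs

    def as_kwargs(self) -> Dict[str, Any]:
        """Merge the named settings with any pass-through extras."""
        return {
            "max_open_files": self.max_open_files,
            "write_buffer_size": self.write_buffer_size,
            **self.extra_options,
        }


opts = OptionsSketch(max_open_files=500, create_if_missing=True)
settings = opts.as_kwargs()
```

Here `settings` carries the overridden `max_open_files`, the untouched default for `write_buffer_size`, and the extra `create_if_missing` entry.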
- class faust.stores.rocksdb.Store(url: Union[str, URL], app: AppT, table: CollectionT, *, key_index_size: Optional[int] = None, options: Optional[Mapping[str, Any]] = None, read_only: Optional[bool] = False, **kwargs: Any)
RocksDB table storage.
Tip
You can pass 'read_only' as an option to a Table class to allow a RocksDB store to be used by multiple apps:
app.App(..., store="rocksdb://")
app.GlobalTable(..., options={'read_only': True})
- offset_key = b'__faust\x00offset__'
- rocksdb_options: RocksDBOptions
Used to configure the RocksDB settings for table stores.
- db_lock: Lock
- async backup_partition(tp: Union[TP, int], flush: bool = True, purge: bool = False, keep: int = 1) -> None
Backup partition from this store.
This will be saved in a separate directory in the data directory called ‘{table-name}-backups’.
- Parameters:
This is only supported in newer versions of python-rocksdb, which can read the RocksDB database using multi-process read access. See https://github.com/facebook/rocksdb/wiki/How-to-backup-RocksDB for details.
Example usage (the method is a coroutine, so it must be awaited):
table = app.GlobalTable(..., partitions=1)
await table.data.backup_partition(0, flush=True, purge=True, keep=1)
- Return type:
None
- restore_backup(tp: Union[TP, int], latest: bool = True, backup_id: int = 0) -> None
Restore partition backup from this store.
- Parameters:
Example usage:
table = app.GlobalTable(..., partitions=1)
table.data.restore_backup(0)
- Return type:
None
- persisted_offset(tp: TP) -> Optional[int]
Return the last persisted offset.
- Return type:
Optional[int]
- set_persisted_offset(tp: TP, offset: int) -> None
Set the last persisted offset for this table.
This will remember the last offset that we wrote to RocksDB, so that on rebalance/recovery we can seek past this point to only read the events that occurred recently while we were not an active replica.
- Return type:
None
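The pairing of set_persisted_offset and persisted_offset can be pictured as a read and a write of one reserved key. The sketch below uses a plain dict in place of a RocksDB handle and assumes the offset is encoded as ASCII decimal bytes under the documented offset_key; the encoding and the helper signatures are assumptions for illustration only.

```python
from typing import Dict, Optional

# Reserved key, as documented for Store.offset_key.
OFFSET_KEY = b"__faust\x00offset__"


def set_persisted_offset(db: Dict[bytes, bytes], offset: int) -> None:
    """Record the last offset written to this partition's database."""
    # Assumed encoding: the decimal string of the offset, as bytes.
    db[OFFSET_KEY] = str(offset).encode()


def persisted_offset(db: Dict[bytes, bytes]) -> Optional[int]:
    """Return the recorded offset, or None if nothing was persisted yet."""
    raw = db.get(OFFSET_KEY)
    return int(raw.decode()) if raw is not None else None


db: Dict[bytes, bytes] = {}       # stand-in for one partition's database
before = persisted_offset(db)      # nothing stored yet
set_persisted_offset(db, 1042)
after = persisted_offset(db)       # recovery could resume after this offset
```

Returning None for a fresh database matches the Optional[int] return type: the caller can distinguish "never written" from offset 0.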
- async need_active_standby_for(tp: TP) -> bool
Decide if an active standby is needed for this topic partition.
Since other workers may be running on the same local machine, we can decide to not actively read standby messages, since that database file is already being populated.
Currently it is recommended that you use separate data directories for multiple workers on the same machine.
For example, if you have a machine with four CPU cores, you can run four worker instances on it, each with a separate data directory:
$ myproj --datadir=/var/faust/w1 worker -l info --web-port=6066
$ myproj --datadir=/var/faust/w2 worker -l info --web-port=6067
$ myproj --datadir=/var/faust/w3 worker -l info --web-port=6068
$ myproj --datadir=/var/faust/w4 worker -l info --web-port=6069
- Return type:
bool
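One way to make the decision described above is to try opening the partition's database and treat "already locked by another process" as a sign that a co-located worker is populating it. The sketch below illustrates that idea only; `LockHeldError`, `open_free`, and `open_locked` are hypothetical stand-ins, and it is an assumption that the store detects the situation this way.

```python
from typing import Callable


class LockHeldError(Exception):
    """Hypothetical error: another process holds the database lock."""


def need_active_standby(open_db: Callable[[int], object], partition: int) -> bool:
    """Return False when the partition's database is locked elsewhere."""
    try:
        open_db(partition)
    except LockHeldError:
        # Another local worker is already writing this database file,
        # so reading standby messages here would be redundant.
        return False
    return True


def open_free(partition: int) -> object:
    """Stand-in opener for a database nobody else is using."""
    return object()


def open_locked(partition: int) -> object:
    """Stand-in opener for a database locked by another worker."""
    raise LockHeldError(f"partition {partition} is locked")


standby_when_free = need_active_standby(open_free, 0)
standby_when_locked = need_active_standby(open_locked, 0)
```

With separate data directories per worker, as recommended above, the locked case would not arise and every worker would keep its own standby copy.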
- apply_changelog_batch(batch: Iterable[EventT], to_key: Callable[[Any], Any], to_value: Callable[[Any], Any]) -> None
Write batch of changelog events to local RocksDB storage.
- Parameters:
- Return type:
None
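As a rough illustration of what applying a changelog batch involves, the sketch below replays messages into per-partition dicts, treats a None value as a delete, and tracks the highest offset seen per topic partition so it could later be persisted. The `Msg` type and `apply_batch` helper are hypothetical, the delete-on-None rule is an assumption, and the to_key/to_value callbacks from the real signature are left out for brevity.

```python
from collections import defaultdict
from typing import Dict, Iterable, NamedTuple, Optional, Tuple


class Msg(NamedTuple):
    """Hypothetical, simplified changelog message."""
    topic: str
    partition: int
    offset: int
    key: bytes
    value: Optional[bytes]


def apply_batch(
    dbs: Dict[int, Dict[bytes, bytes]], batch: Iterable[Msg]
) -> Dict[Tuple[str, int], int]:
    """Apply messages to per-partition stores; return highest offsets seen."""
    offsets: Dict[Tuple[str, int], int] = {}
    for msg in batch:
        tp = (msg.topic, msg.partition)
        offsets[tp] = max(msg.offset, offsets.get(tp, -1))
        db = dbs[msg.partition]
        if msg.value is None:
            # Assumed convention: a None value marks a deleted key.
            db.pop(msg.key, None)
        else:
            db[msg.key] = msg.value
    return offsets


dbs: Dict[int, Dict[bytes, bytes]] = defaultdict(dict)
batch = [
    Msg("t", 0, 5, b"a", b"1"),
    Msg("t", 1, 9, b"b", b"2"),
    Msg("t", 0, 6, b"a", None),  # deletes key b"a" in partition 0
]
offsets = apply_batch(dbs, batch)
```

The returned offsets are the values one would hand to set_persisted_offset once the writes have been applied.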
- async on_rebalance(assigned: Set[TP], revoked: Set[TP], newly_assigned: Set[TP], generation_id: int = 0) -> None
Rebalance occurred.
- Parameters:
assigned (Set[TP]) – Set of all assigned topic partitions.
revoked (Set[TP]) – Set of newly revoked topic partitions.
newly_assigned (Set[TP]) – Set of newly assigned topic partitions, for which we were not assigned the last time.
generation_id (int) – The metadata generation identifier for the rebalance.
- Return type:
None
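The relationship between the three sets can be expressed with set arithmetic against the previous assignment. The sketch below is illustrative only: the `TP` tuple is a simplified stand-in for the library's topic-partition type, and `rebalance_sets` is a hypothetical helper, not part of the store.

```python
from typing import NamedTuple, Set, Tuple


class TP(NamedTuple):
    """Simplified stand-in for a topic partition."""
    topic: str
    partition: int


def rebalance_sets(previous: Set[TP], assigned: Set[TP]) -> Tuple[Set[TP], Set[TP]]:
    """Derive (revoked, newly_assigned) from old and new assignments."""
    revoked = previous - assigned          # held before, not any more
    newly_assigned = assigned - previous   # held now, not held before
    return revoked, newly_assigned


previous = {TP("t", 0), TP("t", 1)}
assigned = {TP("t", 1), TP("t", 2)}
revoked, newly_assigned = rebalance_sets(previous, assigned)
```

In this example partition 1 is retained, so it appears in assigned but in neither revoked nor newly_assigned.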
- revoke_partitions(table: CollectionT, tps: Set[TP]) -> None
De-assign partitions used on this worker instance.
- Parameters:
table (CollectionT) – The table that we store data for.
tps (Set[TP]) – Set of topic partitions that we should no longer be serving data for.
- Return type:
None
- async assign_partitions(table: CollectionT, tps: Set[TP], generation_id: int = 0) -> None
Assign partitions to this worker instance.
- Parameters:
table (CollectionT) – The table that we store data for.
tps (Set[TP]) – Set of topic partitions we have been assigned.
- Return type:
None
- logger: logging.Logger = <Logger faust.stores.rocksdb (WARNING)>
- reset_state() -> None
Remove all data stored in this table.
Notes
Only local data will be removed; table changelog partitions in Kafka will not be affected.
- Return type:
None
- partition_path(partition: int) -> Path
Return pathlib.Path to db file of specific partition.
- Return type:
Path
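Since each partition has its own database file, the path must be unique per partition. The sketch below shows one plausible layout built with pathlib; the "<table>-<partition>.db" naming scheme, the table name "orders", and the standalone helper signature are assumptions for illustration, not the store's confirmed layout.

```python
from pathlib import Path


def partition_path(data_dir: Path, table_name: str, partition: int) -> Path:
    """Build a per-partition database path under the data directory."""
    # Assumed naming scheme: "<table>-<partition>.db".
    return data_dir / f"{table_name}-{partition}.db"


# One path per partition of a hypothetical three-partition table.
paths = [partition_path(Path("/var/faust/w1"), "orders", p) for p in range(3)]
```

Because the data directory is part of the path, workers started with different --datadir values never share a database file.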