faust.stores.base

Base class for table storage drivers.

class faust.stores.base.Store(url: Union[str, URL], app: AppT, table: CollectionT, *, table_name: str = '', key_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, value_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, options: Optional[Mapping[str, Any]] = None, **kwargs: Any)[source]

Base class for table storage drivers.

url: URL
app: _AppT
table: _CollectionT
table_name: str
key_type: Optional[_ModelArg]
value_type: Optional[_ModelArg]
key_serializer: Optional[Union[CodecT, str]]
value_serializer: Optional[Union[CodecT, str]]
options: Optional[Mapping[str, Any]]
persisted_offset(tp: TP) Optional[int][source]

Return the persisted offset for this topic and partition.

Return type:

_GenericAlias[int, None]

set_persisted_offset(tp: TP, offset: int) None[source]

Set the persisted offset for this topic and partition.

Return type:

None

async need_active_standby_for(tp: TP) bool[source]

Return True if we have a copy of standby from elsewhere.

Return type:

bool

async on_rebalance(assigned: Set[TP], revoked: Set[TP], newly_assigned: Set[TP], generation_id: int = 0) None[source]

Handle rebalancing of the cluster.

Return type:

None

async on_recovery_completed(active_tps: Set[TP], standby_tps: Set[TP]) None[source]

Signal that table recovery completed.

Return type:

None

property label: str

Return short description of this store. :rtype: str

logger: logging.Logger = <Logger faust.stores.base (WARNING)>
class faust.stores.base.SerializedStore(url: Union[str, URL], app: AppT, table: CollectionT, *, table_name: str = '', key_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, value_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, options: Optional[Mapping[str, Any]] = None, **kwargs: Any)[source]

Base class for table storage drivers requiring serialization.

apply_changelog_batch(batch: Iterable[EventT], to_key: Callable[[Any], KT], to_value: Callable[[Any], VT]) None[source]

Apply batch of events from changelog topic to this store.

Return type:

None

keys() KeysView[source]

Return view of keys in the K/V store.

Return type:

KeysView

values() ValuesView[source]

Return view of values in the K/V store.

Return type:

ValuesView

items() ItemsView[source]

Return view of items in the K/V store as (key, value) pairs.

Return type:

ItemsView

clear() None[source]

Clear all data from this K/V store.

Return type:

None

logger: logging.Logger = <Logger faust.stores.base (WARNING)>
url: URL
app: _AppT
table: _CollectionT
table_name: str
key_type: Optional[_ModelArg]
value_type: Optional[_ModelArg]
key_serializer: Optional[Union[CodecT, str]]
value_serializer: Optional[Union[CodecT, str]]
options: Optional[Mapping[str, Any]]
log: CompositeLogger
diag: DiagT
async_exit_stack: AsyncExitStack
exit_stack: ExitStack
data: MutableMapping[KT, VT]