faust.tables

Tables: Distributed object K/V-store.

class faust.tables.Collection(app: AppT, *, name: Optional[str] = None, default: Optional[Callable[[], Any]] = None, store: Optional[Union[str, URL]] = None, schema: Optional[SchemaT] = None, key_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, value_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, partitions: Optional[int] = None, window: Optional[WindowT] = None, changelog_topic: Optional[TopicT] = None, help: Optional[str] = None, on_recover: Optional[Callable[[], Awaitable[None]]] = None, on_changelog_event: Optional[Callable[[EventT], Awaitable[None]]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: Optional[int] = None, extra_topic_configs: Optional[Mapping[str, Any]] = None, recover_callbacks: Optional[Set[Callable[[], Awaitable[None]]]] = None, options: Optional[Mapping[str, Any]] = None, use_partitioner: bool = False, on_window_close: Optional[Callable[[Any, Any], Union[None, Awaitable[None]]]] = None, is_global: bool = False, **kwargs: Any)[source]

Base class for changelog-backed data structures stored in Kafka.

property data: StoreT

Underlying table storage.

async on_start() None[source]

Call when table starts.

Return type:

None

on_recover(fun: Callable[[], Awaitable[None]]) Callable[[], Awaitable[None]][source]

Add function as callback to be called on table recovery.

Return type:

Callable[[], Awaitable[None]]

info() Mapping[str, Any][source]

Return table attributes as dictionary.

Return type:

Mapping[str, Any]

persisted_offset(tp: TP) Optional[int][source]

Return the last persisted offset for topic partition.

Return type:

Optional[int]

async need_active_standby_for(tp: TP) bool[source]

Return False if we have access to partition data.

Return type:

bool

reset_state() None[source]

Reset local state.

Return type:

None

send_changelog(partition: Optional[int], key: Any, value: Any, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None) FutureMessage[source]

Send modification event to changelog topic.

Return type:

FutureMessage

partition_for_key(key: Any) Optional[int][source]

Return partition number for table key.

Always returns None when use_partitioner is enabled.

Return type:

Optional[int]

Returns:

The specific partition, or None if the producer should select the partition using its partitioner.

Return type:

Optional[int]

async on_window_close(key: Any, value: Any) None[source]
Return type:

None

join(*fields: FieldDescriptorT) StreamT[source]

Right join of this table and another stream/table.

Return type:

StreamT

left_join(*fields: FieldDescriptorT) StreamT[source]

Left join of this table and another stream/table.

Return type:

StreamT

inner_join(*fields: FieldDescriptorT) StreamT[source]

Inner join of this table and another stream/table.

Return type:

StreamT

outer_join(*fields: FieldDescriptorT) StreamT[source]

Outer join of this table and another stream/table.

Return type:

StreamT

clone(**kwargs: Any) Any[source]

Clone table instance.

Return type:

Any

combine(*nodes: JoinableT, **kwargs: Any) StreamT[source]

Combine tables and streams.

Return type:

StreamT

contribute_to_stream(active: StreamT) None[source]

Contribute table to stream join.

Return type:

None

async remove_from_stream(stream: StreamT) None[source]

Remove table from stream join after stream stopped.

Return type:

None

async on_rebalance(assigned: Set[TP], revoked: Set[TP], newly_assigned: Set[TP], generation_id: int = 0) None[source]

Call when cluster is rebalancing.

Return type:

None

async on_recovery_completed(active_tps: Set[TP], standby_tps: Set[TP]) None[source]

Call when recovery has completed after rebalancing.

Return type:

None

async call_recover_callbacks() None[source]

Call any configured recovery callbacks after rebalancing.

Return type:

None

async on_changelog_event(event: EventT) None[source]

Call when a new changelog event is received.

Return type:

None

property label: str

Return human-readable label used to represent this table.

Return type:

str

property shortlabel: str

Return short label used to represent this table in logs.

Return type:

str

property changelog_topic: TopicT

Return the changelog topic used by this table.

Return type:

TopicT

logger: logging.Logger = <Logger faust.tables.base (WARNING)>
property changelog_topic_name: str
Return type:

str

apply_changelog_batch(batch: Iterable[EventT]) None[source]

Apply batch of events from changelog topic to local table storage.

Return type:

None

class faust.tables.CollectionT(app: _AppT, *, name: Optional[str] = None, default: Optional[Callable[[], Any]] = None, store: Optional[Union[str, URL]] = None, schema: Optional[_SchemaT] = None, key_type: Optional[_ModelArg] = None, value_type: Optional[_ModelArg] = None, partitions: Optional[int] = None, window: Optional[WindowT] = None, changelog_topic: Optional[TopicT] = None, help: Optional[str] = None, on_recover: Optional[Callable[[], Awaitable[None]]] = None, on_changelog_event: Optional[Callable[[EventT], Awaitable[None]]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: Optional[int] = None, extra_topic_configs: Optional[Mapping[str, Any]] = None, options: Optional[Mapping[str, Any]] = None, use_partitioner: bool = False, on_window_close: Optional[Callable[[Any, Any], Union[None, Awaitable[None]]]] = None, **kwargs: Any)[source]
app: _AppT
name: str
default: Any
schema: Optional[_SchemaT]
key_type: Optional[_ModelArg]
value_type: Optional[_ModelArg]
partitions: Optional[int]
window: Optional[WindowT]
help: str
recovery_buffer_size: int
standby_buffer_size: int
options: Optional[Mapping[str, Any]]
last_closed_window: float
use_partitioner: bool
is_global: bool = False
abstract clone(**kwargs: Any) Any[source]
Return type:

Any

abstract property changelog_topic: TopicT
Return type:

TopicT

abstract apply_changelog_batch(batch: Iterable[EventT]) None[source]
Return type:

None

abstract persisted_offset(tp: TP) Optional[int][source]
Return type:

Optional[int]

abstract async need_active_standby_for(tp: TP) bool[source]
Return type:

bool

abstract reset_state() None[source]
Return type:

None

abstract send_changelog(partition: Optional[int], key: Any, value: Any, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None) FutureMessage[source]
Return type:

FutureMessage

abstract partition_for_key(key: Any) Optional[int][source]
Return type:

Optional[int]

abstract async on_window_close(key: Any, value: Any) None[source]
Return type:

None

abstract async on_rebalance(assigned: Set[TP], revoked: Set[TP], newly_assigned: Set[TP], generation_id: int = 0) None[source]
Return type:

None

abstract async on_changelog_event(event: EventT) None[source]
Return type:

None

abstract on_recover(fun: Callable[[], Awaitable[None]]) Callable[[], Awaitable[None]][source]
Return type:

Callable[[], Awaitable[None]]

abstract async on_recovery_completed(active_tps: Set[TP], standby_tps: Set[TP]) None[source]
Return type:

None

abstract async call_recover_callbacks() None[source]
Return type:

None

abstract using_window(window: WindowT, *, key_index: bool = False) WindowWrapperT[source]
Return type:

WindowWrapperT

abstract hopping(size: Union[timedelta, float, str], step: Union[timedelta, float, str], expires: Optional[Union[timedelta, float, str]] = None, key_index: bool = False) WindowWrapperT[source]
Return type:

WindowWrapperT

abstract tumbling(size: Union[timedelta, float, str], expires: Optional[Union[timedelta, float, str]] = None, key_index: bool = False) WindowWrapperT[source]
Return type:

WindowWrapperT

abstract as_ansitable(**kwargs: Any) str[source]
Return type:

str

class faust.tables.GlobalTable(app: AppT, *, name: Optional[str] = None, default: Optional[Callable[[], Any]] = None, store: Optional[Union[str, URL]] = None, schema: Optional[SchemaT] = None, key_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, value_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, partitions: Optional[int] = None, window: Optional[WindowT] = None, changelog_topic: Optional[TopicT] = None, help: Optional[str] = None, on_recover: Optional[Callable[[], Awaitable[None]]] = None, on_changelog_event: Optional[Callable[[EventT], Awaitable[None]]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: Optional[int] = None, extra_topic_configs: Optional[Mapping[str, Any]] = None, recover_callbacks: Optional[Set[Callable[[], Awaitable[None]]]] = None, options: Optional[Mapping[str, Any]] = None, use_partitioner: bool = False, on_window_close: Optional[Callable[[Any, Any], Union[None, Awaitable[None]]]] = None, is_global: bool = False, **kwargs: Any)[source]

Warning

Using a GlobalTable with multiple app instances may cause an app to be stuck in an infinite recovery loop. The current fix for this is to run the table with the following options:

app.GlobalTable(..., partitions=1, recovery_buffer_size=1)
logger: logging.Logger = <Logger faust.tables.globaltable (WARNING)>
log: CompositeLogger
app: _AppT
name: str
default: Any
schema: Optional[_SchemaT]
key_type: Optional[_ModelArg]
value_type: Optional[_ModelArg]
partitions: Optional[int]
window: Optional[WindowT]
help: str
recovery_buffer_size: int
standby_buffer_size: int
options: Optional[Mapping[str, Any]]
last_closed_window: float
use_partitioner: bool
diag: DiagT
async_exit_stack: AsyncExitStack
exit_stack: ExitStack
class faust.tables.GlobalTableT(app: _AppT, *, name: Optional[str] = None, default: Optional[Callable[[], Any]] = None, store: Optional[Union[str, URL]] = None, schema: Optional[_SchemaT] = None, key_type: Optional[_ModelArg] = None, value_type: Optional[_ModelArg] = None, partitions: Optional[int] = None, window: Optional[WindowT] = None, changelog_topic: Optional[TopicT] = None, help: Optional[str] = None, on_recover: Optional[Callable[[], Awaitable[None]]] = None, on_changelog_event: Optional[Callable[[EventT], Awaitable[None]]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: Optional[int] = None, extra_topic_configs: Optional[Mapping[str, Any]] = None, options: Optional[Mapping[str, Any]] = None, use_partitioner: bool = False, on_window_close: Optional[Callable[[Any, Any], Union[None, Awaitable[None]]]] = None, **kwargs: Any)[source]
app: _AppT
name: str
default: Any
schema: Optional[_SchemaT]
key_type: Optional[_ModelArg]
value_type: Optional[_ModelArg]
partitions: Optional[int]
window: Optional[WindowT]
help: str
recovery_buffer_size: int
standby_buffer_size: int
options: Optional[Mapping[str, Any]]
last_closed_window: float
use_partitioner: bool
Diag: Type[DiagT]
diag: DiagT
async_exit_stack: AsyncExitStack
exit_stack: ExitStack
shutdown_timeout: float
class faust.tables.TableManager(app: AppT, **kwargs: Any)[source]

Manage tables used by Faust worker.

persist_offset_on_commit(store: StoreT, tp: TP, offset: int) None[source]

Mark the persisted offset for a TP to be saved on commit.

This is used for “exactly_once” processing guarantee. Instead of writing the persisted offset to RocksDB when the message is sent, we write it to disk when the offset is committed.

Return type:

None

on_commit(offsets: MutableMapping[TP, int]) None[source]

Call when committing source topic partitions.

Return type:

None

on_commit_tp(tp: TP) None[source]

Call when committing source topic partition used by this table.

Return type:

None

on_rebalance_start() None[source]

Call when a new rebalancing operation starts.

Return type:

None

on_actives_ready() None[source]

Call when actives are fully up-to-date.

Return type:

None

on_standbys_ready() None[source]

Call when standbys are fully up-to-date and ready for failover.

Return type:

None

property changelog_topics: Set[str]

Return the set of known changelog topics.

Return type:

Set[str]

property changelog_queue: ThrowableQueue

Queue used to buffer changelog events.

Return type:

ThrowableQueue

property recovery: Recovery

Recovery service used by this table manager.

Return type:

Recovery

add(table: CollectionT) CollectionT[source]

Add table to be managed by this table manager.

Return type:

CollectionT

logger: logging.Logger = <Logger faust.tables.manager (WARNING)>
async on_start() None[source]

Call when table manager is starting.

Return type:

None

async wait_until_tables_registered() None[source]
Return type:

None

async on_stop() None[source]

Call when table manager is stopping.

Return type:

None

on_partitions_revoked(revoked: Set[TP]) None[source]

Call when cluster is rebalancing and partitions revoked.

Return type:

None

async on_rebalance(assigned: Set[TP], revoked: Set[TP], newly_assigned: Set[TP], generation_id: int = 0) None[source]

Call when the cluster is rebalancing.

Return type:

None

async wait_until_recovery_completed() bool[source]
Return type:

bool

class faust.tables.TableManagerT(app: _AppT, **kwargs: Any)[source]
app: _AppT
actives_ready: bool
standbys_ready: bool
abstract add(table: CollectionT) CollectionT[source]
Return type:

CollectionT

abstract persist_offset_on_commit(store: StoreT, tp: TP, offset: int) None[source]
Return type:

None

abstract on_commit(offsets: MutableMapping[TP, int]) None[source]
Return type:

None

abstract async on_rebalance(assigned: Set[TP], revoked: Set[TP], newly_assigned: Set[TP], generation_id: int = 0) None[source]
Return type:

None

abstract on_partitions_revoked(revoked: Set[TP]) None[source]
Return type:

None

abstract on_rebalance_start() None[source]
Return type:

None

abstract async wait_until_tables_registered() None[source]
Return type:

None

abstract async wait_until_recovery_completed() bool[source]
Return type:

bool

abstract property changelog_topics: Set[str]
Return type:

Set[str]

class faust.tables.Table(app: AppT, *, name: Optional[str] = None, default: Optional[Callable[[], Any]] = None, store: Optional[Union[str, URL]] = None, schema: Optional[SchemaT] = None, key_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, value_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, partitions: Optional[int] = None, window: Optional[WindowT] = None, changelog_topic: Optional[TopicT] = None, help: Optional[str] = None, on_recover: Optional[Callable[[], Awaitable[None]]] = None, on_changelog_event: Optional[Callable[[EventT], Awaitable[None]]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: Optional[int] = None, extra_topic_configs: Optional[Mapping[str, Any]] = None, recover_callbacks: Optional[Set[Callable[[], Awaitable[None]]]] = None, options: Optional[Mapping[str, Any]] = None, use_partitioner: bool = False, on_window_close: Optional[Callable[[Any, Any], Union[None, Awaitable[None]]]] = None, is_global: bool = False, **kwargs: Any)[source]

Table (non-windowed).

class WindowWrapper(table: TableT, *, relative_to: Optional[Union[_FieldDescriptorT, Callable[[Optional[EventT]], Union[float, datetime]], datetime, float]] = None, key_index: bool = False, key_index_table: Optional[TableT] = None)

Windowed table wrapper.

A windowed table does not return concrete values when keys are accessed; instead, a WindowSet is returned so that the values can be further reduced to the wanted time period.

ValueType

alias of WindowSet

as_ansitable(title: str = '{table.name}', **kwargs: Any) str

Draw table as a terminal ANSI table.

Return type:

str

clone(relative_to: Optional[Union[_FieldDescriptorT, Callable[[Optional[EventT]], Union[float, datetime]], datetime, float]]) WindowWrapperT

Clone this table using a new time-relativity configuration.

Return type:

WindowWrapperT

property get_relative_timestamp: Optional[Callable[[Optional[EventT]], Union[float, datetime]]]

Return the current handler for extracting event timestamp.

Return type:

Optional[Callable[[Optional[EventT]], Union[float, datetime]]]

get_timestamp(event: Optional[EventT] = None) float

Get timestamp from event.

Return type:

float

items(event: Optional[EventT] = None) ItemsView

Return table items view: iterate over (key, value) pairs.

Return type:

ItemsView

key_index: bool = False
key_index_table: Optional[TableT] = None
keys() KeysView

Return table keys view: iterate over keys found in this table.

Return type:

KeysView

property name: str

Return the name of this table.

Return type:

str

on_del_key(key: Any) None

Call when a key is deleted from this table.

Return type:

None

on_recover(fun: Callable[[], Awaitable[None]]) Callable[[], Awaitable[None]]

Call after table recovery.

Return type:

Callable[[], Awaitable[None]]

on_set_key(key: Any, value: Any) None

Call when the value for a key in this table is set.

Return type:

None

relative_to(ts: Optional[Union[_FieldDescriptorT, Callable[[Optional[EventT]], Union[float, datetime]], datetime, float]]) WindowWrapperT

Configure the time-relativity of this windowed table.

Return type:

WindowWrapperT

relative_to_field(field: FieldDescriptorT) WindowWrapperT

Configure table to be time-relative to a field in the stream.

This means the window will use the timestamp from the event currently being processed in the stream.

Furthermore, it will not use the timestamp of the Kafka message, but a field in the value of the event.

For example a model field:

class Account(faust.Record):
    created: float

table = app.Table('foo').hopping(
    ...,
).relative_to_field(Account.created)
Return type:

WindowWrapperT

relative_to_now() WindowWrapperT

Configure table to be time-relative to the system clock.

Return type:

WindowWrapperT

relative_to_stream() WindowWrapperT

Configure table to be time-relative to the stream.

This means the window will use the timestamp from the event currently being processed in the stream.

Return type:

WindowWrapperT

values(event: Optional[EventT] = None) ValuesView

Return table values view: iterate over values in this table.

Return type:

ValuesView

table: TableT
using_window(window: WindowT, *, key_index: bool = False) WindowWrapperT[source]

Wrap table using a specific window type.

Return type:

WindowWrapperT

hopping(size: Union[timedelta, float, str], step: Union[timedelta, float, str], expires: Optional[Union[timedelta, float, str]] = None, key_index: bool = False) WindowWrapperT[source]

Wrap table in a hopping window.

Return type:

WindowWrapperT

tumbling(size: Union[timedelta, float, str], expires: Optional[Union[timedelta, float, str]] = None, key_index: bool = False) WindowWrapperT[source]

Wrap table in a tumbling window.

Return type:

WindowWrapperT

on_key_get(key: KT) None[source]

Call when the value for a key in this table is retrieved.

Return type:

None

on_key_set(key: KT, value: VT) None[source]

Call when the value for a key in this table is set.

Return type:

None

on_key_del(key: KT) None[source]

Call when a key in this table is removed.

Return type:

None

as_ansitable(title: str = '{table.name}', **kwargs: Any) str[source]

Draw table as a terminal ANSI table.

Return type:

str

logger: logging.Logger = <Logger faust.tables.table (WARNING)>
log: CompositeLogger
app: _AppT
name: str
default: Any
schema: Optional[_SchemaT]
key_type: Optional[_ModelArg]
value_type: Optional[_ModelArg]
partitions: Optional[int]
window: Optional[WindowT]
help: str
recovery_buffer_size: int
standby_buffer_size: int
options: Optional[Mapping[str, Any]]
last_closed_window: float
use_partitioner: bool
diag: DiagT
async_exit_stack: AsyncExitStack
exit_stack: ExitStack
class faust.tables.TableT(app: _AppT, *, name: Optional[str] = None, default: Optional[Callable[[], Any]] = None, store: Optional[Union[str, URL]] = None, schema: Optional[_SchemaT] = None, key_type: Optional[_ModelArg] = None, value_type: Optional[_ModelArg] = None, partitions: Optional[int] = None, window: Optional[WindowT] = None, changelog_topic: Optional[TopicT] = None, help: Optional[str] = None, on_recover: Optional[Callable[[], Awaitable[None]]] = None, on_changelog_event: Optional[Callable[[EventT], Awaitable[None]]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: Optional[int] = None, extra_topic_configs: Optional[Mapping[str, Any]] = None, options: Optional[Mapping[str, Any]] = None, use_partitioner: bool = False, on_window_close: Optional[Callable[[Any, Any], Union[None, Awaitable[None]]]] = None, **kwargs: Any)[source]
app: _AppT
name: str
default: Any
schema: Optional[_SchemaT]
key_type: Optional[_ModelArg]
value_type: Optional[_ModelArg]
partitions: Optional[int]
window: Optional[WindowT]
help: str
recovery_buffer_size: int
standby_buffer_size: int
options: Optional[Mapping[str, Any]]
last_closed_window: float
use_partitioner: bool
Diag: Type[DiagT]
diag: DiagT
async_exit_stack: AsyncExitStack
exit_stack: ExitStack
shutdown_timeout: float