faust.tables.base

Base class Collection for Table and future data structures.

class faust.tables.base.Collection(app: AppT, *, name: Optional[str] = None, default: Optional[Callable[[], Any]] = None, store: Optional[Union[str, URL]] = None, schema: Optional[SchemaT] = None, key_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, value_type: Optional[Union[Type[ModelT], Type[bytes], Type[str]]] = None, partitions: Optional[int] = None, window: Optional[WindowT] = None, changelog_topic: Optional[TopicT] = None, help: Optional[str] = None, on_recover: Optional[Callable[[], Awaitable[None]]] = None, on_changelog_event: Optional[Callable[[EventT], Awaitable[None]]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: Optional[int] = None, extra_topic_configs: Optional[Mapping[str, Any]] = None, recover_callbacks: Optional[Set[Callable[[], Awaitable[None]]]] = None, options: Optional[Mapping[str, Any]] = None, use_partitioner: bool = False, on_window_close: Optional[Callable[[Any, Any], Union[None, Awaitable[None]]]] = None, is_global: bool = False, **kwargs: Any)[source]

Base class for changelog-backed data structures stored in Kafka.

app: _AppT
name: str
default: Any
schema: Optional[_SchemaT]
key_type: Optional[_ModelArg]
value_type: Optional[_ModelArg]
partitions: Optional[int]
window: Optional[WindowT]
help: str
recovery_buffer_size: int
standby_buffer_size: int
use_partitioner: bool
last_closed_window: float
options: Optional[Mapping[str, Any]]
property data: StoreT

Underlying table storage.

async on_start() None[source]

Call when table starts.

Return type:

None

on_recover(fun: Callable[[], Awaitable[None]]) Callable[[], Awaitable[None]][source]

Add function as callback to be called on table recovery.

Return type:

Callable[[], Awaitable[None]]

info() Mapping[str, Any][source]

Return table attributes as dictionary.

Return type:

Mapping[str, Any]

persisted_offset(tp: TP) Optional[int][source]

Return the last persisted offset for topic partition.

Return type:

Optional[int]

async need_active_standby_for(tp: TP) bool[source]

Return False if we have access to partition data.

Return type:

bool

reset_state() None[source]

Reset local state.

Return type:

None

send_changelog(partition: Optional[int], key: Any, value: Any, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None) FutureMessage[source]

Send modification event to changelog topic.

Return type:

FutureMessage

partition_for_key(key: Any) Optional[int][source]

Return partition number for table key.

Always returns None when use_partitioner is enabled.

Returns:

specific partition, or None if the producer should select the partition using its partitioner.

Return type:

Optional[int]

async on_window_close(key: Any, value: Any) None[source]
Return type:

None

join(*fields: FieldDescriptorT) StreamT[source]

Right join of this table and another stream/table.

Return type:

StreamT

left_join(*fields: FieldDescriptorT) StreamT[source]

Left join of this table and another stream/table.

Return type:

StreamT

inner_join(*fields: FieldDescriptorT) StreamT[source]

Inner join of this table and another stream/table.

Return type:

StreamT

outer_join(*fields: FieldDescriptorT) StreamT[source]

Outer join of this table and another stream/table.

Return type:

StreamT

clone(**kwargs: Any) Any[source]

Clone table instance.

Return type:

Any

combine(*nodes: JoinableT, **kwargs: Any) StreamT[source]

Combine tables and streams.

Return type:

StreamT

contribute_to_stream(active: StreamT) None[source]

Contribute table to stream join.

Return type:

None

async remove_from_stream(stream: StreamT) None[source]

Remove table from stream join after stream stopped.

Return type:

None

async on_rebalance(assigned: Set[TP], revoked: Set[TP], newly_assigned: Set[TP], generation_id: int = 0) None[source]

Call when cluster is rebalancing.

Return type:

None

async on_recovery_completed(active_tps: Set[TP], standby_tps: Set[TP]) None[source]

Call when recovery has completed after rebalancing.

Return type:

None

async call_recover_callbacks() None[source]

Call any configured recovery callbacks after rebalancing.

Return type:

None

async on_changelog_event(event: EventT) None[source]

Call when a new changelog event is received.

Return type:

None

property label: str

Return human-readable label used to represent this table.

Return type:

str

property shortlabel: str

Return short label used to represent this table in logs.

Return type:

str

property changelog_topic: TopicT

Return the changelog topic used by this table.

Return type:

TopicT

logger: logging.Logger = <Logger faust.tables.base (WARNING)>
log: CompositeLogger
diag: DiagT
async_exit_stack: AsyncExitStack
exit_stack: ExitStack
property changelog_topic_name: str
Return type:

str

apply_changelog_batch(batch: Iterable[EventT]) None[source]

Apply batch of events from changelog topic to local table storage.

Return type:

None