faust.types.transports

faust.types.transports.ConsumerCallback

Callback called by faust.transport.base.Consumer whenever a message is received.

alias of Callable[[Message], Awaitable]

faust.types.transports.PartitionsRevokedCallback

Callback (async def) called when consumer partitions are revoked.

alias of Callable[[Set[TP]], Awaitable[None]]

faust.types.transports.PartitionsAssignedCallback

Callback (async def) called when consumer partitions are assigned.

alias of Callable[[Set[TP]], Awaitable[None]]

class faust.types.transports.ConsumerT(transport: TransportT, callback: Callable[[Message], Awaitable], on_partitions_revoked: Callable[[Set[TP]], Awaitable[None]], on_partitions_assigned: Callable[[Set[TP]], Awaitable[None]], *, commit_interval: Optional[float] = None, loop: Optional[AbstractEventLoop] = None, **kwargs: Any)[source]
transport: TransportT

The transport that created this Consumer.

transactions: TransactionManagerT
commit_interval: float

How often we commit topic offsets. See broker_commit_interval.

randomly_assigned_topics: Set[str]

Set of topic names that are considered “randomly assigned”. This means we don’t crash if it’s not part of our assignment. Used by e.g. the leader assignor service.

in_transaction: bool
scheduler: SchedulingStrategyT
abstract async create_topic(topic: str, partitions: int, replication: int, *, config: Optional[Mapping[str, Any]] = None, timeout: Union[timedelta, float, str] = 1000.0, retention: Optional[Union[timedelta, float, str]] = None, compacting: Optional[bool] = None, deleting: Optional[bool] = None, ensure_created: bool = False) None[source]
Return type:

None

abstract async subscribe(topics: Iterable[str]) None[source]
Return type:

None

abstract async getmany(timeout: float) AsyncIterator[Tuple[TP, Message]][source]
abstract track_message(message: Message) None[source]
Return type:

None

abstract async perform_seek() None[source]
Return type:

None

abstract ack(message: Message) bool[source]
Return type:

bool

abstract async wait_empty() None[source]
Return type:

None

abstract assignment() Set[TP][source]
Return type:

_GenericAlias[TP]

abstract highwater(tp: TP) int[source]
Return type:

int

abstract stop_flow() None[source]
Return type:

None

abstract resume_flow() None[source]
Return type:

None

abstract pause_partitions(tps: Iterable[TP]) None[source]
Return type:

None

abstract resume_partitions(tps: Iterable[TP]) None[source]
Return type:

None

abstract async position(tp: TP) Optional[int][source]
Return type:

_GenericAlias[int, None]

abstract async seek(partition: TP, offset: int) None[source]
Return type:

None

abstract async seek_wait(partitions: Mapping[TP, int]) None[source]
Return type:

None

abstract async commit(topics: Optional[AbstractSet[Union[str, TP]]] = None, start_new_transaction: bool = True) bool[source]
Return type:

bool

abstract async on_task_error(exc: BaseException) None[source]
Return type:

None

abstract async earliest_offsets(*partitions: TP) Mapping[TP, int][source]
Return type:

_GenericAlias[TP, int]

abstract async highwaters(*partitions: TP) Mapping[TP, int][source]
Return type:

_GenericAlias[TP, int]

abstract topic_partitions(topic: str) Optional[int][source]
Return type:

_GenericAlias[int, None]

abstract key_partition(topic: str, key: Optional[bytes], partition: Optional[int] = None) Optional[int][source]
Return type:

_GenericAlias[int, None]

abstract close() None[source]
Return type:

None

abstract verify_recovery_event_path(now: float, tp: TP) None[source]
Return type:

None

abstract property unacked: Set[Message]
Return type:

_GenericAlias[Message]

abstract on_buffer_full(tp: TP) None[source]
Return type:

None

abstract on_buffer_drop(tp: TP) None[source]
Return type:

None

class faust.types.transports.ProducerT(transport: TransportT, loop: Optional[AbstractEventLoop] = None, **kwargs: Any)[source]
transport: TransportT

The transport that created this Producer.

buffer: ProducerBufferT
client_id: str
linger_ms: int
max_batch_size: int
acks: int
max_request_size: int
compression_type: Optional[str]
ssl_context: Optional[SSLContext]
partitioner: Optional[Callable[[Optional[bytes], Sequence[int], Sequence[int]], int]]
request_timeout: float
abstract async send(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]], *, transactional_id: Optional[str] = None) Awaitable[RecordMetadata][source]
Return type:

_GenericAlias[RecordMetadata]

abstract send_soon(fut: FutureMessage) None[source]
Return type:

None

abstract async send_and_wait(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Optional[Union[List[Tuple[str, bytes]], Mapping[str, bytes]]], *, transactional_id: Optional[str] = None) RecordMetadata[source]
Return type:

RecordMetadata

abstract async create_topic(topic: str, partitions: int, replication: int, *, config: Optional[Mapping[str, Any]] = None, timeout: Union[timedelta, float, str] = 1000.0, retention: Optional[Union[timedelta, float, str]] = None, compacting: Optional[bool] = None, deleting: Optional[bool] = None, ensure_created: bool = False) None[source]
Return type:

None

abstract key_partition(topic: str, key: bytes) TP[source]
Return type:

TP

abstract async flush() None[source]
Return type:

None

abstract async begin_transaction(transactional_id: str) None[source]
Return type:

None

abstract async commit_transaction(transactional_id: str) None[source]
Return type:

None

abstract async abort_transaction(transactional_id: str) None[source]
Return type:

None

abstract async stop_transaction(transactional_id: str) None[source]
Return type:

None

abstract async maybe_begin_transaction(transactional_id: str) None[source]
Return type:

None

abstract async commit_transactions(tid_to_offset_map: Mapping[str, Mapping[TP, int]], group_id: str, start_new_transaction: bool = True) None[source]
Return type:

None

abstract supports_headers() bool[source]
Return type:

bool

class faust.types.transports.ConductorT(app: _AppT, **kwargs: Any)[source]
app: _AppT
abstract async on_client_only_start() None[source]
Return type:

None

abstract acks_enabled_for(topic: str) bool[source]
Return type:

bool

abstract async commit(topics: AbstractSet[Union[str, TP]]) bool[source]
Return type:

bool

abstract async wait_for_subscriptions() None[source]
Return type:

None

abstract async maybe_wait_for_subscriptions() None[source]
Return type:

None

abstract async on_partitions_assigned(assigned: Set[TP]) None[source]
Return type:

None

abstract property acking_topics: Set[str]
Return type:

_GenericAlias[str]

class faust.types.transports.TransactionManagerT(transport: TransportT, loop: Optional[AbstractEventLoop] = None, *, consumer: ConsumerT, producer: ProducerT, **kwargs: Any)[source]
consumer: ConsumerT
producer: ProducerT
abstract async on_partitions_revoked(revoked: Set[TP]) None[source]
Return type:

None

abstract async on_rebalance(assigned: Set[TP], revoked: Set[TP], newly_assigned: Set[TP]) None[source]
Return type:

None

abstract async commit(offsets: Mapping[TP, int], start_new_transaction: bool = True) bool[source]
Return type:

bool

async begin_transaction(transactional_id: str) None[source]
Return type:

None

async commit_transaction(transactional_id: str) None[source]
Return type:

None

async abort_transaction(transactional_id: str) None[source]
Return type:

None

async stop_transaction(transactional_id: str) None[source]
Return type:

None

async maybe_begin_transaction(transactional_id: str) None[source]
Return type:

None

async commit_transactions(tid_to_offset_map: Mapping[str, Mapping[TP, int]], group_id: str, start_new_transaction: bool = True) None[source]
Return type:

None

class faust.types.transports.TransportT(url: List[URL], app: _AppT, loop: Optional[AbstractEventLoop] = None)[source]
Consumer: ClassVar[Type[ConsumerT]]

The Consumer class used for this type of transport.

Producer: ClassVar[Type[ProducerT]]

The Producer class used for this type of transport.

TransactionManager: ClassVar[Type[TransactionManagerT]]

The TransactionManager class used for managing multiple transactions.

Conductor: ClassVar[Type[ConductorT]]

The Conductor class used to delegate messages from Consumer to streams.

Fetcher: ClassVar[Type[ServiceT]]

The Fetcher service used for this type of transport.

app: _AppT

The faust.App that created this transport.

url: List[URL]

//localhost).

Type:

The URL to use for this transport (e.g. kafka

driver_version: str

String identifying the underlying driver used for this transport. E.g. for aiokafka this could be aiokafka 0.4.1.

loop: AbstractEventLoop
abstract create_consumer(callback: Callable[[Message], Awaitable], **kwargs: Any) ConsumerT[source]
Return type:

ConsumerT

abstract create_producer(**kwargs: Any) ProducerT[source]
Return type:

ProducerT

abstract create_conductor(**kwargs: Any) ConductorT[source]
Return type:

ConductorT