faust.transport.consumer

Consumer - fetching messages and managing consumer state.

Overview of how the Consumer works:

  • Holds a reference to the transport that created it

  • … and the app via self.transport.app.

  • Has a callback that usually points back to Conductor.on_message.

  • Receives messages and calls the callback for every message received.

  • Keeps track of each message and its acked/unacked status.

  • The Conductor forwards the message to all Streams that subscribe to the topic the message was sent to.

    • Messages are reference counted, and the Conductor increases the reference count to the number of subscribed streams.

    • Stream.__aiter__ is set up so that when whatever is iterating over the stream is finished with the message, a finally: block decreases the reference count by one.

    • When the reference count for a message hits zero, the stream will call Consumer.ack(message), which will mark that topic + partition + offset combination as “committable”.

    • If all the streams share the same key_type/value_type, the conductor will only deserialize the payload once.

  • Commits the offset at a regular interval.

    • The Consumer has a background thread that periodically commits the offset.

    • If the consumer marked an offset as committable, this thread will advance the committed offset.

    • To find the offset it can safely advance to, the commit thread traverses the _acked mapping of TP to list of acked offsets and looks for a range of consecutive acked offsets (see note in _new_offset).
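The search for a range of consecutive acked offsets can be sketched as follows. This is a simplified illustration under stated assumptions, not the code in _new_offset: the TP tuple, the highest_consecutive_offset helper and the sample data are hypothetical, and the sketch ignores the previously committed offset and any gap handling the real method may perform.

```python
from typing import Dict, List, NamedTuple, Optional


class TP(NamedTuple):
    """Hypothetical stand-in for a topic partition."""
    topic: str
    partition: int


def highest_consecutive_offset(acked: List[int]) -> Optional[int]:
    """Return the last offset of the first consecutive run in ``acked``.

    Returns None when nothing has been acked for the partition.
    """
    if not acked:
        return None
    offsets = sorted(acked)
    last = offsets[0]
    for offset in offsets[1:]:
        if offset != last + 1:
            break  # gap: an offset in between has not been acked yet
        last = offset
    return last


# Assumed sample state: mapping of TP to list of acked offsets.
acked: Dict[TP, List[int]] = {
    TP("orders", 0): [1, 2, 3, 5, 6],  # offset 4 is still being processed
    TP("orders", 1): [10],
    TP("orders", 2): [],
}
committable = {
    tp: highest_consecutive_offset(offsets) for tp, offsets in acked.items()
}
```

In this sketch partition 0 can only advance to offset 3, because offset 4 has not been acked; advancing past it would risk losing that message after a restart.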

class faust.transport.consumer.Consumer(transport: TransportT, callback: Callable[[Message], Awaitable], on_partitions_revoked: Callable[[Set[TP]], Awaitable[None]], on_partitions_assigned: Callable[[Set[TP]], Awaitable[None]], *, commit_interval: Optional[float] = None, commit_livelock_soft_timeout: Optional[float] = None, loop: Optional[AbstractEventLoop] = None, **kwargs: Any)

Base Consumer.

logger: logging.Logger = <Logger faust.transport.consumer (WARNING)>
consumer_stopped_errors: ClassVar[Tuple[Type[BaseException], ...]] = ()

Tuple of exception types that may be raised when the underlying consumer driver is stopped.
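Because the attribute is a tuple of exception types, it can be passed directly to an except clause. The sketch below shows that pattern only; DriverClosedError, SketchConsumer and its methods are hypothetical names, and how a real driver subclass uses the attribute is an assumption.

```python
from typing import ClassVar, List, Tuple, Type


class DriverClosedError(Exception):
    """Hypothetical error raised by a driver once it has been stopped."""


class SketchConsumer:
    # A tuple of exception types can be used as-is in ``except``.
    consumer_stopped_errors: ClassVar[Tuple[Type[BaseException], ...]] = (
        DriverClosedError,
    )

    def __init__(self) -> None:
        self.stopped = False

    def _poll(self) -> List[str]:
        # Stand-in for a call into the underlying driver.
        if self.stopped:
            raise DriverClosedError("driver is closed")
        return ["record"]

    def fetch(self) -> List[str]:
        try:
            return self._poll()
        except self.consumer_stopped_errors:
            # Treat a stopped driver as "no more records", not a crash.
            return []


consumer = SketchConsumer()
before_stop = consumer.fetch()
consumer.stopped = True
after_stop = consumer.fetch()
```

With the base class default of an empty tuple, an except clause like this one matches nothing, so every exception propagates.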

flow_active: bool = True
transport: TransportT

The transport that created this Consumer.

app: AppT
in_transaction: bool
scheduler: SchedulingStrategyT
commit_interval: float

How often we commit topic offsets. See broker_commit_interval.

randomly_assigned_topics: Set[str]

Set of topic names that are considered “randomly assigned”. This means we don’t crash if one of these topics is not part of our assignment. Used by e.g. the leader assignor service.

can_resume_flow: Event
suspend_flow: Event
not_waiting_next_records: Event
transactions: TransactionManagerT

on_init_dependencies() → Iterable[ServiceT]

Return list of services this consumer depends on.

Return type:

Iterable[ServiceT]

async on_restart() → None

Call when the consumer is restarted.

Return type:

None

on_buffer_full(tp: TP) → None

Return type:

None

on_buffer_drop(tp: TP) → None

Return type:

None

async perform_seek() → None

Seek all partitions to their current committed position.

Return type:

None

abstract async seek_to_committed() → Mapping[TP, int]

Seek all partitions to their committed offsets.

Return type:

Mapping[TP, int]

async seek(partition: TP, offset: int) → None

Seek partition to specific offset.

Return type:

None

stop_flow() → None

Block consumer from processing any more messages.

Return type:

None

resume_flow() → None

Allow consumer to process messages.

Return type:

None

async wait_for_stopped_flow() → None

Wait until the consumer is not waiting on any newly fetched records.

Useful for scenarios where the consumer needs to be stopped to change the position of the fetcher to something other than the committed offset. There is a chance that getmany forces a seek to the committed offsets if the fetcher returns while the consumer is stopped. This can be prevented by waiting for the fetcher to finish (by default every second).

Return type:

None
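The flow control described by stop_flow(), resume_flow() and wait_for_stopped_flow() can be pictured as a gate built on asyncio.Event. The following is a minimal sketch of that idea, not Faust's implementation: FlowGate, deliver and demo are hypothetical names, and the way flow_active and can_resume_flow are used here is an assumption based only on the attribute names listed above.

```python
import asyncio
from typing import List, Tuple


class FlowGate:
    """Hypothetical stand-in for the consumer's flow control."""

    def __init__(self) -> None:
        self.flow_active = True
        self.can_resume_flow = asyncio.Event()
        self.can_resume_flow.set()

    def stop_flow(self) -> None:
        # Block delivery of any further messages.
        self.flow_active = False
        self.can_resume_flow.clear()

    def resume_flow(self) -> None:
        # Allow delivery again and wake up anything waiting on the gate.
        self.flow_active = True
        self.can_resume_flow.set()


async def deliver(gate: FlowGate, records: List[int], out: List[int]) -> None:
    for record in records:
        # Waits here for as long as the flow is stopped.
        await gate.can_resume_flow.wait()
        out.append(record)
        await asyncio.sleep(0)


async def demo() -> Tuple[List[int], List[int]]:
    gate = FlowGate()
    out: List[int] = []
    gate.stop_flow()
    task = asyncio.create_task(deliver(gate, [1, 2, 3], out))
    await asyncio.sleep(0.01)
    seen_while_stopped = list(out)  # snapshot taken while the gate is closed
    gate.resume_flow()
    await task
    return seen_while_stopped, out


seen_while_stopped, delivered = asyncio.run(demo())
```

The snapshot taken while the gate is closed stays empty, and all three records arrive once resume_flow() is called.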

pause_partitions(tps: Iterable[TP]) → None

Pause fetching from partitions.

Return type:

None

resume_partitions(tps: Iterable[TP]) → None

Resume fetching from partitions.

Return type:

None

async on_partitions_revoked(revoked: Set[TP]) → None

Call during rebalancing when partitions are being revoked.

Return type:

None

async on_partitions_assigned(assigned: Set[TP], generation_id: int = 0) → None

Call during rebalancing when partitions are being assigned.

Return type:

None

async getmany(timeout: float) → AsyncIterator[Tuple[TP, Message]]

Fetch batch of messages from server.

Return type:

AsyncIterator[Tuple[TP, Message]]

track_message(message: Message) → None

Track message and mark it as pending ack.

Return type:

None

ack(message: Message) → bool

Mark message as being acknowledged by stream.

Return type:

bool
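The interplay between track_message(), reference counting and ack() described in the module overview can be sketched as below. This is an illustration under assumptions rather than the library's code: SketchMessage, SketchConsumer and stream_done are hypothetical, and the choice that ack() returns True only when the message is newly marked as acked is an assumption.

```python
from typing import List, Set, Tuple


class SketchMessage:
    """Hypothetical stand-in for Message with a reference count."""

    def __init__(self, topic: str, partition: int, offset: int) -> None:
        self.topic = topic
        self.partition = partition
        self.offset = offset
        self.refcount = 0
        self.acked = False

    def incref(self, n: int = 1) -> None:
        self.refcount += n

    def decref(self, n: int = 1) -> int:
        self.refcount = max(self.refcount - n, 0)
        return self.refcount


class SketchConsumer:
    def __init__(self) -> None:
        self.unacked: Set[SketchMessage] = set()
        self.committable: List[Tuple[str, int, int]] = []

    def track_message(self, message: SketchMessage) -> None:
        # Mark the message as pending ack.
        self.unacked.add(message)

    def ack(self, message: SketchMessage) -> bool:
        # Assumption: True only if this call newly acked the message.
        if message.acked:
            return False
        message.acked = True
        self.unacked.discard(message)
        self.committable.append(
            (message.topic, message.partition, message.offset))
        return True


def stream_done(consumer: SketchConsumer, message: SketchMessage) -> None:
    """What one stream does when it is finished with a message."""
    try:
        pass  # user processing would happen here
    finally:
        # Runs even if processing raised; the last stream acks.
        if message.decref() == 0:
            consumer.ack(message)


consumer = SketchConsumer()
message = SketchMessage("orders", 0, 42)
consumer.track_message(message)
message.incref(2)  # two streams subscribe to the topic

stream_done(consumer, message)
pending_after_first = set(consumer.unacked)
stream_done(consumer, message)
```

After the first stream finishes, the message is still pending; only when the second stream drops the count to zero does the offset become committable.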

async wait_empty() → None

Wait for all messages that started processing to be acked.

Return type:

None

async commit_and_end_transactions() → None

Commit all safe offsets and end transaction.

Return type:

None

async on_stop() → None

Call when consumer is stopping.

Return type:

None

async verify_all_partitions_active() → None

Return type:

None

verify_event_path(now: float, tp: TP) → None

Return type:

None

verify_recovery_event_path(now: float, tp: TP) → None

Return type:

None

async commit(topics: Optional[AbstractSet[Union[str, TP]]] = None, start_new_transaction: bool = True) → bool

Maybe commit the offset for all or specific topics.

Parameters:

topics (Optional[AbstractSet[Union[str, TP]]]) – Set containing topics and/or TopicPartitions to commit.

Return type:

bool

async maybe_wait_for_commit_to_finish() → bool

Wait for any existing commit operation to finish.

Return type:

bool
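One way to read the relationship between commit() and maybe_wait_for_commit_to_finish() is as a guard against overlapping commits: a caller that finds a commit already in flight waits for it and then skips its own. The sketch below illustrates that pattern only. SketchCommitter, the _commit_fut attribute and the meaning of the boolean return values are assumptions, not taken from the Faust source.

```python
import asyncio
from typing import List, Optional


class SketchCommitter:
    """Hypothetical illustration of a single-flight commit."""

    def __init__(self) -> None:
        self._commit_fut: Optional[asyncio.Future] = None
        self.commits = 0

    async def maybe_wait_for_commit_to_finish(self) -> bool:
        # Assumption: True means "another commit was running and we waited".
        fut = self._commit_fut
        if fut is None:
            return False
        await fut
        return True

    async def commit(self) -> bool:
        if await self.maybe_wait_for_commit_to_finish():
            return False  # the other commit covered us; do not commit again
        self._commit_fut = asyncio.get_running_loop().create_future()
        try:
            await asyncio.sleep(0.01)  # stands in for the broker round trip
            self.commits += 1
            return True
        finally:
            # Clear the marker first, then wake up any waiters.
            fut, self._commit_fut = self._commit_fut, None
            fut.set_result(None)


async def demo() -> List[bool]:
    committer = SketchCommitter()
    results = await asyncio.gather(committer.commit(), committer.commit())
    assert committer.commits == 1  # only one commit reached the "broker"
    return list(results)


results = asyncio.run(demo())
```

Of the two concurrent calls, the first performs the commit and the second waits for it and reports that it did not commit.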

async force_commit(topics: AbstractSet[Union[str, TP]] = None, start_new_transaction: bool = True) → bool

Force offset commit.

Return type:

bool

async on_task_error(exc: BaseException) → None

Call when processing a message failed.

Return type:

None

close() → None

Close consumer for graceful shutdown.

Return type:

None

property unacked: Set[Message]

Return the set of currently unacknowledged messages.

Return type:

Set[Message]

log: CompositeLogger
diag: DiagT
async_exit_stack: AsyncExitStack
exit_stack: ExitStack

class faust.transport.consumer.Fetcher(app: AppT, **kwargs: Any)

Service fetching messages from Kafka.

logger: logging.Logger = <Logger faust.transport.consumer (WARNING)>
app: AppT

async on_stop() → None

Call when the fetcher is stopping.

Return type:

None

log: CompositeLogger
diag: DiagT
async_exit_stack: AsyncExitStack
exit_stack: ExitStack