Sensors - Monitors and Statistics

Basics

Sensors record information about events occurring in a Faust application as they happen.

There’s a default sensor called “the monitor” that record the runtime of messages and events as they go through the worker, the latency of publishing messages, the latency of committing Kafka offsets, and so on.

The web server uses this monitor to present graphs and statistics about your instance, and there’s also a versions of the monitor available that forwards statistics to StatsD, and Datadog.

You can define custom sensors to record the information that you care about, and enable them in the worker.

Monitor

The faust.Monitor is a built-in sensor that captures information like:

  • Average message processing time (when all agents have processed a message).

  • Average event processing time (from an event received by an agent to the event is acked.)

  • The total number of events processed every second.

  • The total number of events processed every second listed by topic.

  • The total number of events processed every second listed by agent.

  • The total number of records written to tables.

  • Duration of Kafka topic commit operations (latency).

  • Duration of producing messages (latency).

You can access the state of the monitor, while the worker is running, in app.monitor:

@app.agent(app.topic('topic'))
def mytask(events):
    async for event in events:
        # emit how many events are being processed every second.
        print(app.monitor.events_s)

Monitor API Reference

Class: Monitor

Monitor Attributes
class faust.Monitor[source]
messages_active

Number of messages currently being processed.

messages_received_total

Number of messages processed in total.

messages_received_by_topic

Count of messages received by topic

messages_s

Number of messages being processed this second.

events_active

Number of events currently being processed.

events_total

Number of events processed in total.

events_s

Number of events being processed this second.

events_by_stream

Count of events processed by stream

events_by_task

Count of events processed by task

events_runtime

Deque of run times used for averages

events_runtime_avg

Average event runtime over the last second.

tables

Mapping of tables

commit_latency

Deque of commit latency values

send_latency

Deque of send latency values

messages_sent

Number of messages sent in total.

send_errors

Number of produce operations that ended in error.

messages_sent_by_topic

Number of messages sent by topic.

topic_buffer_full

Counter of times a topics buffer was full

metric_counts

Arbitrary counts added by apps

tp_committed_offsets

Last committed offsets by TopicPartition

tp_read_offsets

Last read offsets by TopicPartition

tp_end_offsets

Log end offsets by TopicPartition

assignment_latency

Deque of assignment latency values.

assignments_completed

Number of partition assignments completed.

assignments_failed

Number of partitions assignments that failed.

rebalances

Number of rebalances seen by this worker.

rebalance_return_latency

Deque of previous n rebalance return latencies.

rebalance_end_latency

Deque of previous n rebalance end latencies.

rebalance_return_avg

Average rebalance return latency.

rebalance_end_avg

Average rebalance end latency.

http_response_codes

Counter of returned HTTP status codes.

http_response_latency

Deque of previous n HTTP request->response latencies.

http_response_latency_avg

Average request->response latency.

Configuration Attributes
class faust.Monitor[source]
max_avg_history: int = 100

Max number of total run time values to keep to build average.

max_commit_latency_history: int = 30

Max number of commit latency numbers to keep.

max_send_latency_history: int = 30

Max number of send latency numbers to keep.

Class: TableState

class faust.sensors.TableState
TableState.table: CollectionT = None
TableState.keys_retrieved: int = 0

Number of times a key has been retrieved from this table.

TableState.keys_updated: int = 0

Number of times a key has been created/changed in this table.

TableState.keys_deleted: int = 0

Number of times a key has been deleted from this table.

Sensor API Reference

This reference describes the sensor interface and is useful when you want to build custom sensors.

Methods

Message Callbacks

class faust.Sensor[source]
on_message_in(tp: TP, offset: int, message: Message) None[source]

Message received by a consumer.

Return type:

None

on_message_out(tp: TP, offset: int, message: Message) None[source]

All streams finished processing message.

Return type:

None

Event Callbacks

class faust.Sensor[source]
on_stream_event_in(tp: TP, offset: int, stream: StreamT, event: EventT) Optional[Dict][source]

Message sent to a stream as an event.

Return type:

_GenericAlias[_GenericAlias[~KT, ~VT], None]

on_stream_event_out(tp: TP, offset: int, stream: StreamT, event: EventT, state: Optional[Dict] = None) None[source]

Event was acknowledged by stream.

Notes

Acknowledged means a stream finished processing the event, but given that multiple streams may be handling the same event, the message cannot be committed before all streams have processed it. When all streams have acknowledged the event, it will go through on_message_out() just before offsets are committed.

Return type:

None

Table Callbacks

class faust.Sensor[source]
on_table_get(table: CollectionT, key: Any) None[source]

Key retrieved from table.

Return type:

None

on_table_set(table: CollectionT, key: Any, value: Any) None[source]

Value set for key in table.

Return type:

None

on_table_del(table: CollectionT, key: Any) None[source]

Key deleted from table.

Return type:

None

Consumer Callbacks

class faust.Sensor[source]
on_commit_initiated(consumer: ConsumerT) Any[source]

Consumer is about to commit topic offset.

Return type:

Any

on_commit_completed(consumer: ConsumerT, state: Any) None[source]

Consumer finished committing topic offset.

Return type:

None

on_topic_buffer_full(tp: TP) None[source]

Topic buffer full so conductor had to wait.

Return type:

None

on_assignment_start(assignor: PartitionAssignorT) Dict[source]

Partition assignor is starting to assign partitions.

Return type:

_GenericAlias[~KT, ~VT]

on_assignment_error(assignor: PartitionAssignorT, state: Dict, exc: BaseException) None[source]

Partition assignor did not complete assignor due to error.

Return type:

None

on_assignment_completed(assignor: PartitionAssignorT, state: Dict) None[source]

Partition assignor completed assignment.

Return type:

None

on_rebalance_start(app: AppT) Dict[source]

Cluster rebalance in progress.

Return type:

_GenericAlias[~KT, ~VT]

on_rebalance_return(app: AppT, state: Dict) None[source]

Consumer replied assignment is done to broker.

Return type:

None

on_rebalance_end(app: AppT, state: Dict) None[source]

Cluster rebalance fully completed (including recovery).

Return type:

None

Producer Callbacks

class faust.Sensor[source]
on_send_initiated(producer: ProducerT, topic: str, message: PendingMessage, keysize: int, valsize: int) Any[source]

About to send a message.

Return type:

Any

on_send_completed(producer: ProducerT, state: Any, metadata: RecordMetadata) None[source]

Message successfully sent.

Return type:

None

on_send_error(producer: ProducerT, exc: BaseException, state: Any) None[source]

Error while sending message.

Return type:

None

Web Callbacks

class faust.Sensor[source]
on_web_request_start(app: AppT, request: Request, *, view: Optional[View] = None) Dict[source]

Web server started working on request.

Return type:

_GenericAlias[~KT, ~VT]

on_web_request_end(app: AppT, request: Request, response: Optional[Response], state: Dict, *, view: Optional[View] = None) None[source]

Web server finished working on request.

Return type:

None