Sensors - Monitors and Statistics¶
Basics¶
Sensors record information about events occurring in a Faust application as they happen.
There’s a default sensor called “the monitor” that records the runtime of messages and events as they go through the worker, the latency of publishing messages, the latency of committing Kafka offsets, and so on.
The web server uses this monitor to present graphs and statistics about your instance, and there are also versions of the monitor that forward statistics to StatsD and Datadog.
You can define custom sensors to record the information that you care about, and enable them in the worker.
Monitor¶
The faust.Monitor is a built-in sensor that captures information like:
Average message processing time (when all agents have processed a message).
Average event processing time (from when an agent receives an event until the event is acked).
The total number of events processed every second.
The total number of events processed every second listed by topic.
The total number of events processed every second listed by agent.
The total number of records written to tables.
Duration of Kafka topic commit operations (latency).
Duration of producing messages (latency).
You can access the state of the monitor while the worker is running through app.monitor:
@app.agent(app.topic('topic'))
async def mytask(events):
    async for event in events:
        # Print how many events are being processed every second.
        print(app.monitor.events_s)
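If you want more than one counter at a time, a small helper can copy several monitor attributes into a plain dict for logging. The following is only a sketch: monitor_snapshot and SNAPSHOT_FIELDS are hypothetical names, not part of Faust, and a SimpleNamespace stands in for app.monitor so the example runs without a worker. The attribute names themselves come from the Monitor attribute list in this document.

```python
from types import SimpleNamespace
from typing import Any, Dict

# Attribute names taken from the Monitor attribute list in this document.
SNAPSHOT_FIELDS = (
    "messages_active",
    "events_active",
    "events_total",
    "events_s",
    "events_runtime_avg",
)


def monitor_snapshot(monitor: Any) -> Dict[str, Any]:
    """Copy a few counters from a monitor-like object into a plain dict.

    Hypothetical helper: attributes that are missing are reported as None.
    """
    return {name: getattr(monitor, name, None) for name in SNAPSHOT_FIELDS}


# Stand-in for app.monitor (assumption: a real monitor exposes these attributes).
fake_monitor = SimpleNamespace(
    messages_active=2,
    events_active=3,
    events_total=120,
    events_s=40,
    events_runtime_avg=0.015,
)
snapshot = monitor_snapshot(fake_monitor)
print(snapshot)
```

Inside an agent you would pass app.monitor instead of the stand-in object.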
Monitor API Reference¶
Class: Monitor¶
Monitor Attributes¶
- class faust.Monitor
- messages_active
Number of messages currently being processed.
- messages_received_total
Number of messages processed in total.
- messages_received_by_topic
Count of messages received by topic
- messages_s
Number of messages being processed this second.
- events_active
Number of events currently being processed.
- events_total
Number of events processed in total.
- events_s
Number of events being processed this second.
- events_by_stream
Count of events processed by stream
- events_by_task
Count of events processed by task
- events_runtime
Deque of run times used for averages
- events_runtime_avg
Average event runtime over the last second.
- tables
Mapping of tables
- commit_latency
Deque of commit latency values
- send_latency
Deque of send latency values
- messages_sent
Number of messages sent in total.
- send_errors
Number of produce operations that ended in error.
- messages_sent_by_topic
Number of messages sent by topic.
- topic_buffer_full
Counter of times a topic's buffer was full
- metric_counts
Arbitrary counts added by apps
- tp_committed_offsets
Last committed offsets by TopicPartition
- tp_read_offsets
Last read offsets by TopicPartition
- tp_end_offsets
Log end offsets by TopicPartition
- assignment_latency
Deque of assignment latency values.
- assignments_completed
Number of partition assignments completed.
- assignments_failed
Number of partition assignments that failed.
- rebalances
Number of rebalances seen by this worker.
- rebalance_return_latency
Deque of previous n rebalance return latencies.
- rebalance_end_latency
Deque of previous n rebalance end latencies.
- rebalance_return_avg
Average rebalance return latency.
- rebalance_end_avg
Average rebalance end latency.
- http_response_codes
Counter of returned HTTP status codes.
- http_response_latency
Deque of previous n HTTP request->response latencies.
- http_response_latency_avg
Average request->response latency.
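The per-partition offset mappings can be combined to estimate how far a worker is behind. The sketch below assumes that tp_end_offsets and tp_committed_offsets are mappings keyed by the same TopicPartition values; partition_lag is a hypothetical helper, and plain tuples stand in for TopicPartition. The exact offset semantics are not stated here, so treat the result as an approximation.

```python
from typing import Dict, Hashable, Mapping


def partition_lag(
    end_offsets: Mapping[Hashable, int],
    committed_offsets: Mapping[Hashable, int],
) -> Dict[Hashable, int]:
    """Approximate lag per partition as end offset minus committed offset.

    Partitions without a committed offset are skipped, and negative
    differences are clamped to zero.
    """
    lag: Dict[Hashable, int] = {}
    for tp, end in end_offsets.items():
        committed = committed_offsets.get(tp)
        if committed is None:
            continue  # nothing committed yet for this partition
        lag[tp] = max(end - committed, 0)
    return lag


# Stand-ins for monitor.tp_end_offsets and monitor.tp_committed_offsets.
end = {("orders", 0): 150, ("orders", 1): 90, ("orders", 2): 10}
committed = {("orders", 0): 120, ("orders", 1): 90}
lag = partition_lag(end, committed)
print(lag)
```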
Configuration Attributes¶
- class faust.Monitor
Class: TableState¶
- class faust.sensors.TableState
- TableState.table: CollectionT = None
- TableState.keys_retrieved: int = 0
Number of times a key has been retrieved from this table.
- TableState.keys_updated: int = 0
Number of times a key has been created/changed in this table.
- TableState.keys_deleted: int = 0
Number of times a key has been deleted from this table.
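To illustrate how these counters relate to the table callbacks described later in this document, here is a minimal sketch of a sensor-like class that keeps one set of counters per table. TableCounters and TableCountingSensor are hypothetical stand-ins, not Faust classes; a real sensor would subclass faust.Sensor. Keying the mapping by a name attribute on the table is also an assumption.

```python
from dataclasses import dataclass
from types import SimpleNamespace
from typing import Any, Dict


@dataclass
class TableCounters:
    """Stand-in mirroring the three TableState counters listed above."""

    keys_retrieved: int = 0
    keys_updated: int = 0
    keys_deleted: int = 0


class TableCountingSensor:
    """Hypothetical sensor-like class using the table callback signatures."""

    def __init__(self) -> None:
        self.tables: Dict[str, TableCounters] = {}

    def _counters_for(self, table: Any) -> TableCounters:
        # Assumption: the table has a `name` attribute; fall back to repr().
        name = getattr(table, "name", repr(table))
        return self.tables.setdefault(name, TableCounters())

    def on_table_get(self, table: Any, key: Any) -> None:
        self._counters_for(table).keys_retrieved += 1

    def on_table_set(self, table: Any, key: Any, value: Any) -> None:
        self._counters_for(table).keys_updated += 1

    def on_table_del(self, table: Any, key: Any) -> None:
        self._counters_for(table).keys_deleted += 1


# Drive the callbacks by hand with a stand-in table object.
clicks = SimpleNamespace(name="clicks")
table_sensor = TableCountingSensor()
table_sensor.on_table_set(clicks, "user-1", 1)
table_sensor.on_table_get(clicks, "user-1")
table_sensor.on_table_get(clicks, "user-1")
table_sensor.on_table_del(clicks, "user-1")
print(table_sensor.tables["clicks"])
```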
Sensor API Reference¶
This reference describes the sensor interface and is useful when you want to build custom sensors.
Methods¶
Message Callbacks¶
- class faust.Sensor
Event Callbacks¶
- class faust.Sensor
- on_stream_event_in(tp: TP, offset: int, stream: StreamT, event: EventT) -> Optional[Dict]
Message sent to a stream as an event.
- Return type:
Optional[Dict]
- on_stream_event_out(tp: TP, offset: int, stream: StreamT, event: EventT, state: Optional[Dict] = None) -> None
Event was acknowledged by stream.
Notes
Acknowledged means a stream finished processing the event, but given that multiple streams may be handling the same event, the message cannot be committed before all streams have processed it. When all streams have acknowledged the event, it will go through on_message_out() just before offsets are committed.
- Return type:
None
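The two event callbacks are designed to work as a pair: the dict returned by on_stream_event_in is handed back as the state argument of on_stream_event_out. A minimal sketch of using that to time events follows. EventTimingSensor, the "time_in" key, and max_samples are hypothetical; the class does not subclass faust.Sensor so that it runs on its own, and only the callback signatures are taken from this reference.

```python
import time
from collections import deque
from typing import Any, Deque, Dict, Optional


class EventTimingSensor:
    """Hypothetical sensor-like class that records per-event run times."""

    def __init__(self, max_samples: int = 100) -> None:
        # Bounded deque: only the most recent samples are kept.
        self.runtimes: Deque[float] = deque(maxlen=max_samples)

    def on_stream_event_in(
        self, tp: Any, offset: int, stream: Any, event: Any
    ) -> Optional[Dict]:
        # The returned dict is the state passed to on_stream_event_out.
        return {"time_in": time.monotonic()}

    def on_stream_event_out(
        self, tp: Any, offset: int, stream: Any, event: Any,
        state: Optional[Dict] = None,
    ) -> None:
        if state is None:
            return  # no matching on_stream_event_in state to measure from
        self.runtimes.append(time.monotonic() - state["time_in"])

    @property
    def average_runtime(self) -> float:
        """Mean of the stored run times, or 0.0 when there are none."""
        if not self.runtimes:
            return 0.0
        return sum(self.runtimes) / len(self.runtimes)


# Drive the callbacks by hand; tuples stand in for TP values.
timing = EventTimingSensor(max_samples=2)
for offset in range(3):
    state = timing.on_stream_event_in(("topic", 0), offset, None, None)
    timing.on_stream_event_out(("topic", 0), offset, None, None, state)
print(len(timing.runtimes), timing.average_runtime)
```

A bounded deque keeps memory use constant, which is the same kind of container the monitor attributes above describe for run times and latencies.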
Table Callbacks¶
- class faust.Sensor
- on_table_get(table: CollectionT, key: Any) -> None
Key retrieved from table.
- Return type:
None
- on_table_set(table: CollectionT, key: Any, value: Any) -> None
Value set for key in table.
- Return type:
None
- on_table_del(table: CollectionT, key: Any) -> None
Key deleted from table.
- Return type:
None
Consumer Callbacks¶
- class faust.Sensor
- on_commit_initiated(consumer: ConsumerT) -> Any
Consumer is about to commit topic offset.
- Return type:
Any
- on_commit_completed(consumer: ConsumerT, state: Any) -> None
Consumer finished committing topic offset.
- Return type:
None
- on_topic_buffer_full(tp: TP) -> None
Topic buffer full so conductor had to wait.
- Return type:
None
- on_assignment_start(assignor: PartitionAssignorT) -> Dict
Partition assignor is starting to assign partitions.
- Return type:
Dict
- on_assignment_error(assignor: PartitionAssignorT, state: Dict, exc: BaseException) -> None
Partition assignor did not complete assignment due to an error.
- Return type:
None
- on_assignment_completed(assignor: PartitionAssignorT, state: Dict) -> None
Partition assignor completed assignment.
- Return type:
None
- on_rebalance_start(app: AppT) -> Dict
Cluster rebalance in progress.
- Return type:
Dict
Producer Callbacks¶
- class faust.Sensor
- on_send_initiated(producer: ProducerT, topic: str, message: PendingMessage, keysize: int, valsize: int) -> Any
About to send a message.
- Return type:
Any
- on_send_completed(producer: ProducerT, state: Any, metadata: RecordMetadata) -> None
Message successfully sent.
- Return type:
None
- on_send_error(producer: ProducerT, exc: BaseException, state: Any) -> None
Error while sending message.
- Return type:
None
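The producer callbacks follow an initiate/complete pattern: whatever on_send_initiated returns is passed back as state to on_send_completed or on_send_error. The sketch below shows one way to use that to track send latency and errors. SendTrackingSensor, sends_started_by_topic, and the "time_send" key are hypothetical names; the class is a stand-in that does not subclass faust.Sensor, and None is passed for the producer, message, and metadata arguments because the sketch does not inspect them.

```python
import time
from collections import Counter
from typing import Any, List


class SendTrackingSensor:
    """Hypothetical sensor-like class using the producer callback signatures."""

    def __init__(self) -> None:
        self.send_latency: List[float] = []
        self.send_errors = 0
        self.sends_started_by_topic: Counter = Counter()

    def on_send_initiated(
        self, producer: Any, topic: str, message: Any,
        keysize: int, valsize: int,
    ) -> Any:
        self.sends_started_by_topic[topic] += 1
        # The returned value is the state given to the completion callbacks.
        return {"time_send": time.monotonic()}

    def on_send_completed(self, producer: Any, state: Any, metadata: Any) -> None:
        self.send_latency.append(time.monotonic() - state["time_send"])

    def on_send_error(self, producer: Any, exc: BaseException, state: Any) -> None:
        self.send_errors += 1


# One successful send and one failed send, driven by hand.
send_sensor = SendTrackingSensor()
state = send_sensor.on_send_initiated(None, "orders", None, keysize=3, valsize=42)
send_sensor.on_send_completed(None, state, metadata=None)
state = send_sensor.on_send_initiated(None, "orders", None, keysize=3, valsize=42)
send_sensor.on_send_error(None, RuntimeError("broker unavailable"), state)
print(
    send_sensor.sends_started_by_topic["orders"],
    len(send_sensor.send_latency),
    send_sensor.send_errors,
)
```

The consumer callbacks on_commit_initiated and on_commit_completed pass state the same way, so the same approach could be used to measure commit latency.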