faust.sensors.monitor

Monitor - sensor tracking metrics.

class faust.sensors.monitor.TableState(table: CollectionT, *, keys_retrieved: int = 0, keys_updated: int = 0, keys_deleted: int = 0)[source]

Represents the current state of a table.

table: CollectionT = None
keys_retrieved: int = 0

Number of times a key has been retrieved from this table.

keys_updated: int = 0

Number of times a key has been created/changed in this table.

keys_deleted: int = 0

Number of times a key has been deleted from this table.

asdict() Mapping[source]

Return table state as dictionary.

Return type:

_GenericAlias[~KT, +VT_co]

class faust.sensors.monitor.Monitor(*, max_avg_history: ~typing.Optional[int] = None, max_commit_latency_history: ~typing.Optional[int] = None, max_send_latency_history: ~typing.Optional[int] = None, max_assignment_latency_history: ~typing.Optional[int] = None, messages_sent: int = 0, tables: ~typing.Optional[~typing.MutableMapping[str, ~faust.sensors.monitor.TableState]] = None, messages_active: int = 0, events_active: int = 0, messages_received_total: int = 0, messages_received_by_topic: ~typing.Optional[~typing.Counter[str]] = None, events_total: int = 0, events_by_stream: ~typing.Optional[~typing.Counter[~faust.types.streams.StreamT]] = None, events_by_task: ~typing.Optional[~typing.Counter[~_asyncio.Task]] = None, events_runtime: ~typing.Optional[~typing.Deque[float]] = None, commit_latency: ~typing.Optional[~typing.Deque[float]] = None, send_latency: ~typing.Optional[~typing.Deque[float]] = None, assignment_latency: ~typing.Optional[~typing.Deque[float]] = None, events_s: int = 0, messages_s: int = 0, events_runtime_avg: float = 0.0, topic_buffer_full: ~typing.Optional[~typing.Counter[~faust.types.tuples.TP]] = None, rebalances: ~typing.Optional[int] = None, rebalance_return_latency: ~typing.Optional[~typing.Deque[float]] = None, rebalance_end_latency: ~typing.Optional[~typing.Deque[float]] = None, rebalance_return_avg: float = 0.0, rebalance_end_avg: float = 0.0, time: ~typing.Callable[[], float] = <built-in function monotonic>, http_response_codes: ~typing.Optional[~typing.Counter[~http.HTTPStatus]] = None, http_response_latency: ~typing.Optional[~typing.Deque[float]] = None, http_response_latency_avg: float = 0.0, **kwargs: ~typing.Any)[source]

Default Faust Sensor.

This is the default sensor, recording statistics about events, etc.

send_errors = 0

Number of produce operations that ended in error.

assignments_completed = 0

Number of partition assignments completed.

assignments_failed = 0

Number of partitions assignments that failed.

max_avg_history: int = 100

Max number of total run time values to keep to build average.

max_commit_latency_history: int = 30

Max number of commit latency numbers to keep.

max_send_latency_history: int = 30

Max number of send latency numbers to keep.

max_assignment_latency_history: int = 30

Max number of assignment latency numbers to keep.

rebalances = 0

Number of rebalances seen by this worker.

tables: MutableMapping[str, TableState] = None

Mapping of tables

commit_latency: Deque[float] = None

Deque of commit latency values

send_latency: Deque[float] = None

Deque of send latency values

assignment_latency: Deque[float] = None

Deque of assignment latency values.

rebalance_return_latency: Deque[float] = None

Deque of previous n rebalance return latencies.

rebalance_end_latency: Deque[float] = None

Deque of previous n rebalance end latencies.

rebalance_return_avg: float = 0.0

Average rebalance return latency.

rebalance_end_avg: float = 0.0

Average rebalance end latency.

messages_active: int = 0

Number of messages currently being processed.

messages_received_total: int = 0

Number of messages processed in total.

messages_received_by_topic: Counter[str] = None

Count of messages received by topic

messages_sent: int = 0

Number of messages sent in total.

messages_sent_by_topic: Counter[str] = None

Number of messages sent by topic.

messages_s: int = 0

Number of messages being processed this second.

events_active: int = 0

Number of events currently being processed.

events_total: int = 0

Number of events processed in total.

events_by_task: Counter[str] = None

Count of events processed by task

events_by_stream: Counter[str] = None

Count of events processed by stream

events_s: int = 0

Number of events being processed this second.

events_runtime_avg: float = 0.0

Average event runtime over the last second.

events_runtime: Deque[float] = None

Deque of run times used for averages

topic_buffer_full: Counter[TP] = None

Counter of times a topics buffer was full

time: Callable[[], float]
http_response_codes: Counter[HTTPStatus] = None

Counter of returned HTTP status codes.

http_response_latency: Deque[float] = None

Deque of previous n HTTP request->response latencies.

http_response_latency_avg: float = 0.0

Average request->response latency.

metric_counts: Counter[str] = None

Arbitrary counts added by apps

tp_committed_offsets: MutableMapping[TP, int] = None

Last committed offsets by TopicPartition

tp_read_offsets: MutableMapping[TP, int] = None

Last read offsets by TopicPartition

tp_end_offsets: MutableMapping[TP, int] = None

Log end offsets by TopicPartition

stream_inbound_time: Dict[TP, float] = None
stream_lookup: MutableMapping[StreamT, str] = None
task_lookup: MutableMapping[Optional[Task], str] = None
secs_since(start_time: float) float[source]

Given timestamp start, return number of seconds since that time.

Return type:

float

ms_since(start_time: float) float[source]

Given timestamp start, return number of ms since that time.

Return type:

float

logger: logging.Logger = <Logger faust.sensors.monitor (WARNING)>
secs_to_ms(timestamp: float) float[source]

Convert seconds to milliseconds.

Return type:

float

log: CompositeLogger
diag: DiagT
async_exit_stack: AsyncExitStack
exit_stack: ExitStack
asdict() Mapping[source]

Return monitor state as dictionary.

Return type:

_GenericAlias[~KT, +VT_co]

on_message_in(tp: TP, offset: int, message: Message) None[source]

Call before message is delegated to streams.

Return type:

None

on_stream_event_in(tp: TP, offset: int, stream: StreamT, event: EventT) Optional[Dict][source]

Call when stream starts processing an event.

Return type:

_GenericAlias[_GenericAlias[~KT, ~VT], None]

on_stream_event_out(tp: TP, offset: int, stream: StreamT, event: EventT, state: Optional[Dict] = None) None[source]

Call when stream is done processing an event.

Return type:

None

on_topic_buffer_full(tp: TP) None[source]

Call when conductor topic buffer is full and has to wait.

Return type:

None

on_message_out(tp: TP, offset: int, message: Message) None[source]

Call when message is fully acknowledged and can be committed.

Return type:

None

on_table_get(table: CollectionT, key: Any) None[source]

Call when value in table is retrieved.

Return type:

None

on_table_set(table: CollectionT, key: Any, value: Any) None[source]

Call when new value for key in table is set.

Return type:

None

on_table_del(table: CollectionT, key: Any) None[source]

Call when key in a table is deleted.

Return type:

None

on_commit_initiated(consumer: ConsumerT) Any[source]

Consumer is about to commit topic offset.

Return type:

Any

on_commit_completed(consumer: ConsumerT, state: Any) None[source]

Call when consumer commit offset operation completed.

Return type:

None

on_send_initiated(producer: ProducerT, topic: str, message: PendingMessage, keysize: int, valsize: int) Any[source]

Call when message added to producer buffer.

Return type:

Any

on_send_completed(producer: ProducerT, state: Any, metadata: RecordMetadata) None[source]

Call when producer finished sending message.

Return type:

None

on_send_error(producer: ProducerT, exc: BaseException, state: Any) None[source]

Call when producer was unable to publish message.

Return type:

None

count(metric_name: str, count: int = 1) None[source]

Count metric by name.

Return type:

None

on_tp_commit(tp_offsets: MutableMapping[TP, int]) None[source]

Call when offset in topic partition is committed.

Return type:

None

track_tp_end_offset(tp: TP, offset: int) None[source]

Track new topic partition end offset for monitoring lags.

Return type:

None

on_assignment_start(assignor: PartitionAssignorT) Dict[source]

Partition assignor is starting to assign partitions.

Return type:

_GenericAlias[~KT, ~VT]

on_assignment_error(assignor: PartitionAssignorT, state: Dict, exc: BaseException) None[source]

Partition assignor did not complete assignor due to error.

Return type:

None

on_assignment_completed(assignor: PartitionAssignorT, state: Dict) None[source]

Partition assignor completed assignment.

Return type:

None

on_rebalance_start(app: AppT) Dict[source]

Cluster rebalance in progress.

Return type:

_GenericAlias[~KT, ~VT]

on_rebalance_return(app: AppT, state: Dict) None[source]

Consumer replied assignment is done to broker.

Return type:

None

on_rebalance_end(app: AppT, state: Dict) None[source]

Cluster rebalance fully completed (including recovery).

Return type:

None

on_web_request_start(app: AppT, request: Request, *, view: Optional[View] = None) Dict[source]

Web server started working on request.

Return type:

_GenericAlias[~KT, ~VT]

on_web_request_end(app: AppT, request: Request, response: Optional[Response], state: Dict, *, view: Optional[View] = None) None[source]

Web server finished working on request.

Return type:

None

on_threaded_producer_buffer_processed(app: AppT, size: int) None[source]
Return type:

None