faust.sensors

Sensors.

class faust.sensors.Monitor(*, max_avg_history: ~typing.Optional[int] = None, max_commit_latency_history: ~typing.Optional[int] = None, max_send_latency_history: ~typing.Optional[int] = None, max_assignment_latency_history: ~typing.Optional[int] = None, messages_sent: int = 0, tables: ~typing.Optional[~typing.MutableMapping[str, ~faust.sensors.monitor.TableState]] = None, messages_active: int = 0, events_active: int = 0, messages_received_total: int = 0, messages_received_by_topic: ~typing.Optional[~typing.Counter[str]] = None, events_total: int = 0, events_by_stream: ~typing.Optional[~typing.Counter[~faust.types.streams.StreamT]] = None, events_by_task: ~typing.Optional[~typing.Counter[~_asyncio.Task]] = None, events_runtime: ~typing.Optional[~typing.Deque[float]] = None, commit_latency: ~typing.Optional[~typing.Deque[float]] = None, send_latency: ~typing.Optional[~typing.Deque[float]] = None, assignment_latency: ~typing.Optional[~typing.Deque[float]] = None, events_s: int = 0, messages_s: int = 0, events_runtime_avg: float = 0.0, topic_buffer_full: ~typing.Optional[~typing.Counter[~faust.types.tuples.TP]] = None, rebalances: ~typing.Optional[int] = None, rebalance_return_latency: ~typing.Optional[~typing.Deque[float]] = None, rebalance_end_latency: ~typing.Optional[~typing.Deque[float]] = None, rebalance_return_avg: float = 0.0, rebalance_end_avg: float = 0.0, time: ~typing.Callable[[], float] = <built-in function monotonic>, http_response_codes: ~typing.Optional[~typing.Counter[~http.HTTPStatus]] = None, http_response_latency: ~typing.Optional[~typing.Deque[float]] = None, http_response_latency_avg: float = 0.0, **kwargs: ~typing.Any)[source]

Default Faust Sensor.

This is the default sensor, recording statistics about events, etc.

send_errors = 0

Number of produce operations that ended in error.

assignments_completed = 0

Number of partition assignments completed.

assignments_failed = 0

Number of partition assignments that failed.

max_avg_history: int = 100

Max number of total run time values to keep to build average.

max_commit_latency_history: int = 30

Max number of commit latency numbers to keep.

max_send_latency_history: int = 30

Max number of send latency numbers to keep.

max_assignment_latency_history: int = 30

Max number of assignment latency numbers to keep.

rebalances = 0

Number of rebalances seen by this worker.

tables: MutableMapping[str, TableState] = None

Mapping of table name to table state.

commit_latency: Deque[float] = None

Deque of commit latency values

send_latency: Deque[float] = None

Deque of send latency values

assignment_latency: Deque[float] = None

Deque of assignment latency values.

rebalance_return_latency: Deque[float] = None

Deque of previous n rebalance return latencies.

rebalance_end_latency: Deque[float] = None

Deque of previous n rebalance end latencies.

rebalance_return_avg: float = 0.0

Average rebalance return latency.

rebalance_end_avg: float = 0.0

Average rebalance end latency.

messages_active: int = 0

Number of messages currently being processed.

messages_received_total: int = 0

Number of messages processed in total.

messages_received_by_topic: Counter[str] = None

Count of messages received by topic

messages_sent: int = 0

Number of messages sent in total.

messages_sent_by_topic: Counter[str] = None

Number of messages sent by topic.

messages_s: int = 0

Number of messages being processed this second.

events_active: int = 0

Number of events currently being processed.

events_total: int = 0

Number of events processed in total.

events_by_task: Counter[str] = None

Count of events processed by task

events_by_stream: Counter[str] = None

Count of events processed by stream

events_s: int = 0

Number of events being processed this second.

events_runtime_avg: float = 0.0

Average event runtime over the last second.

events_runtime: Deque[float] = None

Deque of run times used for averages

topic_buffer_full: Counter[TP] = None

Counter of times a topic's buffer was full.

time: Callable[[], float]
http_response_codes: Counter[HTTPStatus] = None

Counter of returned HTTP status codes.

http_response_latency: Deque[float] = None

Deque of previous n HTTP request->response latencies.

http_response_latency_avg: float = 0.0

Average request->response latency.

metric_counts: Counter[str] = None

Arbitrary counts added by apps

tp_committed_offsets: MutableMapping[TP, int] = None

Last committed offsets by TopicPartition

tp_read_offsets: MutableMapping[TP, int] = None

Last read offsets by TopicPartition

tp_end_offsets: MutableMapping[TP, int] = None

Log end offsets by TopicPartition

stream_inbound_time: Dict[TP, float] = None
stream_lookup: MutableMapping[StreamT, str] = None
task_lookup: MutableMapping[Optional[Task], str] = None
secs_since(start_time: float) float[source]

Given timestamp start_time, return the number of seconds since that time.

Return type:

float

ms_since(start_time: float) float[source]

Given timestamp start_time, return the number of milliseconds since that time.

Return type:

float

logger: logging.Logger = <Logger faust.sensors.monitor (WARNING)>
secs_to_ms(timestamp: float) float[source]

Convert seconds to milliseconds.

Return type:

float

log: CompositeLogger
diag: DiagT
async_exit_stack: AsyncExitStack
exit_stack: ExitStack
asdict() Mapping[source]

Return monitor state as dictionary.

Return type:

Mapping

on_message_in(tp: TP, offset: int, message: Message) None[source]

Call before message is delegated to streams.

Return type:

None

on_stream_event_in(tp: TP, offset: int, stream: StreamT, event: EventT) Optional[Dict][source]

Call when stream starts processing an event.

Return type:

Optional[Dict]

on_stream_event_out(tp: TP, offset: int, stream: StreamT, event: EventT, state: Optional[Dict] = None) None[source]

Call when stream is done processing an event.

Return type:

None

on_topic_buffer_full(tp: TP) None[source]

Call when conductor topic buffer is full and has to wait.

Return type:

None

on_message_out(tp: TP, offset: int, message: Message) None[source]

Call when message is fully acknowledged and can be committed.

Return type:

None

on_table_get(table: CollectionT, key: Any) None[source]

Call when value in table is retrieved.

Return type:

None

on_table_set(table: CollectionT, key: Any, value: Any) None[source]

Call when new value for key in table is set.

Return type:

None

on_table_del(table: CollectionT, key: Any) None[source]

Call when key in a table is deleted.

Return type:

None

on_commit_initiated(consumer: ConsumerT) Any[source]

Consumer is about to commit topic offset.

Return type:

Any

on_commit_completed(consumer: ConsumerT, state: Any) None[source]

Call when consumer commit offset operation completed.

Return type:

None

on_send_initiated(producer: ProducerT, topic: str, message: PendingMessage, keysize: int, valsize: int) Any[source]

Call when message added to producer buffer.

Return type:

Any

on_send_completed(producer: ProducerT, state: Any, metadata: RecordMetadata) None[source]

Call when producer finished sending message.

Return type:

None

on_send_error(producer: ProducerT, exc: BaseException, state: Any) None[source]

Call when producer was unable to publish message.

Return type:

None

count(metric_name: str, count: int = 1) None[source]

Count metric by name.

Return type:

None

on_tp_commit(tp_offsets: MutableMapping[TP, int]) None[source]

Call when offset in topic partition is committed.

Return type:

None

track_tp_end_offset(tp: TP, offset: int) None[source]

Track new topic partition end offset for monitoring lags.

Return type:

None

on_assignment_start(assignor: PartitionAssignorT) Dict[source]

Partition assignor is starting to assign partitions.

Return type:

Dict

on_assignment_error(assignor: PartitionAssignorT, state: Dict, exc: BaseException) None[source]

Partition assignor did not complete assignment due to error.

Return type:

None

on_assignment_completed(assignor: PartitionAssignorT, state: Dict) None[source]

Partition assignor completed assignment.

Return type:

None

on_rebalance_start(app: AppT) Dict[source]

Cluster rebalance in progress.

Return type:

Dict

on_rebalance_return(app: AppT, state: Dict) None[source]

Consumer replied to the broker that assignment is done.

Return type:

None

on_rebalance_end(app: AppT, state: Dict) None[source]

Cluster rebalance fully completed (including recovery).

Return type:

None

on_web_request_start(app: AppT, request: Request, *, view: Optional[View] = None) Dict[source]

Web server started working on request.

Return type:

Dict

on_web_request_end(app: AppT, request: Request, response: Optional[Response], state: Dict, *, view: Optional[View] = None) None[source]

Web server finished working on request.

Return type:

None

on_threaded_producer_buffer_processed(app: AppT, size: int) None[source]
Return type:

None

class faust.sensors.Sensor(*, beacon: Optional[NodeT] = None, loop: Optional[AbstractEventLoop] = None)[source]

Base class for sensors.

This sensor does not do anything at all, but can be subclassed to create new monitors.

on_message_in(tp: TP, offset: int, message: Message) None[source]

Message received by a consumer.

Return type:

None

on_stream_event_in(tp: TP, offset: int, stream: StreamT, event: EventT) Optional[Dict][source]

Message sent to a stream as an event.

Return type:

Optional[Dict]

on_stream_event_out(tp: TP, offset: int, stream: StreamT, event: EventT, state: Optional[Dict] = None) None[source]

Event was acknowledged by stream.

Notes

Acknowledged means a stream finished processing the event, but given that multiple streams may be handling the same event, the message cannot be committed before all streams have processed it. When all streams have acknowledged the event, it will go through on_message_out() just before offsets are committed.

Return type:

None

on_message_out(tp: TP, offset: int, message: Message) None[source]

All streams finished processing message.

Return type:

None

on_topic_buffer_full(tp: TP) None[source]

Topic buffer full so conductor had to wait.

Return type:

None

on_table_get(table: CollectionT, key: Any) None[source]

Key retrieved from table.

Return type:

None

on_table_set(table: CollectionT, key: Any, value: Any) None[source]

Value set for key in table.

Return type:

None

on_table_del(table: CollectionT, key: Any) None[source]

Key deleted from table.

Return type:

None

on_commit_initiated(consumer: ConsumerT) Any[source]

Consumer is about to commit topic offset.

Return type:

Any

on_commit_completed(consumer: ConsumerT, state: Any) None[source]

Consumer finished committing topic offset.

Return type:

None

on_send_initiated(producer: ProducerT, topic: str, message: PendingMessage, keysize: int, valsize: int) Any[source]

About to send a message.

Return type:

Any

on_send_completed(producer: ProducerT, state: Any, metadata: RecordMetadata) None[source]

Message successfully sent.

Return type:

None

on_send_error(producer: ProducerT, exc: BaseException, state: Any) None[source]

Error while sending message.

Return type:

None

on_assignment_start(assignor: PartitionAssignorT) Dict[source]

Partition assignor is starting to assign partitions.

Return type:

Dict

on_assignment_error(assignor: PartitionAssignorT, state: Dict, exc: BaseException) None[source]

Partition assignor did not complete assignment due to error.

Return type:

None

on_assignment_completed(assignor: PartitionAssignorT, state: Dict) None[source]

Partition assignor completed assignment.

Return type:

None

on_rebalance_start(app: AppT) Dict[source]

Cluster rebalance in progress.

Return type:

Dict

on_rebalance_return(app: AppT, state: Dict) None[source]

Consumer replied to the broker that assignment is done.

Return type:

None

on_rebalance_end(app: AppT, state: Dict) None[source]

Cluster rebalance fully completed (including recovery).

Return type:

None

on_web_request_start(app: AppT, request: Request, *, view: Optional[View] = None) Dict[source]

Web server started working on request.

Return type:

Dict

on_web_request_end(app: AppT, request: Request, response: Optional[Response], state: Dict, *, view: Optional[View] = None) None[source]

Web server finished working on request.

Return type:

None

on_threaded_producer_buffer_processed(app: AppT, size: int) None[source]
Return type:

None

asdict() Mapping[source]

Convert sensor state to dictionary.

Return type:

Mapping

logger: logging.Logger = <Logger faust.sensors.base (WARNING)>
log: CompositeLogger
diag: DiagT
async_exit_stack: AsyncExitStack
exit_stack: ExitStack
class faust.sensors.SensorDelegate(app: AppT)[source]

A class that delegates sensor methods to a list of sensors.

add(sensor: SensorT) None[source]

Add sensor.

Return type:

None

remove(sensor: SensorT) None[source]

Remove sensor.

Return type:

None

on_message_in(tp: TP, offset: int, message: Message) None[source]

Call before message is delegated to streams.

Return type:

None

on_stream_event_in(tp: TP, offset: int, stream: StreamT, event: EventT) Optional[Dict][source]

Call when stream starts processing an event.

Return type:

Optional[Dict]

on_stream_event_out(tp: TP, offset: int, stream: StreamT, event: EventT, state: Optional[Dict] = None) None[source]

Call when stream is done processing an event.

Return type:

None

on_topic_buffer_full(tp: TP) None[source]

Call when conductor topic buffer is full and has to wait.

Return type:

None

on_message_out(tp: TP, offset: int, message: Message) None[source]

Call when message is fully acknowledged and can be committed.

Return type:

None

on_table_get(table: CollectionT, key: Any) None[source]

Call when value in table is retrieved.

Return type:

None

on_table_set(table: CollectionT, key: Any, value: Any) None[source]

Call when new value for key in table is set.

Return type:

None

on_table_del(table: CollectionT, key: Any) None[source]

Call when key in a table is deleted.

Return type:

None

on_commit_initiated(consumer: ConsumerT) Any[source]

Call when consumer commit offset operation starts.

Return type:

Any

on_commit_completed(consumer: ConsumerT, state: Any) None[source]

Call when consumer commit offset operation completed.

Return type:

None

on_send_initiated(producer: ProducerT, topic: str, message: PendingMessage, keysize: int, valsize: int) Any[source]

Call when message added to producer buffer.

Return type:

Any

on_send_completed(producer: ProducerT, state: Any, metadata: RecordMetadata) None[source]

Call when producer finished sending message.

Return type:

None

on_send_error(producer: ProducerT, exc: BaseException, state: Any) None[source]

Call when producer was unable to publish message.

Return type:

None

on_assignment_start(assignor: PartitionAssignorT) Dict[source]

Partition assignor is starting to assign partitions.

Return type:

Dict

on_assignment_error(assignor: PartitionAssignorT, state: Dict, exc: BaseException) None[source]

Partition assignor did not complete assignment due to error.

Return type:

None

on_assignment_completed(assignor: PartitionAssignorT, state: Dict) None[source]

Partition assignor completed assignment.

Return type:

None

on_rebalance_start(app: AppT) Dict[source]

Cluster rebalance in progress.

Return type:

Dict

on_rebalance_return(app: AppT, state: Dict) None[source]

Consumer replied to the broker that assignment is done.

Return type:

None

on_rebalance_end(app: AppT, state: Dict) None[source]

Cluster rebalance fully completed (including recovery).

Return type:

None

on_web_request_start(app: AppT, request: Request, *, view: Optional[View] = None) Dict[source]

Web server started working on request.

Return type:

Dict

on_web_request_end(app: AppT, request: Request, response: Optional[Response], state: Dict, *, view: Optional[View] = None) None[source]

Web server finished working on request.

Return type:

None

on_threaded_producer_buffer_processed(app: AppT, size: int) None[source]
Return type:

None

class faust.sensors.TableState(table: CollectionT, *, keys_retrieved: int = 0, keys_updated: int = 0, keys_deleted: int = 0)[source]

Represents the current state of a table.

table: CollectionT = None
keys_retrieved: int = 0

Number of times a key has been retrieved from this table.

keys_updated: int = 0

Number of times a key has been created/changed in this table.

keys_deleted: int = 0

Number of times a key has been deleted from this table.

asdict() Mapping[source]

Return table state as dictionary.

Return type:

Mapping