faust.sensors
Sensors.
- class faust.sensors.Monitor(*, max_avg_history: Optional[int] = None, max_commit_latency_history: Optional[int] = None, max_send_latency_history: Optional[int] = None, max_assignment_latency_history: Optional[int] = None, messages_sent: int = 0, tables: Optional[MutableMapping[str, TableState]] = None, messages_active: int = 0, events_active: int = 0, messages_received_total: int = 0, messages_received_by_topic: Optional[Counter[str]] = None, events_total: int = 0, events_by_stream: Optional[Counter[StreamT]] = None, events_by_task: Optional[Counter[Task]] = None, events_runtime: Optional[Deque[float]] = None, commit_latency: Optional[Deque[float]] = None, send_latency: Optional[Deque[float]] = None, assignment_latency: Optional[Deque[float]] = None, events_s: int = 0, messages_s: int = 0, events_runtime_avg: float = 0.0, topic_buffer_full: Optional[Counter[TP]] = None, rebalances: Optional[int] = None, rebalance_return_latency: Optional[Deque[float]] = None, rebalance_end_latency: Optional[Deque[float]] = None, rebalance_return_avg: float = 0.0, rebalance_end_avg: float = 0.0, time: Callable[[], float] = <built-in function monotonic>, http_response_codes: Optional[Counter[HTTPStatus]] = None, http_response_latency: Optional[Deque[float]] = None, http_response_latency_avg: float = 0.0, **kwargs: Any)[source]¶
Default Faust Sensor.
This is the default sensor. It records statistics about messages, events, tables, commits, sends, partition assignments, rebalances, and web requests.
- send_errors = 0¶
Number of produce operations that ended in error.
- assignments_completed = 0¶
Number of partition assignments completed.
- assignments_failed = 0¶
Number of partition assignments that failed.
- rebalances = 0¶
Number of rebalances seen by this worker.
- tables: MutableMapping[str, TableState] = None¶
Mapping of table name to the TableState recorded for that table.
- http_response_codes: Counter[HTTPStatus] = None¶
Counter of returned HTTP status codes.
- tp_committed_offsets: MutableMapping[TP, int] = None¶
Last committed offsets by TopicPartition
- tp_read_offsets: MutableMapping[TP, int] = None¶
Last read offsets by TopicPartition
- tp_end_offsets: MutableMapping[TP, int] = None¶
Log end offsets by TopicPartition
- stream_lookup: MutableMapping[StreamT, str] = None¶
- task_lookup: MutableMapping[Optional[Task], str] = None¶
- secs_since(start_time: float) float [source]¶
Given the timestamp start_time, return the number of seconds elapsed since that time.
- Return type:
float
- ms_since(start_time: float) float [source]¶
Given the timestamp start_time, return the number of milliseconds elapsed since that time.
- Return type:
float
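The constructor's `time` argument defaults to the monotonic clock, so the two helpers above presumably subtract `start_time` from the current clock reading. The following is a minimal standalone sketch of that arithmetic, not Faust's own code; the `clock` keyword is an assumption added here so the result can be checked against a fixed clock.

```python
import time
from typing import Callable


def secs_since(start_time: float, *,
               clock: Callable[[], float] = time.monotonic) -> float:
    """Seconds elapsed between start_time and the current clock reading."""
    return clock() - start_time


def ms_since(start_time: float, *,
             clock: Callable[[], float] = time.monotonic) -> float:
    """Milliseconds elapsed between start_time and the current clock reading."""
    return secs_since(start_time, clock=clock) * 1000.0
```

Because the clock is monotonic, `start_time` has to come from the same clock; a wall-clock timestamp such as `time.time()` would give meaningless differences.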
- logger: logging.Logger = <Logger faust.sensors.monitor (WARNING)>¶
- log: CompositeLogger¶
- diag: DiagT¶
- async_exit_stack: AsyncExitStack¶
- exit_stack: ExitStack¶
- asdict() Mapping [source]¶
Return monitor state as dictionary.
- Return type:
Mapping
- on_message_in(tp: TP, offset: int, message: Message) None [source]¶
Call before message is delegated to streams.
- Return type:
None
- on_stream_event_in(tp: TP, offset: int, stream: StreamT, event: EventT) Optional[Dict] [source]¶
Call when stream starts processing an event.
- Return type:
Optional[Dict]
- on_stream_event_out(tp: TP, offset: int, stream: StreamT, event: EventT, state: Optional[Dict] = None) None [source]¶
Call when stream is done processing an event.
- Return type:
None
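The signatures of on_stream_event_in() and on_stream_event_out() suggest a hand-off: the first hook returns a state dictionary, and the caller passes it back to the second hook. The sketch below illustrates that pattern for timing events. `EventTimer`, the `"time_in"` key, and the simplified argument lists (no `tp`, `offset`, `stream`, or `event`) are assumptions made for illustration, not Faust's implementation.

```python
import time
from collections import deque
from typing import Deque, Dict, Optional


class EventTimer:
    """Hypothetical stand-in for the two stream-event hooks."""

    def __init__(self, max_avg_history: int = 100) -> None:
        self.events_active = 0
        self.events_total = 0
        # Only the most recent runtimes are kept.
        self.events_runtime: Deque[float] = deque(maxlen=max_avg_history)

    def on_stream_event_in(self) -> Optional[Dict]:
        # Record the start time in a state dict owned by the caller.
        self.events_active += 1
        self.events_total += 1
        return {"time_in": time.monotonic()}

    def on_stream_event_out(self, state: Optional[Dict] = None) -> None:
        # The caller hands back the state returned by on_stream_event_in().
        self.events_active -= 1
        if state is not None:
            self.events_runtime.append(time.monotonic() - state["time_in"])

    @property
    def events_runtime_avg(self) -> float:
        history = self.events_runtime
        return sum(history) / len(history) if history else 0.0
```

Keeping the start time in the returned state, rather than on the sensor, lets several events be in flight at once without their timings interfering.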
- on_topic_buffer_full(tp: TP) None [source]¶
Call when conductor topic buffer is full and has to wait.
- Return type:
None
- on_message_out(tp: TP, offset: int, message: Message) None [source]¶
Call when message is fully acknowledged and can be committed.
- Return type:
None
- on_table_get(table: CollectionT, key: Any) None [source]¶
Call when value in table is retrieved.
- Return type:
None
- on_table_set(table: CollectionT, key: Any, value: Any) None [source]¶
Call when new value for key in table is set.
- Return type:
None
- on_table_del(table: CollectionT, key: Any) None [source]¶
Call when key in a table is deleted.
- Return type:
None
- on_commit_initiated(consumer: ConsumerT) Any [source]¶
Consumer is about to commit topic offset.
- Return type:
Any
- on_commit_completed(consumer: ConsumerT, state: Any) None [source]¶
Call when consumer commit offset operation completed.
- Return type:
None
- on_send_initiated(producer: ProducerT, topic: str, message: PendingMessage, keysize: int, valsize: int) Any [source]¶
Call when message added to producer buffer.
- Return type:
Any
- on_send_completed(producer: ProducerT, state: Any, metadata: RecordMetadata) None [source]¶
Call when producer finished sending message.
- Return type:
None
- on_send_error(producer: ProducerT, exc: BaseException, state: Any) None [source]¶
Call when producer was unable to publish message.
- Return type:
None
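The three send hooks describe one lifecycle: a send is initiated, then either completes or fails. The sketch below shows how a bounded latency history (compare `max_send_latency_history`) and an error counter (compare `send_errors`) could be maintained. `SendStats` and its simplified argument lists are hypothetical, and using a clock reading as the opaque `state` value is an assumption.

```python
import time
from collections import deque
from typing import Any, Deque


class SendStats:
    """Hypothetical stand-in for the producer-side hooks."""

    def __init__(self, max_send_latency_history: int = 3) -> None:
        self.messages_sent = 0
        self.send_errors = 0
        # deque(maxlen=...) silently drops the oldest sample when full.
        self.send_latency: Deque[float] = deque(maxlen=max_send_latency_history)

    def on_send_initiated(self) -> Any:
        self.messages_sent += 1
        return time.monotonic()  # opaque state handed back on completion

    def on_send_completed(self, state: Any) -> None:
        self.send_latency.append(time.monotonic() - state)

    def on_send_error(self, exc: BaseException, state: Any) -> None:
        self.send_errors += 1
```

With a history limit of 3, five completed sends leave only the three most recent latencies in `send_latency`, which keeps memory use constant on a long-running worker.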
- on_tp_commit(tp_offsets: MutableMapping[TP, int]) None [source]¶
Call when offset in topic partition is committed.
- Return type:
None
- track_tp_end_offset(tp: TP, offset: int) None [source]¶
Track new topic partition end offset for monitoring lags.
- Return type:
None
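Lag is conventionally the distance between a partition's log end offset and the consumer's committed offset, which is what tracking `tp_end_offsets` alongside `tp_committed_offsets` makes possible. The helper below is a sketch under that assumption; `consumer_lag` is a hypothetical name, and the local `TP` tuple is a stand-in for `faust.types.tuples.TP`.

```python
from typing import Dict, NamedTuple


class TP(NamedTuple):
    """Stand-in topic partition: (topic, partition)."""
    topic: str
    partition: int


def consumer_lag(end_offsets: Dict[TP, int],
                 committed_offsets: Dict[TP, int]) -> Dict[TP, int]:
    """Return end offset minus committed offset for partitions in both maps."""
    return {
        tp: max(end - committed_offsets[tp], 0)
        for tp, end in end_offsets.items()
        if tp in committed_offsets
    }
```

Partitions with no committed offset yet are skipped rather than reported with a guessed lag.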
- on_assignment_start(assignor: PartitionAssignorT) Dict [source]¶
Partition assignor is starting to assign partitions.
- Return type:
Dict
- on_assignment_error(assignor: PartitionAssignorT, state: Dict, exc: BaseException) None [source]¶
Partition assignor did not complete assignment due to error.
- Return type:
None
- on_assignment_completed(assignor: PartitionAssignorT, state: Dict) None [source]¶
Partition assignor completed assignment.
- Return type:
None
- on_rebalance_start(app: AppT) Dict [source]¶
Cluster rebalance in progress.
- Return type:
Dict
- on_rebalance_return(app: AppT, state: Dict) None [source]¶
Consumer replied to the broker that assignment is done.
- Return type:
None
- on_rebalance_end(app: AppT, state: Dict) None [source]¶
Cluster rebalance fully completed (including recovery).
- Return type:
None
- on_web_request_start(app: AppT, request: Request, *, view: Optional[View] = None) Dict [source]¶
Web server started working on request.
- Return type:
Dict
- class faust.sensors.Sensor(*, beacon: Optional[NodeT] = None, loop: Optional[AbstractEventLoop] = None)[source]¶
Base class for sensors.
This sensor does not do anything at all, but can be subclassed to create new monitors.
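To illustrate the subclassing idea, the sketch below overrides a single hook and leaves the rest as no-ops. So that it runs without Faust installed, `BaseSensor` is a local stand-in for `faust.sensors.Sensor` exposing only two of its hooks; `TopicCounter` and the local `TP` tuple are likewise hypothetical.

```python
from collections import Counter
from typing import Any, NamedTuple


class TP(NamedTuple):
    """Stand-in topic partition: (topic, partition)."""
    topic: str
    partition: int


class BaseSensor:
    """Stand-in for faust.sensors.Sensor: every hook is a no-op."""

    def on_message_in(self, tp: TP, offset: int, message: Any) -> None:
        pass

    def on_message_out(self, tp: TP, offset: int, message: Any) -> None:
        pass


class TopicCounter(BaseSensor):
    """Count received messages per topic; other hooks stay inherited no-ops."""

    def __init__(self) -> None:
        self.messages_by_topic = Counter()

    def on_message_in(self, tp: TP, offset: int, message: Any) -> None:
        self.messages_by_topic[tp.topic] += 1
```

In a real application the subclass would derive from `faust.sensors.Sensor` and keep the full hook signatures listed below.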
- on_message_in(tp: TP, offset: int, message: Message) None [source]¶
Message received by a consumer.
- Return type:
None
- on_stream_event_in(tp: TP, offset: int, stream: StreamT, event: EventT) Optional[Dict] [source]¶
Message sent to a stream as an event.
- Return type:
Optional[Dict]
- on_stream_event_out(tp: TP, offset: int, stream: StreamT, event: EventT, state: Optional[Dict] = None) None [source]¶
Event was acknowledged by stream.
Notes
Acknowledged means a stream finished processing the event, but given that multiple streams may be handling the same event, the message cannot be committed before all streams have processed it. When all streams have acknowledged the event, it will go through on_message_out() just before offsets are committed.
- Return type:
None
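The note above describes a form of reference counting: each stream handling a message holds one reference, and the message becomes eligible for commit only when the last reference is released. The sketch below shows that idea in isolation. `TrackedMessage` and its method names are hypothetical and are not presented as Faust's message API.

```python
from typing import Callable


class TrackedMessage:
    """Hypothetical message wrapper counting streams still processing it."""

    def __init__(self, offset: int,
                 on_message_out: Callable[[int], None]) -> None:
        self.offset = offset
        self.refcount = 0
        self._on_message_out = on_message_out

    def incref(self) -> None:
        # One reference per stream that receives the message as an event.
        self.refcount += 1

    def ack(self) -> None:
        # A stream finished its event (the on_stream_event_out moment).
        self.refcount -= 1
        if self.refcount == 0:
            # Last stream done: only now may the offset be committed.
            self._on_message_out(self.offset)
```

With two streams subscribed, the first `ack()` triggers nothing; the callback fires on the second.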
- on_message_out(tp: TP, offset: int, message: Message) None [source]¶
All streams finished processing message.
- Return type:
None
- on_topic_buffer_full(tp: TP) None [source]¶
Topic buffer full so conductor had to wait.
- Return type:
None
- on_table_get(table: CollectionT, key: Any) None [source]¶
Key retrieved from table.
- Return type:
None
- on_table_set(table: CollectionT, key: Any, value: Any) None [source]¶
Value set for key in table.
- Return type:
None
- on_table_del(table: CollectionT, key: Any) None [source]¶
Key deleted from table.
- Return type:
None
- on_commit_initiated(consumer: ConsumerT) Any [source]¶
Consumer is about to commit topic offset.
- Return type:
Any
- on_commit_completed(consumer: ConsumerT, state: Any) None [source]¶
Consumer finished committing topic offset.
- Return type:
None
- on_send_initiated(producer: ProducerT, topic: str, message: PendingMessage, keysize: int, valsize: int) Any [source]¶
About to send a message.
- Return type:
Any
- on_send_completed(producer: ProducerT, state: Any, metadata: RecordMetadata) None [source]¶
Message successfully sent.
- Return type:
None
- on_send_error(producer: ProducerT, exc: BaseException, state: Any) None [source]¶
Error while sending message.
- Return type:
None
- on_assignment_start(assignor: PartitionAssignorT) Dict [source]¶
Partition assignor is starting to assign partitions.
- Return type:
Dict
- on_assignment_error(assignor: PartitionAssignorT, state: Dict, exc: BaseException) None [source]¶
Partition assignor did not complete assignment due to error.
- Return type:
None
- on_assignment_completed(assignor: PartitionAssignorT, state: Dict) None [source]¶
Partition assignor completed assignment.
- Return type:
None
- on_rebalance_start(app: AppT) Dict [source]¶
Cluster rebalance in progress.
- Return type:
Dict
- on_rebalance_return(app: AppT, state: Dict) None [source]¶
Consumer replied to the broker that assignment is done.
- Return type:
None
- on_rebalance_end(app: AppT, state: Dict) None [source]¶
Cluster rebalance fully completed (including recovery).
- Return type:
None
- on_web_request_start(app: AppT, request: Request, *, view: Optional[View] = None) Dict [source]¶
Web server started working on request.
- Return type:
Dict
- on_web_request_end(app: AppT, request: Request, response: Optional[Response], state: Dict, *, view: Optional[View] = None) None [source]¶
Web server finished working on request.
- Return type:
None
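The web hooks pair up the same way as the other start/end hooks: on_web_request_start() returns a state dictionary that is handed to on_web_request_end(). The sketch below shows how `http_response_codes` and `http_response_latency` style statistics could be derived from that pair. `WebStats`, the `"time_start"` key, the integer `status` argument, and counting a missing response as status 500 are all assumptions for illustration.

```python
import time
from collections import Counter, deque
from http import HTTPStatus
from typing import Deque, Dict, Optional


class WebStats:
    """Hypothetical stand-in for the two web-request hooks."""

    def __init__(self, max_history: int = 100) -> None:
        self.http_response_codes = Counter()
        self.http_response_latency: Deque[float] = deque(maxlen=max_history)

    def on_web_request_start(self) -> Dict:
        return {"time_start": time.monotonic()}

    def on_web_request_end(self, status: Optional[int], state: Dict) -> None:
        # Assumption: a request that produced no response counts as a 500.
        code = HTTPStatus(status if status is not None else 500)
        self.http_response_codes[code] += 1
        self.http_response_latency.append(
            time.monotonic() - state["time_start"])
```

Keying the counter by `HTTPStatus` rather than a bare integer keeps the recorded statistics readable, for example `HTTPStatus.OK` instead of `200`.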
- asdict() Mapping [source]¶
Convert sensor state to dictionary.
- Return type:
Mapping
- logger: logging.Logger = <Logger faust.sensors.base (WARNING)>¶
- log: CompositeLogger¶
- diag: DiagT¶
- async_exit_stack: AsyncExitStack¶
- exit_stack: ExitStack¶
- class faust.sensors.SensorDelegate(app: AppT)[source]¶
A class that delegates sensor methods to a list of sensors.
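Delegation here means fanning each hook call out to every registered sensor. The sketch below shows the pattern for one hook. `Delegate`, `Recorder`, and the `add()` method are hypothetical names used only for illustration; the real class takes an `app` argument and covers every hook listed below.

```python
from typing import Any, List


class Recorder:
    """Hypothetical sensor that remembers the offsets it has seen."""

    def __init__(self) -> None:
        self.seen: List[int] = []

    def on_message_in(self, tp: Any, offset: int, message: Any) -> None:
        self.seen.append(offset)


class Delegate:
    """Forward each hook call to all registered sensors, in order."""

    def __init__(self) -> None:
        self._sensors: List[Recorder] = []

    def add(self, sensor: Recorder) -> None:
        self._sensors.append(sensor)

    def on_message_in(self, tp: Any, offset: int, message: Any) -> None:
        for sensor in self._sensors:
            sensor.on_message_in(tp, offset, message)
```

Callers then talk to one object regardless of how many sensors are attached.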
- on_message_in(tp: TP, offset: int, message: Message) None [source]¶
Call before message is delegated to streams.
- Return type:
None
- on_stream_event_in(tp: TP, offset: int, stream: StreamT, event: EventT) Optional[Dict] [source]¶
Call when stream starts processing an event.
- Return type:
Optional[Dict]
- on_stream_event_out(tp: TP, offset: int, stream: StreamT, event: EventT, state: Optional[Dict] = None) None [source]¶
Call when stream is done processing an event.
- Return type:
None
- on_topic_buffer_full(tp: TP) None [source]¶
Call when conductor topic buffer is full and has to wait.
- Return type:
None
- on_message_out(tp: TP, offset: int, message: Message) None [source]¶
Call when message is fully acknowledged and can be committed.
- Return type:
None
- on_table_get(table: CollectionT, key: Any) None [source]¶
Call when value in table is retrieved.
- Return type:
None
- on_table_set(table: CollectionT, key: Any, value: Any) None [source]¶
Call when new value for key in table is set.
- Return type:
None
- on_table_del(table: CollectionT, key: Any) None [source]¶
Call when key in a table is deleted.
- Return type:
None
- on_commit_initiated(consumer: ConsumerT) Any [source]¶
Call when consumer commit offset operation starts.
- Return type:
Any
- on_commit_completed(consumer: ConsumerT, state: Any) None [source]¶
Call when consumer commit offset operation completed.
- Return type:
None
- on_send_initiated(producer: ProducerT, topic: str, message: PendingMessage, keysize: int, valsize: int) Any [source]¶
Call when message added to producer buffer.
- Return type:
Any
- on_send_completed(producer: ProducerT, state: Any, metadata: RecordMetadata) None [source]¶
Call when producer finished sending message.
- Return type:
None
- on_send_error(producer: ProducerT, exc: BaseException, state: Any) None [source]¶
Call when producer was unable to publish message.
- Return type:
None
- on_assignment_start(assignor: PartitionAssignorT) Dict [source]¶
Partition assignor is starting to assign partitions.
- Return type:
Dict
- on_assignment_error(assignor: PartitionAssignorT, state: Dict, exc: BaseException) None [source]¶
Partition assignor did not complete assignment due to error.
- Return type:
None
- on_assignment_completed(assignor: PartitionAssignorT, state: Dict) None [source]¶
Partition assignor completed assignment.
- Return type:
None
- on_rebalance_start(app: AppT) Dict [source]¶
Cluster rebalance in progress.
- Return type:
Dict
- on_rebalance_return(app: AppT, state: Dict) None [source]¶
Consumer replied to the broker that assignment is done.
- Return type:
None
- on_rebalance_end(app: AppT, state: Dict) None [source]¶
Cluster rebalance fully completed (including recovery).
- Return type:
None
- on_web_request_start(app: AppT, request: Request, *, view: Optional[View] = None) Dict [source]¶
Web server started working on request.
- Return type:
Dict
- class faust.sensors.TableState(table: CollectionT, *, keys_retrieved: int = 0, keys_updated: int = 0, keys_deleted: int = 0)[source]¶
Represents the current state of a table.
- table: CollectionT = None¶
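The three counters in the TableState signature line up with the on_table_get(), on_table_set(), and on_table_del() hooks. The sketch below shows one way such per-table counters could be maintained. `TableCounters` and `TableStats` are hypothetical, and keying by table name instead of holding a table object is a simplification made so the example runs standalone.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class TableCounters:
    """Stand-in for TableState, identified by table name."""
    table_name: str
    keys_retrieved: int = 0
    keys_updated: int = 0
    keys_deleted: int = 0


class TableStats:
    """Hypothetical sensor fragment that updates counters per table."""

    def __init__(self) -> None:
        self.tables: Dict[str, TableCounters] = {}

    def _state_for(self, table_name: str) -> TableCounters:
        # Create the counters lazily, the first time a table is seen.
        if table_name not in self.tables:
            self.tables[table_name] = TableCounters(table_name)
        return self.tables[table_name]

    def on_table_get(self, table_name: str, key: Any) -> None:
        self._state_for(table_name).keys_retrieved += 1

    def on_table_set(self, table_name: str, key: Any, value: Any) -> None:
        self._state_for(table_name).keys_updated += 1

    def on_table_del(self, table_name: str, key: Any) -> None:
        self._state_for(table_name).keys_deleted += 1
```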