faust.sensors.statsd

Monitor using Statsd.

class faust.sensors.statsd.StatsdMonitor(host: str = 'localhost', port: int = 8125, prefix: str = 'faust-app', rate: float = 1.0, **kwargs: Any)

Statsd Faust Sensor.

This sensor records statistics to Statsd in addition to computing metrics for the stats server.

host: str
port: int
prefix: str
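
To make the roles of prefix and rate concrete, the sketch below builds the plain-text StatsD payload for a single metric. It is not the code this class runs: the helper name format_statsd_line and the metric name messages_received are illustrative assumptions, and the real monitor delegates to a StatsD client library rather than formatting lines itself.

```python
def format_statsd_line(
    prefix: str,
    name: str,
    value: int,
    kind: str = "c",
    rate: float = 1.0,
) -> str:
    """Build one StatsD payload of the form '<key>:<value>|<type>[|@<rate>]'."""
    # The prefix namespaces every metric emitted by one application.
    key = f"{prefix}.{name}" if prefix else name
    line = f"{key}:{value}|{kind}"
    # The sample rate is only appended when sampling is active (rate < 1).
    if rate < 1.0:
        line += f"|@{rate}"
    return line


# Hypothetical metric name, using the constructor defaults shown above.
print(format_statsd_line("faust-app", "messages_received", 1))
# With rate=0.5 the server is told that only about half the events are reported.
print(format_statsd_line("faust-app", "messages_received", 1, rate=0.5))
```

A rate below 1.0 trades accuracy for fewer datagrams, which can matter on high-throughput streams.
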
on_message_in(tp: TP, offset: int, message: Message) -> None

Call before message is delegated to streams.

Return type:

None

on_stream_event_in(tp: TP, offset: int, stream: StreamT, event: EventT) -> Optional[Dict]

Call when stream starts processing an event.

Return type:

Optional[Dict]

on_stream_event_out(tp: TP, offset: int, stream: StreamT, event: EventT, state: Optional[Dict] = None) -> None

Call when stream is done processing an event.

Return type:

None
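
The two signatures above suggest a hand-off: on_stream_event_in may return a state dictionary, and on_stream_event_out accepts one. The sketch below shows how that pattern can be used to time an event. TimingSensor and the "time_in" key are hypothetical names chosen for illustration; this is a stand-in written with the standard library only, not the Faust implementation.

```python
import time
from typing import Any, Dict, List, Optional


class TimingSensor:
    """Hypothetical stand-in that only demonstrates the state hand-off."""

    def __init__(self) -> None:
        self.runtimes: List[float] = []

    def on_stream_event_in(
        self, tp: Any, offset: int, stream: Any, event: Any
    ) -> Optional[Dict]:
        # Whatever is returned here is later passed back as `state`.
        return {"time_in": time.monotonic()}

    def on_stream_event_out(
        self,
        tp: Any,
        offset: int,
        stream: Any,
        event: Any,
        state: Optional[Dict] = None,
    ) -> None:
        if state is None:
            return  # nothing was recorded when the event came in
        self.runtimes.append(time.monotonic() - state["time_in"])


sensor = TimingSensor()
state = sensor.on_stream_event_in(("orders", 0), 42, None, None)
sensor.on_stream_event_out(("orders", 0), 42, None, None, state)
print(len(sensor.runtimes))
```

Keeping the start time in the returned state, rather than on the sensor, avoids mixing up events that are processed concurrently.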

on_message_out(tp: TP, offset: int, message: Message) -> None

Call when message is fully acknowledged and can be committed.

Return type:

None

on_table_get(table: CollectionT, key: Any) -> None

Call when value in table is retrieved.

Return type:

None

on_table_set(table: CollectionT, key: Any, value: Any) -> None

Call when new value for key in table is set.

Return type:

None

on_table_del(table: CollectionT, key: Any) -> None

Call when key in a table is deleted.

Return type:

None

on_commit_completed(consumer: ConsumerT, state: Any) -> None

Call when consumer commit offset operation completed.

Return type:

None

on_send_initiated(producer: ProducerT, topic: str, message: PendingMessage, keysize: int, valsize: int) -> Any

Call when message added to producer buffer.

Return type:

Any

on_send_completed(producer: ProducerT, state: Any, metadata: RecordMetadata) -> None

Call when producer finished sending message.

Return type:

None

on_send_error(producer: ProducerT, exc: BaseException, state: Any) -> None

Call when producer was unable to publish message.

Return type:

None

on_assignment_error(assignor: PartitionAssignorT, state: Dict, exc: BaseException) -> None

Partition assignor did not complete assignment due to error.

Return type:

None

on_assignment_completed(assignor: PartitionAssignorT, state: Dict) -> None

Partition assignor completed assignment.

Return type:

None

on_rebalance_start(app: AppT) -> Dict

Cluster rebalance in progress.

Return type:

Dict

on_rebalance_return(app: AppT, state: Dict) -> None

Consumer replied to the broker that assignment is done.

Return type:

None

on_rebalance_end(app: AppT, state: Dict) -> None

Cluster rebalance fully completed (including recovery).

Return type:

None

count(metric_name: str, count: int = 1) -> None

Count metric by name.

Return type:

None
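
As a rough illustration of what counting a metric by name can look like, the sketch below forwards count() to a client's incr method. RecordingClient and CountingSensor are hypothetical in-memory stand-ins, and applying the sensor's rate to every increment is an assumption, not something this page confirms.

```python
from typing import List, Tuple


class RecordingClient:
    """Hypothetical in-memory stand-in for a StatsD client."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, int, float]] = []

    def incr(self, stat: str, count: int = 1, rate: float = 1.0) -> None:
        # Record the call instead of sending a datagram.
        self.calls.append((stat, count, rate))


class CountingSensor:
    """Hypothetical sensor exposing a count() shaped like the one above."""

    def __init__(self, client: RecordingClient, rate: float = 1.0) -> None:
        self.client = client
        self.rate = rate

    def count(self, metric_name: str, count: int = 1) -> None:
        # Assumption: the configured sample rate applies to every increment.
        self.client.incr(metric_name, count=count, rate=self.rate)


client = RecordingClient()
sensor = CountingSensor(client, rate=0.5)
sensor.count("orders_processed")  # illustrative metric name
sensor.count("orders_processed", count=3)
print(client.calls)
```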

logger: logging.Logger = <Logger faust.sensors.statsd (WARNING)>
on_tp_commit(tp_offsets: MutableMapping[TP, int]) -> None

Call when offset in topic partition is committed.

Return type:

None

track_tp_end_offset(tp: TP, offset: int) -> None

Track new topic partition end offset for monitoring lags.

Return type:

None
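
The committed offsets from on_tp_commit and the end offsets from track_tp_end_offset together are enough to estimate consumer lag. The sketch below shows one way to combine them. LagTracker, its lag() helper, and the local TP tuple are assumptions made for illustration; the page does not state how, or whether, this class derives lag itself.

```python
from typing import Dict, MutableMapping, NamedTuple, Optional


class TP(NamedTuple):
    """Local stand-in for a topic partition: (topic, partition)."""

    topic: str
    partition: int


class LagTracker:
    """Hypothetical helper that keeps the two offset maps side by side."""

    def __init__(self) -> None:
        self.committed: Dict[TP, int] = {}
        self.end_offsets: Dict[TP, int] = {}

    def on_tp_commit(self, tp_offsets: MutableMapping[TP, int]) -> None:
        self.committed.update(tp_offsets)

    def track_tp_end_offset(self, tp: TP, offset: int) -> None:
        self.end_offsets[tp] = offset

    def lag(self, tp: TP) -> Optional[int]:
        # Lag is only defined once both offsets are known for the partition.
        if tp not in self.committed or tp not in self.end_offsets:
            return None
        return max(self.end_offsets[tp] - self.committed[tp], 0)


tracker = LagTracker()
tp = TP("orders", 0)
tracker.track_tp_end_offset(tp, 150)
tracker.on_tp_commit({tp: 100})
print(tracker.lag(tp))  # 50
```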

on_web_request_end(app: AppT, request: Request, response: Optional[Response], state: Dict, *, view: Optional[View] = None) -> None

Web server finished working on request.

Return type:

None

client

Return statsd client.
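
A common way to expose a client like this is a lazily created, cached attribute, so that the connection settings are read once and the same client object is reused by every callback. The sketch below assumes that design; FakeStatsClient and LazyClientSensor are hypothetical names, and the page does not confirm that the real attribute is cached.

```python
from functools import cached_property


class FakeStatsClient:
    """Hypothetical stand-in for a real StatsD client object."""

    def __init__(self, host: str, port: int, prefix: str) -> None:
        self.host = host
        self.port = port
        self.prefix = prefix


class LazyClientSensor:
    """Hypothetical sensor that builds its client on first access."""

    def __init__(
        self,
        host: str = "localhost",
        port: int = 8125,
        prefix: str = "faust-app",
    ) -> None:
        self.host = host
        self.port = port
        self.prefix = prefix

    @cached_property
    def client(self) -> FakeStatsClient:
        # Runs once; later accesses return the same cached object.
        return FakeStatsClient(self.host, self.port, self.prefix)


sensor = LazyClientSensor()
print(sensor.client is sensor.client)  # True
```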