faust.sensors.monitor
Monitor - sensor tracking metrics.
- class faust.sensors.monitor.TableState(table: CollectionT, *, keys_retrieved: int = 0, keys_updated: int = 0, keys_deleted: int = 0)[source]¶
Represents the current state of a table.
- table: CollectionT = None¶
- class faust.sensors.monitor.Monitor(*, max_avg_history: Optional[int] = None, max_commit_latency_history: Optional[int] = None, max_send_latency_history: Optional[int] = None, max_assignment_latency_history: Optional[int] = None, messages_sent: int = 0, tables: Optional[MutableMapping[str, TableState]] = None, messages_active: int = 0, events_active: int = 0, messages_received_total: int = 0, messages_received_by_topic: Optional[Counter[str]] = None, events_total: int = 0, events_by_stream: Optional[Counter[StreamT]] = None, events_by_task: Optional[Counter[Task]] = None, events_runtime: Optional[Deque[float]] = None, commit_latency: Optional[Deque[float]] = None, send_latency: Optional[Deque[float]] = None, assignment_latency: Optional[Deque[float]] = None, events_s: int = 0, messages_s: int = 0, events_runtime_avg: float = 0.0, topic_buffer_full: Optional[Counter[TP]] = None, rebalances: Optional[int] = None, rebalance_return_latency: Optional[Deque[float]] = None, rebalance_end_latency: Optional[Deque[float]] = None, rebalance_return_avg: float = 0.0, rebalance_end_avg: float = 0.0, time: Callable[[], float] = <built-in function monotonic>, http_response_codes: Optional[Counter[HTTPStatus]] = None, http_response_latency: Optional[Deque[float]] = None, http_response_latency_avg: float = 0.0, **kwargs: Any)[source]¶
Default Faust Sensor.
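This is the default sensor. It records statistics about messages, stream events, table operations, commits, sends, partition assignments, rebalances, and web requests.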
- send_errors = 0¶
Number of produce operations that ended in error.
- assignments_completed = 0¶
Number of partition assignments completed.
- assignments_failed = 0¶
Number of partition assignments that failed.
- rebalances = 0¶
Number of rebalances seen by this worker.
- tables: MutableMapping[str, TableState] = None¶
Mapping of table name to the TableState tracked for that table.
- http_response_codes: Counter[HTTPStatus] = None¶
Counter of returned HTTP status codes.
- tp_committed_offsets: MutableMapping[TP, int] = None¶
Last committed offsets by TopicPartition
- tp_read_offsets: MutableMapping[TP, int] = None¶
Last read offsets by TopicPartition
- tp_end_offsets: MutableMapping[TP, int] = None¶
Log end offsets by TopicPartition
- stream_lookup: MutableMapping[StreamT, str] = None¶
- task_lookup: MutableMapping[Optional[Task], str] = None¶
- secs_since(start_time: float) float [source]¶
Given the timestamp start_time, return the number of seconds elapsed since that time.
- Return type:
float
- ms_since(start_time: float) float [source]¶
Given the timestamp start_time, return the number of milliseconds elapsed since that time.
- Return type:
float
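As a rough illustration of what these two helpers compute, the sketch below reimplements them as standalone functions. It assumes the clock is `time.monotonic`, the default shown for the `time` argument in the constructor signature. The `now` parameter is a hypothetical addition that makes the functions testable and is not part of the Monitor API.

```python
import time
from typing import Callable


def secs_since(start_time: float,
               now: Callable[[], float] = time.monotonic) -> float:
    """Seconds elapsed between start_time and the current clock reading."""
    return now() - start_time


def ms_since(start_time: float,
             now: Callable[[], float] = time.monotonic) -> float:
    """Same interval as secs_since, expressed in milliseconds."""
    return secs_since(start_time, now) * 1000.0


def fake_clock() -> float:
    """Fixed clock reading so the example is deterministic."""
    return 12.5


elapsed_s = secs_since(10.0, now=fake_clock)
elapsed_ms = ms_since(10.0, now=fake_clock)
```

Because the clock is monotonic, start_time must come from the same clock; a wall-clock timestamp such as `time.time()` would give a meaningless difference.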
- logger: logging.Logger = <Logger faust.sensors.monitor (WARNING)>¶
- log: CompositeLogger¶
- diag: DiagT¶
- async_exit_stack: AsyncExitStack¶
- exit_stack: ExitStack¶
- on_message_in(tp: TP, offset: int, message: Message) None [source]¶
Call before message is delegated to streams.
- Return type:
None
- on_stream_event_in(tp: TP, offset: int, stream: StreamT, event: EventT) Optional[Dict] [source]¶
Call when stream starts processing an event.
- Return type:
Optional[Dict]
- on_stream_event_out(tp: TP, offset: int, stream: StreamT, event: EventT, state: Optional[Dict] = None) None [source]¶
Call when stream is done processing an event.
- Return type:
None
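The `on_stream_event_in` / `on_stream_event_out` pair works by handing a state object from the first hook to the second. The following is a minimal stand-in for that pattern, not the Faust implementation: the class name, the `time_in` key, and the simplified argument lists are assumptions made for illustration. The counters and the bounded `events_runtime` history borrow their names from the constructor signature above.

```python
import time
from collections import deque
from typing import Callable, Deque, Dict, Optional


class EventTimingSketch:
    """Illustrative in/out hook pair that measures event runtime."""

    def __init__(self, max_avg_history: int = 100,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.clock = clock
        self.events_active = 0
        self.events_total = 0
        # Bounded history: the oldest sample is dropped once maxlen is hit.
        self.events_runtime: Deque[float] = deque(maxlen=max_avg_history)

    def on_stream_event_in(self) -> Optional[Dict]:
        self.events_active += 1
        self.events_total += 1
        # "time_in" is a hypothetical key chosen for this sketch.
        return {"time_in": self.clock()}

    def on_stream_event_out(self, state: Optional[Dict] = None) -> None:
        if state is None:
            return  # nothing was recorded for this event
        self.events_active -= 1
        self.events_runtime.append(self.clock() - state["time_in"])


# Deterministic clock: event enters at t=1.0 and leaves at t=1.25.
ticks = iter([1.0, 1.25])
sensor = EventTimingSketch(clock=lambda: next(ticks))
state = sensor.on_stream_event_in()
sensor.on_stream_event_out(state)
```

Returning the state from the "in" hook, rather than storing it on the sensor, keeps concurrent events from overwriting each other's start times.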
- on_topic_buffer_full(tp: TP) None [source]¶
Call when conductor topic buffer is full and has to wait.
- Return type:
None
- on_message_out(tp: TP, offset: int, message: Message) None [source]¶
Call when message is fully acknowledged and can be committed.
- Return type:
None
- on_table_get(table: CollectionT, key: Any) None [source]¶
Call when value in table is retrieved.
- Return type:
None
- on_table_set(table: CollectionT, key: Any, value: Any) None [source]¶
Call when new value for key in table is set.
- Return type:
None
- on_table_del(table: CollectionT, key: Any) None [source]¶
Call when key in a table is deleted.
- Return type:
None
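The three table hooks map naturally onto the `keys_retrieved`, `keys_updated`, and `keys_deleted` counters of `TableState`. The sketch below shows one way such bookkeeping could look. It is a stand-in with hypothetical class names, and it identifies tables by a plain string name instead of a `CollectionT` object.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class TableStateSketch:
    """Per-table counters, mirroring the TableState field names."""
    table_name: str
    keys_retrieved: int = 0
    keys_updated: int = 0
    keys_deleted: int = 0


class TableCountersSketch:
    """Illustrative sensor that counts table operations per table."""

    def __init__(self) -> None:
        self.tables: Dict[str, TableStateSketch] = {}

    def _state_for(self, table_name: str) -> TableStateSketch:
        # Create the per-table state lazily on first use.
        if table_name not in self.tables:
            self.tables[table_name] = TableStateSketch(table_name)
        return self.tables[table_name]

    def on_table_get(self, table_name: str, key: Any) -> None:
        self._state_for(table_name).keys_retrieved += 1

    def on_table_set(self, table_name: str, key: Any, value: Any) -> None:
        self._state_for(table_name).keys_updated += 1

    def on_table_del(self, table_name: str, key: Any) -> None:
        self._state_for(table_name).keys_deleted += 1


counters = TableCountersSketch()
counters.on_table_set("orders", "a", 1)
counters.on_table_get("orders", "a")
counters.on_table_get("orders", "a")
counters.on_table_del("orders", "a")
orders = counters.tables["orders"]
```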
- on_commit_initiated(consumer: ConsumerT) Any [source]¶
Consumer is about to commit topic offset.
- Return type:
Any
- on_commit_completed(consumer: ConsumerT, state: Any) None [source]¶
Call when consumer commit offset operation completed.
- Return type:
None
- on_send_initiated(producer: ProducerT, topic: str, message: PendingMessage, keysize: int, valsize: int) Any [source]¶
Call when message added to producer buffer.
- Return type:
Any
- on_send_completed(producer: ProducerT, state: Any, metadata: RecordMetadata) None [source]¶
Call when producer finished sending message.
- Return type:
None
- on_send_error(producer: ProducerT, exc: BaseException, state: Any) None [source]¶
Call when producer was unable to publish message.
- Return type:
None
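The send hooks follow an initiated / completed / error lifecycle in which the value returned by `on_send_initiated` is passed back as `state`. The sketch below assumes the state is simply the start timestamp and that `messages_sent` is incremented when the send is initiated. Both are choices made for this illustration, and the class name and reduced argument lists are hypothetical.

```python
import time
from collections import deque
from typing import Any, Callable, Deque


class SendTrackingSketch:
    """Illustrative producer hooks: latency history plus an error counter."""

    def __init__(self, max_send_latency_history: int = 100,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.clock = clock
        self.messages_sent = 0
        self.send_errors = 0
        self.send_latency: Deque[float] = deque(
            maxlen=max_send_latency_history)

    def on_send_initiated(self) -> Any:
        self.messages_sent += 1
        # The opaque state is just the start timestamp in this sketch.
        return self.clock()

    def on_send_completed(self, state: Any) -> None:
        self.send_latency.append(self.clock() - state)

    def on_send_error(self, exc: BaseException, state: Any) -> None:
        # Failed sends are counted but contribute no latency sample.
        self.send_errors += 1


# First send succeeds after 0.5s; second send fails.
send_ticks = iter([5.0, 5.5, 6.0])
producer_sensor = SendTrackingSketch(clock=lambda: next(send_ticks))
ok_state = producer_sensor.on_send_initiated()
producer_sensor.on_send_completed(ok_state)
bad_state = producer_sensor.on_send_initiated()
producer_sensor.on_send_error(RuntimeError("broker unavailable"), bad_state)
```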
- on_tp_commit(tp_offsets: MutableMapping[TP, int]) None [source]¶
Call when offset in topic partition is committed.
- Return type:
None
- track_tp_end_offset(tp: TP, offset: int) None [source]¶
Track new topic partition end offset for monitoring lags.
- Return type:
None
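Tracking end offsets alongside committed offsets is what makes a lag figure possible. The sketch below uses the common definition of lag as end offset minus committed offset. Whether Faust reports lag in exactly this form is an assumption. `TP` here is a local stand-in for the topic-partition tuple, and `consumer_lag` is a hypothetical helper.

```python
from typing import Dict, Mapping, NamedTuple


class TP(NamedTuple):
    """Local stand-in for a topic-partition pair."""
    topic: str
    partition: int


def consumer_lag(tp_end_offsets: Mapping[TP, int],
                 tp_committed_offsets: Mapping[TP, int]) -> Dict[TP, int]:
    """Return end offset minus committed offset for each known partition."""
    lag: Dict[TP, int] = {}
    for tp, end_offset in tp_end_offsets.items():
        committed = tp_committed_offsets.get(tp)
        if committed is None:
            continue  # nothing committed yet, so lag is undefined here
        # Clamp at zero in case the end offset sample is stale.
        lag[tp] = max(end_offset - committed, 0)
    return lag


end_offsets = {TP("orders", 0): 120, TP("orders", 1): 40}
committed_offsets = {TP("orders", 0): 100}
lag_by_tp = consumer_lag(end_offsets, committed_offsets)
```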
- on_assignment_start(assignor: PartitionAssignorT) Dict [source]¶
Partition assignor is starting to assign partitions.
- Return type:
Dict
- on_assignment_error(assignor: PartitionAssignorT, state: Dict, exc: BaseException) None [source]¶
Partition assignor did not complete assignment due to error.
- Return type:
None
- on_assignment_completed(assignor: PartitionAssignorT, state: Dict) None [source]¶
Partition assignor completed assignment.
- Return type:
None
- on_rebalance_start(app: AppT) Dict [source]¶
Cluster rebalance in progress.
- Return type:
Dict
- on_rebalance_return(app: AppT, state: Dict) None [source]¶
Consumer replied to the broker that assignment is done.
- Return type:
None
- on_rebalance_end(app: AppT, state: Dict) None [source]¶
Cluster rebalance fully completed (including recovery).
- Return type:
None
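A rebalance has two measurable milestones after it starts: the reply to the broker and the full completion including recovery. The sketch below records a latency for each, carrying one state dict through all three hooks. It is an illustration only: the `time_start` key, the fixed history size, and measuring both latencies from the start time are assumptions, and the `app` argument is omitted.

```python
import time
from collections import deque
from typing import Callable, Deque, Dict


class RebalanceTimingSketch:
    """Illustrative two-milestone timing for a rebalance."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self.clock = clock
        self.rebalances = 0
        self.rebalance_return_latency: Deque[float] = deque(maxlen=100)
        self.rebalance_end_latency: Deque[float] = deque(maxlen=100)

    def on_rebalance_start(self) -> Dict:
        self.rebalances += 1
        # "time_start" is a hypothetical key chosen for this sketch.
        return {"time_start": self.clock()}

    def on_rebalance_return(self, state: Dict) -> None:
        # First milestone: assignment acknowledged to the broker.
        self.rebalance_return_latency.append(
            self.clock() - state["time_start"])

    def on_rebalance_end(self, state: Dict) -> None:
        # Second milestone: recovery finished, measured from the start.
        self.rebalance_end_latency.append(
            self.clock() - state["time_start"])


# Start at t=0, reply to broker at t=2, fully recovered at t=5.
rebalance_ticks = iter([0.0, 2.0, 5.0])
rebalance_sensor = RebalanceTimingSketch(clock=lambda: next(rebalance_ticks))
rebalance_state = rebalance_sensor.on_rebalance_start()
rebalance_sensor.on_rebalance_return(rebalance_state)
rebalance_sensor.on_rebalance_end(rebalance_state)
```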
- on_web_request_start(app: AppT, request: Request, *, view: Optional[View] = None) Dict [source]¶
Web server started working on request.
- Return type:
Dict