diff --git a/prometheus/collect_topic_partition_offsets.go b/prometheus/collect_topic_partition_offsets.go index 993ab72..3dd22a7 100644 --- a/prometheus/collect_topic_partition_offsets.go +++ b/prometheus/collect_topic_partition_offsets.go @@ -25,6 +25,15 @@ func (e *Exporter) collectTopicPartitionOffsets(ctx context.Context, ch chan<- p return false } + // Highest Timestamp Offsets + // NB: this requires Kafka Brokers 3.0+ (see https://issues.apache.org/jira/browse/KAFKA-12541) + // In older versions this is returning the timestamp of the low watermarks (earliest offset) + maxTimestampOffsets, err := e.minionSvc.ListOffsetsCached(ctx, -3) + if err != nil { + e.logger.Error("failed to fetch offsets for max timestamp", zap.Error(err)) + return false + } + // Process Low Watermarks for _, topic := range lowWaterMarks.Topics { if !e.minionSvc.IsTopicAllowed(topic.Topic) { @@ -101,5 +110,47 @@ func (e *Exporter) collectTopicPartitionOffsets(ctx context.Context, ch chan<- p } } + // Process Max Timestamps + for _, topic := range maxTimestampOffsets.Topics { + if !e.minionSvc.IsTopicAllowed(topic.Topic) { + continue + } + topicMaxTimestamp := int64(0) + hasErrors := false + for _, partition := range topic.Partitions { + err := kerr.ErrorForCode(partition.ErrorCode) + if err != nil { + hasErrors = true + isOk = false + continue + } + if topicMaxTimestamp < partition.Timestamp { + topicMaxTimestamp = partition.Timestamp + } + // Let's end here if partition metrics shall not be exposed + if e.minionSvc.Cfg.Topics.Granularity == minion.TopicGranularityTopic { + continue + } + if partition.Timestamp > 0 { + ch <- prometheus.MustNewConstMetric( + e.partitionMaxTimestamp, + prometheus.GaugeValue, + float64(partition.Timestamp), + topic.Topic, + strconv.Itoa(int(partition.Partition)), + ) + } + } + // We only want to report the max of all partition max timestamps if we receive results from all partitions + // and the topic is not empty + if !hasErrors && topicMaxTimestamp > 0 { + ch <- prometheus.MustNewConstMetric( + e.topicMaxTimestamp, + prometheus.GaugeValue, + float64(topicMaxTimestamp), + topic.Topic, + ) + } + } return isOk } diff --git a/prometheus/exporter.go b/prometheus/exporter.go index 3210b23..38f4de2 100644 --- a/prometheus/exporter.go +++ b/prometheus/exporter.go @@ -41,6 +41,8 @@ type Exporter struct { partitionHighWaterMark *prometheus.Desc topicLowWaterMarkSum *prometheus.Desc partitionLowWaterMark *prometheus.Desc + topicMaxTimestamp *prometheus.Desc + partitionMaxTimestamp *prometheus.Desc // Consumer Groups consumerGroupInfo *prometheus.Desc @@ -172,6 +174,20 @@ func (e *Exporter) InitializeMetrics() { []string{"topic_name"}, nil, ) + // Partition Max Timestamp + e.partitionMaxTimestamp = prometheus.NewDesc( + prometheus.BuildFQName(e.cfg.Namespace, "kafka", "topic_partition_max_timestamp"), + "Partition Max Timestamp", + []string{"topic_name", "partition_id"}, + nil, + ) + // Topic Max Timestamp + e.topicMaxTimestamp = prometheus.NewDesc( + prometheus.BuildFQName(e.cfg.Namespace, "kafka", "topic_max_timestamp"), + "Topic Max Timestamp", + []string{"topic_name"}, + nil, + ) // Consumer Group Metrics // Group Info