Skip to content

Commit

Permalink
Fix for issue 437
Browse files Browse the repository at this point in the history
  • Loading branch information
dsohaliya-ontic committed Dec 22, 2024
1 parent 477643d commit c4370ad
Showing 1 changed file with 12 additions and 9 deletions.
21 changes: 12 additions & 9 deletions kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,24 +635,27 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(
consumergroupCurrentOffset, prometheus.GaugeValue, float64(currentOffset), group.GroupId, topic, strconv.FormatInt(int64(partition), 10),
)
e.mu.Lock()
if offset, ok := offset[topic][partition]; ok {
// If the topic is consumed by that consumer group, but no offset associated with the partition
// forcing lag to -1 to be able to alert on that
currentPartitionOffset, currentPartitionOffsetError := e.client.GetOffset(topic, partition, sarama.OffsetNewest)
if currentPartitionOffsetError != nil {
klog.Errorf("Cannot get current offset of topic %s partition %d: %v", topic, partition, currentPartitionOffsetError)
} else {
var lag int64
if offsetFetchResponseBlock.Offset == -1 {
lag = -1
} else {
lag = offset - offsetFetchResponseBlock.Offset
if offset, ok := offset[topic][partition]; ok {
if currentPartitionOffset == -1 {
currentPartitionOffset = offset
}
}
lag = currentPartitionOffset - offsetFetchResponseBlock.Offset
lagSum += lag
}

ch <- prometheus.MustNewConstMetric(
consumergroupLag, prometheus.GaugeValue, float64(lag), group.GroupId, topic, strconv.FormatInt(int64(partition), 10),
)
} else {
klog.Errorf("No offset of topic %s partition %d, cannot get consumer group lag", topic, partition)
}
e.mu.Unlock()
}
}
ch <- prometheus.MustNewConstMetric(
consumergroupCurrentOffsetSum, prometheus.GaugeValue, float64(currentOffsetSum), group.GroupId, topic,
Expand Down

0 comments on commit c4370ad

Please sign in to comment.