Skip to content

Commit

Permalink
Added the lock while reading partition offset
Browse files Browse the repository at this point in the history
  • Loading branch information
dsohaliya-ontic committed Dec 22, 2024
1 parent c4370ad commit 4e4fc25
Showing 1 changed file with 2 additions and 0 deletions.
2 changes: 2 additions & 0 deletions kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,7 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(
consumergroupCurrentOffset, prometheus.GaugeValue, float64(currentOffset), group.GroupId, topic, strconv.FormatInt(int64(partition), 10),
)
e.mu.Lock()
currentPartitionOffset, currentPartitionOffsetError := e.client.GetOffset(topic, partition, sarama.OffsetNewest)
if currentPartitionOffsetError != nil {
klog.Errorf("Cannot get current offset of topic %s partition %d: %v", topic, partition, currentPartitionOffsetError)
Expand All @@ -656,6 +657,7 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) {
consumergroupLag, prometheus.GaugeValue, float64(lag), group.GroupId, topic, strconv.FormatInt(int64(partition), 10),
)
}
e.mu.Unlock()
}
ch <- prometheus.MustNewConstMetric(
consumergroupCurrentOffsetSum, prometheus.GaugeValue, float64(currentOffsetSum), group.GroupId, topic,
Expand Down

0 comments on commit 4e4fc25

Please sign in to comment.