diff --git a/kafka_exporter.go b/kafka_exporter.go index 577fdfe5..636188e8 100644 --- a/kafka_exporter.go +++ b/kafka_exporter.go @@ -635,6 +635,7 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) { ch <- prometheus.MustNewConstMetric( consumergroupCurrentOffset, prometheus.GaugeValue, float64(currentOffset), group.GroupId, topic, strconv.FormatInt(int64(partition), 10), ) + e.mu.Lock() currentPartitionOffset, currentPartitionOffsetError := e.client.GetOffset(topic, partition, sarama.OffsetNewest) if currentPartitionOffsetError != nil { klog.Errorf("Cannot get current offset of topic %s partition %d: %v", topic, partition, currentPartitionOffsetError) @@ -656,6 +657,7 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) { consumergroupLag, prometheus.GaugeValue, float64(lag), group.GroupId, topic, strconv.FormatInt(int64(partition), 10), ) } + e.mu.Unlock() } ch <- prometheus.MustNewConstMetric( consumergroupCurrentOffsetSum, prometheus.GaugeValue, float64(currentOffsetSum), group.GroupId, topic,