Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add OTLP exemplar support #5486

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.micrometer.registry.otlp;

import io.opentelemetry.proto.metrics.v1.Exemplar;

import java.util.List;
import java.util.concurrent.TimeUnit;

interface ExemplarCollector {
void offerMeasurement(double value);
void offerDurationMeasurement(long nanos);
List<Exemplar> collectAndReset();
List<Exemplar> collectDurationAndReset(TimeUnit baseTimeUnit);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package io.micrometer.registry.otlp;

import io.micrometer.common.lang.Nullable;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;

class ExemplarCollectorFactory {
private final Clock clock;
private final SpanContextProvider spanContextProvider;

ExemplarCollectorFactory(Clock clock, SpanContextProvider spanContextProvider) {
this.clock = clock;
this.spanContextProvider = spanContextProvider;
}

ExemplarCollector fixedSize(int size) {
return new FixedSizeExemplarCollector(clock, spanContextProvider, new StaticCellSelector(0), size);
}

@Nullable
ExemplarCollector forHistogram(DistributionStatisticConfig distributionStatisticConfig, OtlpConfig otlpConfig) {
// This logic should match the logic from OtlpMeterRegistry#getHistogram(...)
if (distributionStatisticConfig.isPublishingHistogram()) {
if (HistogramFlavor.BASE2_EXPONENTIAL_BUCKET_HISTOGRAM == OtlpMeterRegistry.histogramFlavor(otlpConfig.histogramFlavor(), distributionStatisticConfig)) {
return null;
}

double[] sloBuckets = OtlpMeterRegistry.getSloWithPositiveInf(distributionStatisticConfig);
return new FixedSizeExemplarCollector(clock, spanContextProvider, new HistogramCellSelector(sloBuckets), sloBuckets.length);
}

// Collecting exemplars for percentile histograms is not yet supported
return null;
}

private static class HistogramCellSelector implements FixedSizeExemplarCollector.CellSelector {
private final double[] boundaries;

private HistogramCellSelector(double[] boundaries) {
this.boundaries = boundaries;
}

@Override
public int cellIndexFor(double value) {
for (int i = 0; i < boundaries.length; ++i) {
if (value <= boundaries[i]) {
return i;
}
}
return -1;
}

@Override
public void reset() {
}
}

private static class StaticCellSelector implements FixedSizeExemplarCollector.CellSelector {
private final int index;

private StaticCellSelector(int index) {
this.index = index;
}

@Override
public int cellIndexFor(double value) {
return index;
}

@Override
public void reset() {
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package io.micrometer.registry.otlp;

import io.micrometer.common.lang.Nullable;
import io.micrometer.core.instrument.Clock;
import io.opentelemetry.proto.metrics.v1.Exemplar;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

class FixedSizeExemplarCollector implements ExemplarCollector {
private final Clock clock;
private final SpanContextProvider spanContextProvider;
private final CellSelector cellSelector;
private final OtlpExemplar[] exemplars;
private volatile boolean hasMeasurements = false;

FixedSizeExemplarCollector(Clock clock, SpanContextProvider spanContextProvider, CellSelector cellSelector, int size) {
this.clock = clock;
this.spanContextProvider = spanContextProvider;
this.cellSelector = cellSelector;
this.exemplars = new OtlpExemplar[size];
for (int i = 0; i < size; i++) {
this.exemplars[i] = new OtlpExemplar();
}
}

@Override
public void offerMeasurement(double value) {
SpanContextProvider.SpanContext spanContext = spanContextProvider.gerCurrentSpan();
if (spanContext != null && spanContext.isSpanSampled()) {
int index = cellSelector.cellIndexFor(value);
if (index != -1) {
long timeUnixNano = TimeUnit.MILLISECONDS.toNanos(clock.wallTime());

this.exemplars[index].offerMeasurement(spanContext.getTraceId(), spanContext.getSpanId(), timeUnixNano, value);
this.hasMeasurements = true;
}
}
}

@Override
public void offerDurationMeasurement(long nanos) {
SpanContextProvider.SpanContext spanContext = spanContextProvider.gerCurrentSpan();
if (spanContext != null && spanContext.isSpanSampled()) {
int index = cellSelector.cellIndexFor(nanos);
if (index != -1) {
long timeUnixNano = TimeUnit.MILLISECONDS.toNanos(clock.wallTime());

this.exemplars[index].offerDurationMeasurement(spanContext.getTraceId(), spanContext.getSpanId(), timeUnixNano, nanos);
this.hasMeasurements = true;
}
}
}

@Override
public List<Exemplar> collectAndReset() {
return internalCollectAndReset(null);
}

@Override
public List<Exemplar> collectDurationAndReset(TimeUnit baseTimeUnit) {
return internalCollectAndReset(baseTimeUnit);
}

private List<Exemplar> internalCollectAndReset(@Nullable TimeUnit baseTimeUnit) {
if (!hasMeasurements) {
return Collections.emptyList();
}
List<Exemplar> result = new ArrayList<>();
for (OtlpExemplar otlpExemplar : exemplars) {
Exemplar exemplar = otlpExemplar.getAndReset(baseTimeUnit);
if (exemplar != null) {
result.add(exemplar);
}
}

this.cellSelector.reset();
this.hasMeasurements = false;

return Collections.unmodifiableList(result);
}

interface CellSelector {
int cellIndexFor(double value);

void reset();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,43 @@
*/
package io.micrometer.registry.otlp;

import io.micrometer.common.lang.Nullable;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.cumulative.CumulativeCounter;
import io.opentelemetry.proto.metrics.v1.Exemplar;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

class OtlpCumulativeCounter extends CumulativeCounter implements StartTimeAwareMeter {
class OtlpCumulativeCounter extends CumulativeCounter implements StartTimeAwareMeter, OtlpExemplarMeter {

private final long startTimeNanos;

OtlpCumulativeCounter(Id id, Clock clock) {
@Nullable
private final ExemplarCollector exemplarCollector;

OtlpCumulativeCounter(Id id, Clock clock, @Nullable ExemplarCollectorFactory exemplarCollectorFactory) {
super(id);
this.startTimeNanos = TimeUnit.MILLISECONDS.toNanos(clock.wallTime());
this.exemplarCollector = exemplarCollectorFactory == null ? null : exemplarCollectorFactory.fixedSize(1);
}

@Override
public long getStartTimeNanos() {
return this.startTimeNanos;
}

@Override
public void increment(double amount) {
super.increment(amount);
if (exemplarCollector != null) {
exemplarCollector.offerMeasurement(amount);
}
}

@Override
public List<Exemplar> exemplars() {
return exemplarCollector == null ? Collections.emptyList() : exemplarCollector.collectAndReset();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,44 @@
import io.micrometer.common.lang.Nullable;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.cumulative.CumulativeDistributionSummary;
import io.micrometer.core.instrument.distribution.*;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.registry.otlp.internal.Base2ExponentialHistogram;
import io.micrometer.registry.otlp.internal.ExponentialHistogramSnapShot;
import io.opentelemetry.proto.metrics.v1.Exemplar;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

class OtlpCumulativeDistributionSummary extends CumulativeDistributionSummary
implements StartTimeAwareMeter, OtlpHistogramSupport {
implements StartTimeAwareMeter, OtlpHistogramSupport, OtlpExemplarMeter {

private final HistogramFlavor histogramFlavor;

private final long startTimeNanos;

@Nullable
private final ExemplarCollector exemplarCollector;

OtlpCumulativeDistributionSummary(Id id, Clock clock, DistributionStatisticConfig distributionStatisticConfig,
double scale, OtlpConfig otlpConfig) {
double scale, OtlpConfig otlpConfig, @Nullable ExemplarCollectorFactory exemplarCollectorFactory) {
super(id, clock, distributionStatisticConfig, scale,
OtlpMeterRegistry.getHistogram(clock, distributionStatisticConfig, otlpConfig));
this.startTimeNanos = TimeUnit.MILLISECONDS.toNanos(clock.wallTime());
this.histogramFlavor = OtlpMeterRegistry.histogramFlavor(otlpConfig.histogramFlavor(),
distributionStatisticConfig);

this.exemplarCollector = exemplarCollectorFactory == null
? null
: exemplarCollectorFactory.forHistogram(distributionStatisticConfig, otlpConfig);
}

@Override
protected void recordNonNegative(double amount) {
super.recordNonNegative(amount);
if (exemplarCollector != null) {
exemplarCollector.offerMeasurement(amount);
}
}

@Override
Expand All @@ -54,4 +72,8 @@ public ExponentialHistogramSnapShot getExponentialHistogramSnapShot() {
return null;
}

@Override
public List<Exemplar> exemplars() {
return exemplarCollector == null ? Collections.emptyList() : exemplarCollector.collectAndReset();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,44 @@
import io.micrometer.common.lang.Nullable;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.cumulative.CumulativeTimer;
import io.micrometer.core.instrument.distribution.*;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.core.instrument.distribution.pause.PauseDetector;
import io.micrometer.registry.otlp.internal.Base2ExponentialHistogram;
import io.micrometer.registry.otlp.internal.ExponentialHistogramSnapShot;
import io.opentelemetry.proto.metrics.v1.Exemplar;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

class OtlpCumulativeTimer extends CumulativeTimer implements StartTimeAwareMeter, OtlpHistogramSupport {
class OtlpCumulativeTimer extends CumulativeTimer implements StartTimeAwareMeter, OtlpHistogramSupport, OtlpExemplarMeter {

private final HistogramFlavor histogramFlavor;

private final long startTimeNanos;

@Nullable
private final ExemplarCollector exemplarCollector;

OtlpCumulativeTimer(Id id, Clock clock, DistributionStatisticConfig distributionStatisticConfig,
PauseDetector pauseDetector, TimeUnit baseTimeUnit, OtlpConfig otlpConfig) {
PauseDetector pauseDetector, TimeUnit baseTimeUnit, OtlpConfig otlpConfig,
@Nullable ExemplarCollectorFactory exemplarCollectorFactory) {
super(id, clock, distributionStatisticConfig, pauseDetector, baseTimeUnit,
OtlpMeterRegistry.getHistogram(clock, distributionStatisticConfig, otlpConfig, baseTimeUnit));
this.histogramFlavor = OtlpMeterRegistry.histogramFlavor(otlpConfig.histogramFlavor(),
distributionStatisticConfig);
this.startTimeNanos = TimeUnit.MILLISECONDS.toNanos(clock.wallTime());
this.exemplarCollector = exemplarCollectorFactory == null
? null
: exemplarCollectorFactory.forHistogram(distributionStatisticConfig, otlpConfig);
}

@Override
protected void recordNonNegative(long amount, TimeUnit unit) {
super.recordNonNegative(amount, unit);
if (exemplarCollector != null) {
exemplarCollector.offerDurationMeasurement(TimeUnit.NANOSECONDS.convert(amount, unit));
}
}

@Override
Expand All @@ -54,4 +72,8 @@ public ExponentialHistogramSnapShot getExponentialHistogramSnapShot() {
return null;
}

@Override
public List<Exemplar> exemplars() {
return exemplarCollector == null ? Collections.emptyList() : exemplarCollector.collectDurationAndReset(baseTimeUnit());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package io.micrometer.registry.otlp;

import com.google.protobuf.ByteString;
import io.micrometer.common.lang.Nullable;
import io.micrometer.core.instrument.util.TimeUtils;
import io.opentelemetry.proto.metrics.v1.Exemplar;

import java.util.concurrent.TimeUnit;

class OtlpExemplar {
@Nullable
private String traceId = null;
@Nullable
private String spanId = null;
private long timeUnixNano = 0L;
private double doubleValue = 0.0;
private long durationInNano = 0L;

synchronized void offerMeasurement(String traceId, String spanId, long timeUnixNano, double value) {
this.traceId = traceId;
this.spanId = spanId;
this.timeUnixNano = timeUnixNano;
this.doubleValue = value;
}

synchronized void offerDurationMeasurement(String traceId, String spanId, long timeUnixNano, long durationInNano) {
this.traceId = traceId;
this.spanId = spanId;
this.timeUnixNano = timeUnixNano;
this.durationInNano = durationInNano;
}

@Nullable
synchronized Exemplar getAndReset(@Nullable TimeUnit baseTimeUnit) {
Exemplar exemplar = null;
if (traceId != null && spanId != null) {
exemplar = Exemplar.newBuilder()
.setTraceId(ByteString.fromHex(traceId))
.setSpanId(ByteString.fromHex(spanId))
.setTimeUnixNano(timeUnixNano)
.setAsDouble(baseTimeUnit == null ? doubleValue : TimeUtils.nanosToUnit(durationInNano, baseTimeUnit))
.build();
}

this.traceId = null;
this.spanId = null;
this.timeUnixNano = 0L;
this.doubleValue = 0.0;
this.durationInNano = 0L;

return exemplar;
}
}
Loading