Skip to content

Commit

Permalink
#887 otlp exporter exemplars (#883)
Browse files Browse the repository at this point in the history
Signed-off-by: Greg Eales <0x006ea1e5@gmail.com>
  • Loading branch information
0x006EA1E5 authored Nov 2, 2023
1 parent 348bf37 commit e9a0468
Show file tree
Hide file tree
Showing 4 changed files with 215 additions and 9 deletions.
24 changes: 24 additions & 0 deletions prometheus-metrics-exporter-opentelemetry/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,30 @@
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.wiremock</groupId>
<artifactId>wiremock</artifactId>
<version>3.2.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.2.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-proto</artifactId>
<version>0.17.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-trace</artifactId>
<version>${otel.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class OpenTelemetryExporter {
public class OpenTelemetryExporter implements AutoCloseable {
private final PeriodicMetricReader reader;

private OpenTelemetryExporter(Builder builder, PrometheusProperties config, PrometheusRegistry registry) {
InstrumentationScopeInfo instrumentationScopeInfo = PrometheusInstrumentationScope.loadInstrumentationScopeInfo();
Expand All @@ -42,13 +43,18 @@ private OpenTelemetryExporter(Builder builder, PrometheusProperties config, Prom
}
exporter = exporterBuilder.build();
}
PeriodicMetricReader reader = PeriodicMetricReader.builder(exporter)
reader = PeriodicMetricReader.builder(exporter)
.setInterval(Duration.ofSeconds(ConfigHelper.getIntervalSeconds(builder, properties)))
.build();

PrometheusMetricProducer prometheusMetricProducer = new PrometheusMetricProducer(registry, instrumentationScopeInfo, resource);
reader.register(prometheusMetricProducer);
}

public void close() {
reader.shutdown();
}

private Resource initResourceAttributes(Builder builder, ExporterOpenTelemetryProperties properties, InstrumentationScopeInfo instrumentationScopeInfo) {
String serviceName = ConfigHelper.getServiceName(builder, properties);
String serviceNamespace = ConfigHelper.getServiceNamespace(builder, properties);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
package io.prometheus.metrics.exporter.opentelemetry.otelmodel;

import io.prometheus.metrics.model.snapshots.DataPointSnapshot;
import io.prometheus.metrics.model.snapshots.Exemplars;
import io.prometheus.metrics.model.snapshots.*;
import io.prometheus.metrics.shaded.io_opentelemetry_1_28_0.api.common.Attributes;
import io.prometheus.metrics.shaded.io_opentelemetry_1_28_0.api.common.AttributesBuilder;
import io.prometheus.metrics.shaded.io_opentelemetry_1_28_0.api.trace.SpanContext;
import io.prometheus.metrics.shaded.io_opentelemetry_1_28_0.api.trace.TraceFlags;
import io.prometheus.metrics.shaded.io_opentelemetry_1_28_0.api.trace.TraceState;
import io.prometheus.metrics.shaded.io_opentelemetry_1_28_0.sdk.metrics.data.Data;
import io.prometheus.metrics.shaded.io_opentelemetry_1_28_0.sdk.metrics.data.DoubleExemplarData;
import io.prometheus.metrics.shaded.io_opentelemetry_1_28_0.sdk.metrics.data.MetricDataType;
import io.prometheus.metrics.shaded.io_opentelemetry_1_28_0.sdk.metrics.data.PointData;
import io.prometheus.metrics.model.snapshots.Exemplar;
import io.prometheus.metrics.model.snapshots.Labels;
import io.prometheus.metrics.shaded.io_opentelemetry_1_28_0.sdk.metrics.internal.data.ImmutableDoubleExemplarData;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

abstract class PrometheusData<T extends PointData> implements Data<T> {

Expand Down Expand Up @@ -44,8 +47,41 @@ protected List<DoubleExemplarData> convertExemplar(Exemplar exemplar) {
}

protected List<DoubleExemplarData> convertExemplars(Exemplars exemplars) {
// TODO: Exemplars not implemented yet.
return Collections.emptyList();
return StreamSupport.stream(exemplars.spliterator(), false)
.map(this::toDoubleExemplarData)
.filter(Objects::nonNull)
.collect(Collectors.toList());
}

protected DoubleExemplarData toDoubleExemplarData(Exemplar exemplar) {
if (exemplar == null) {
return null;
}

AttributesBuilder filteredAttributesBuilder = Attributes.builder();
String traceId = null;
String spanId = null;
for (Label label : exemplar.getLabels()) {
if (label.getName().equals(Exemplar.TRACE_ID)) {
traceId = label.getValue();
}
else if (label.getName().equals(Exemplar.SPAN_ID)) {
spanId = label.getValue();
} else {
filteredAttributesBuilder.put(label.getName(), label.getValue());
}
}
Attributes filteredAttributes = filteredAttributesBuilder.build();

SpanContext spanContext = (traceId != null && spanId != null)
? SpanContext.create(traceId, spanId, TraceFlags.getSampled(), TraceState.getDefault())
: SpanContext.getInvalid();

return ImmutableDoubleExemplarData.create(
filteredAttributes,
TimeUnit.MILLISECONDS.toNanos(exemplar.getTimestampMillis()),
spanContext,
exemplar.getValue());
}

protected long getStartEpochNanos(DataPointSnapshot dataPoint) {
Expand All @@ -55,4 +91,5 @@ protected long getStartEpochNanos(DataPointSnapshot dataPoint) {
protected long getEpochNanos(DataPointSnapshot dataPoint, long currentTimeMillis) {
return dataPoint.hasScrapeTimestamp() ? TimeUnit.MILLISECONDS.toNanos(dataPoint.getScrapeTimestampMillis()) : TimeUnit.MILLISECONDS.toNanos(currentTimeMillis);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package io.prometheus.metrics.exporter.opentelemetry;

import com.github.tomakehurst.wiremock.http.Request;
import com.github.tomakehurst.wiremock.junit.WireMockRule;
import com.github.tomakehurst.wiremock.matching.MatchResult;
import com.github.tomakehurst.wiremock.matching.ValueMatcher;
import com.google.protobuf.InvalidProtocolBufferException;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.metrics.v1.DoubleDataPoint;
import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics;
import io.opentelemetry.proto.metrics.v1.Metric;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.prometheus.metrics.core.metrics.Counter;
import io.prometheus.metrics.model.registry.PrometheusRegistry;
import org.awaitility.core.ConditionTimeoutException;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import static com.github.tomakehurst.wiremock.client.WireMock.*;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;

public class ExemplarTest {
private static final String ENDPOINT_PATH = "/v1/metrics";
private static final int TIMEOUT = 3;
private static final String INSTRUMENTATION_SCOPE_NAME = "testInstrumentationScope";
private static final String SPAN_NAME = "test-span";
public static final String TEST_COUNTER_NAME = "test_counter";
private Counter testCounter;
private OpenTelemetryExporter openTelemetryExporter;
@Rule
public WireMockRule wireMockRule = new WireMockRule(4317);

@Before
public void setUp() {
openTelemetryExporter = OpenTelemetryExporter.builder()
.endpoint("http://localhost:4317")
.protocol("http/protobuf")
.intervalSeconds(1)
.buildAndStart();

testCounter = Counter.builder()
.name(TEST_COUNTER_NAME)
.withExemplars()
.register();

wireMockRule.stubFor(post(ENDPOINT_PATH)
.withHeader("Content-Type", containing("application/x-protobuf"))
.willReturn(ok()
.withHeader("Content-Type", "application/json")
.withBody("{\"partialSuccess\":{}}")));
}

@After
public void tearDown() {
PrometheusRegistry.defaultRegistry.unregister(testCounter);
openTelemetryExporter.close();
}

@Test
public void sampledExemplarIsForwarded() {
try (SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder()
.setSampler(Sampler.alwaysOn())
.build()) {

Tracer test = sdkTracerProvider.get(INSTRUMENTATION_SCOPE_NAME);
Span span = test.spanBuilder(SPAN_NAME)
.startSpan();
try (Scope scope = span.makeCurrent()) {
testCounter.inc(2);
}
}


await().atMost(TIMEOUT, SECONDS)
.ignoreException(com.github.tomakehurst.wiremock.client.VerificationException.class)
.until(() -> {
verify(postRequestedFor(urlEqualTo(ENDPOINT_PATH))
.withHeader("Content-Type", equalTo("application/x-protobuf"))
.andMatching(getExemplarCountMatcher(1)));
return true;
});

}

@Test(expected = ConditionTimeoutException.class)
public void notSampledExemplarIsNotForwarded() {
try (SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder()
.setSampler(Sampler.alwaysOff())
.build()) {

Tracer test = sdkTracerProvider.get(INSTRUMENTATION_SCOPE_NAME);
Span span = test.spanBuilder(SPAN_NAME)
.startSpan();
try (Scope scope = span.makeCurrent()) {
testCounter.inc(2);
}
}

await().atMost(TIMEOUT, SECONDS)
.ignoreException(com.github.tomakehurst.wiremock.client.VerificationException.class)
.until(() -> {
verify(postRequestedFor(urlEqualTo(ENDPOINT_PATH))
.withHeader("Content-Type", equalTo("application/x-protobuf"))
.andMatching(getExemplarCountMatcher(1)));
return true;
});

}

private static ValueMatcher<Request> getExemplarCountMatcher(int expectedCount) {
return request -> {
try {
ExportMetricsServiceRequest exportMetricsServiceRequest = ExportMetricsServiceRequest.parseFrom(request.getBody());
for (ResourceMetrics resourceMetrics : exportMetricsServiceRequest.getResourceMetricsList()) {
for (InstrumentationLibraryMetrics instrumentationLibraryMetrics : resourceMetrics.getInstrumentationLibraryMetricsList()) {
for (Metric metric : instrumentationLibraryMetrics.getMetricsList()) {
for (DoubleDataPoint doubleDataPoint : metric.getDoubleSum().getDataPointsList()) {
if (doubleDataPoint.getExemplarsCount() == expectedCount) {
return MatchResult.exactMatch();
}
}
}
}
}
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
return MatchResult.noMatch();
};
}
}

0 comments on commit e9a0468

Please sign in to comment.