diff --git a/prometheus-metrics-exporter-opentelemetry/pom.xml b/prometheus-metrics-exporter-opentelemetry/pom.xml index 4367ef43f..175c9a5dd 100644 --- a/prometheus-metrics-exporter-opentelemetry/pom.xml +++ b/prometheus-metrics-exporter-opentelemetry/pom.xml @@ -79,6 +79,30 @@ 4.13.2 test + + org.wiremock + wiremock + 3.2.0 + test + + + org.awaitility + awaitility + 4.2.0 + test + + + io.opentelemetry + opentelemetry-proto + 0.17.1 + test + + + io.opentelemetry + opentelemetry-sdk-trace + ${otel.version} + test + diff --git a/prometheus-metrics-exporter-opentelemetry/src/main/java/io/prometheus/metrics/exporter/opentelemetry/OpenTelemetryExporter.java b/prometheus-metrics-exporter-opentelemetry/src/main/java/io/prometheus/metrics/exporter/opentelemetry/OpenTelemetryExporter.java index 9c69faf3e..31f263fd9 100644 --- a/prometheus-metrics-exporter-opentelemetry/src/main/java/io/prometheus/metrics/exporter/opentelemetry/OpenTelemetryExporter.java +++ b/prometheus-metrics-exporter-opentelemetry/src/main/java/io/prometheus/metrics/exporter/opentelemetry/OpenTelemetryExporter.java @@ -18,7 +18,8 @@ import java.util.Map; import java.util.concurrent.TimeUnit; -public class OpenTelemetryExporter { +public class OpenTelemetryExporter implements AutoCloseable { + private final PeriodicMetricReader reader; private OpenTelemetryExporter(Builder builder, PrometheusProperties config, PrometheusRegistry registry) { InstrumentationScopeInfo instrumentationScopeInfo = PrometheusInstrumentationScope.loadInstrumentationScopeInfo(); @@ -42,13 +43,18 @@ private OpenTelemetryExporter(Builder builder, PrometheusProperties config, Prom } exporter = exporterBuilder.build(); } - PeriodicMetricReader reader = PeriodicMetricReader.builder(exporter) + reader = PeriodicMetricReader.builder(exporter) .setInterval(Duration.ofSeconds(ConfigHelper.getIntervalSeconds(builder, properties))) .build(); + PrometheusMetricProducer prometheusMetricProducer = new PrometheusMetricProducer(registry, instrumentationScopeInfo, resource); reader.register(prometheusMetricProducer); } + public void close() { + reader.shutdown(); + } + private Resource initResourceAttributes(Builder builder, ExporterOpenTelemetryProperties properties, InstrumentationScopeInfo instrumentationScopeInfo) { String serviceName = ConfigHelper.getServiceName(builder, properties); String serviceNamespace = ConfigHelper.getServiceNamespace(builder, properties); diff --git a/prometheus-metrics-exporter-opentelemetry/src/main/java/io/prometheus/metrics/exporter/opentelemetry/otelmodel/PrometheusData.java b/prometheus-metrics-exporter-opentelemetry/src/main/java/io/prometheus/metrics/exporter/opentelemetry/otelmodel/PrometheusData.java index 353a1623c..56d10847b 100644 --- a/prometheus-metrics-exporter-opentelemetry/src/main/java/io/prometheus/metrics/exporter/opentelemetry/otelmodel/PrometheusData.java +++ b/prometheus-metrics-exporter-opentelemetry/src/main/java/io/prometheus/metrics/exporter/opentelemetry/otelmodel/PrometheusData.java @@ -1,19 +1,22 @@ package io.prometheus.metrics.exporter.opentelemetry.otelmodel; -import io.prometheus.metrics.model.snapshots.DataPointSnapshot; -import io.prometheus.metrics.model.snapshots.Exemplars; +import io.prometheus.metrics.model.snapshots.*; import io.prometheus.metrics.shaded.io_opentelemetry_1_28_0.api.common.Attributes; import io.prometheus.metrics.shaded.io_opentelemetry_1_28_0.api.common.AttributesBuilder; +import io.prometheus.metrics.shaded.io_opentelemetry_1_28_0.api.trace.SpanContext; +import io.prometheus.metrics.shaded.io_opentelemetry_1_28_0.api.trace.TraceFlags; +import io.prometheus.metrics.shaded.io_opentelemetry_1_28_0.api.trace.TraceState; import io.prometheus.metrics.shaded.io_opentelemetry_1_28_0.sdk.metrics.data.Data; import io.prometheus.metrics.shaded.io_opentelemetry_1_28_0.sdk.metrics.data.DoubleExemplarData; import io.prometheus.metrics.shaded.io_opentelemetry_1_28_0.sdk.metrics.data.MetricDataType; import io.prometheus.metrics.shaded.io_opentelemetry_1_28_0.sdk.metrics.data.PointData; -import io.prometheus.metrics.model.snapshots.Exemplar; -import io.prometheus.metrics.model.snapshots.Labels; +import io.prometheus.metrics.shaded.io_opentelemetry_1_28_0.sdk.metrics.internal.data.ImmutableDoubleExemplarData; -import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; abstract class PrometheusData implements Data { @@ -44,8 +47,41 @@ protected List convertExemplar(Exemplar exemplar) { } protected List convertExemplars(Exemplars exemplars) { - // TODO: Exemplars not implemented yet. - return Collections.emptyList(); + return StreamSupport.stream(exemplars.spliterator(), false) + .map(this::toDoubleExemplarData) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } + + protected DoubleExemplarData toDoubleExemplarData(Exemplar exemplar) { + if (exemplar == null) { + return null; + } + + AttributesBuilder filteredAttributesBuilder = Attributes.builder(); + String traceId = null; + String spanId = null; + for (Label label : exemplar.getLabels()) { + if (label.getName().equals(Exemplar.TRACE_ID)) { + traceId = label.getValue(); + } + else if (label.getName().equals(Exemplar.SPAN_ID)) { + spanId = label.getValue(); + } else { + filteredAttributesBuilder.put(label.getName(), label.getValue()); + } + } + Attributes filteredAttributes = filteredAttributesBuilder.build(); + + SpanContext spanContext = (traceId != null && spanId != null) + ? SpanContext.create(traceId, spanId, TraceFlags.getSampled(), TraceState.getDefault()) + : SpanContext.getInvalid(); + + return ImmutableDoubleExemplarData.create( + filteredAttributes, + TimeUnit.MILLISECONDS.toNanos(exemplar.getTimestampMillis()), + spanContext, + exemplar.getValue()); } protected long getStartEpochNanos(DataPointSnapshot dataPoint) { @@ -55,4 +91,5 @@ protected long getStartEpochNanos(DataPointSnapshot dataPoint) { protected long getEpochNanos(DataPointSnapshot dataPoint, long currentTimeMillis) { return dataPoint.hasScrapeTimestamp() ? TimeUnit.MILLISECONDS.toNanos(dataPoint.getScrapeTimestampMillis()) : TimeUnit.MILLISECONDS.toNanos(currentTimeMillis); } + } diff --git a/prometheus-metrics-exporter-opentelemetry/src/test/java/io/prometheus/metrics/exporter/opentelemetry/ExemplarTest.java b/prometheus-metrics-exporter-opentelemetry/src/test/java/io/prometheus/metrics/exporter/opentelemetry/ExemplarTest.java new file mode 100644 index 000000000..0226c131c --- /dev/null +++ b/prometheus-metrics-exporter-opentelemetry/src/test/java/io/prometheus/metrics/exporter/opentelemetry/ExemplarTest.java @@ -0,0 +1,139 @@ +package io.prometheus.metrics.exporter.opentelemetry; + +import com.github.tomakehurst.wiremock.http.Request; +import com.github.tomakehurst.wiremock.junit.WireMockRule; +import com.github.tomakehurst.wiremock.matching.MatchResult; +import com.github.tomakehurst.wiremock.matching.ValueMatcher; +import com.google.protobuf.InvalidProtocolBufferException; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Scope; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; +import io.opentelemetry.proto.metrics.v1.DoubleDataPoint; +import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics; +import io.opentelemetry.proto.metrics.v1.Metric; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.samplers.Sampler; +import io.prometheus.metrics.core.metrics.Counter; +import io.prometheus.metrics.model.registry.PrometheusRegistry; +import org.awaitility.core.ConditionTimeoutException; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import static com.github.tomakehurst.wiremock.client.WireMock.*; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.awaitility.Awaitility.await; + +public class ExemplarTest { + private static final String ENDPOINT_PATH = "/v1/metrics"; + private static final int TIMEOUT = 3; + private static final String INSTRUMENTATION_SCOPE_NAME = "testInstrumentationScope"; + private static final String SPAN_NAME = "test-span"; + public static final String TEST_COUNTER_NAME = "test_counter"; + private Counter testCounter; + private OpenTelemetryExporter openTelemetryExporter; + @Rule + public WireMockRule wireMockRule = new WireMockRule(4317); + + @Before + public void setUp() { + openTelemetryExporter = OpenTelemetryExporter.builder() + .endpoint("http://localhost:4317") + .protocol("http/protobuf") + .intervalSeconds(1) + .buildAndStart(); + + testCounter = Counter.builder() + .name(TEST_COUNTER_NAME) + .withExemplars() + .register(); + + wireMockRule.stubFor(post(ENDPOINT_PATH) + .withHeader("Content-Type", containing("application/x-protobuf")) + .willReturn(ok() + .withHeader("Content-Type", "application/json") + .withBody("{\"partialSuccess\":{}}"))); + } + + @After + public void tearDown() { + PrometheusRegistry.defaultRegistry.unregister(testCounter); + openTelemetryExporter.close(); + } + + @Test + public void sampledExemplarIsForwarded() { + try (SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder() + .setSampler(Sampler.alwaysOn()) + .build()) { + + Tracer test = sdkTracerProvider.get(INSTRUMENTATION_SCOPE_NAME); + Span span = test.spanBuilder(SPAN_NAME) + .startSpan(); + try (Scope scope = span.makeCurrent()) { + testCounter.inc(2); + } + } + + + await().atMost(TIMEOUT, SECONDS) + .ignoreException(com.github.tomakehurst.wiremock.client.VerificationException.class) + .until(() -> { + verify(postRequestedFor(urlEqualTo(ENDPOINT_PATH)) + .withHeader("Content-Type", equalTo("application/x-protobuf")) + .andMatching(getExemplarCountMatcher(1))); + return true; + }); + + } + + @Test(expected = ConditionTimeoutException.class) + public void notSampledExemplarIsNotForwarded() { + try (SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder() + .setSampler(Sampler.alwaysOff()) + .build()) { + + Tracer test = sdkTracerProvider.get(INSTRUMENTATION_SCOPE_NAME); + Span span = test.spanBuilder(SPAN_NAME) + .startSpan(); + try (Scope scope = span.makeCurrent()) { + testCounter.inc(2); + } + } + + await().atMost(TIMEOUT, SECONDS) + .ignoreException(com.github.tomakehurst.wiremock.client.VerificationException.class) + .until(() -> { + verify(postRequestedFor(urlEqualTo(ENDPOINT_PATH)) + .withHeader("Content-Type", equalTo("application/x-protobuf")) + .andMatching(getExemplarCountMatcher(1))); + return true; + }); + + } + + private static ValueMatcher getExemplarCountMatcher(int expectedCount) { + return request -> { + try { + ExportMetricsServiceRequest exportMetricsServiceRequest = ExportMetricsServiceRequest.parseFrom(request.getBody()); + for (ResourceMetrics resourceMetrics : exportMetricsServiceRequest.getResourceMetricsList()) { + for (InstrumentationLibraryMetrics instrumentationLibraryMetrics : resourceMetrics.getInstrumentationLibraryMetricsList()) { + for (Metric metric : instrumentationLibraryMetrics.getMetricsList()) { + for (DoubleDataPoint doubleDataPoint : metric.getDoubleSum().getDataPointsList()) { + if (doubleDataPoint.getExemplarsCount() == expectedCount) { + return MatchResult.exactMatch(); + } + } + } + } + } + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + return MatchResult.noMatch(); + }; + } +}