diff --git a/opentelemetry-etw-metrics/src/exporter/mod.rs b/opentelemetry-etw-metrics/src/exporter/mod.rs
index 650ad9c7..a5adb445 100644
--- a/opentelemetry-etw-metrics/src/exporter/mod.rs
+++ b/opentelemetry-etw-metrics/src/exporter/mod.rs
@@ -2,17 +2,12 @@ use opentelemetry::{
     global,
     metrics::{MetricsError, Result},
 };
-use opentelemetry_proto::tonic::{
-    collector::metrics::v1::ExportMetricsServiceRequest,
-    metrics::v1::{
-        metric::Data as TonicMetricData, number_data_point::Value as TonicDataPointValue,
-        Metric as TonicMetric, ResourceMetrics as TonicMetrics,
-        ResourceMetrics as TonicResourceMetrics, ScopeMetrics as TonicScopeMetrics,
-        Sum as TonicSum,
-    },
-};
+use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
 use opentelemetry_sdk::metrics::{
-    data::{self, Metric, ResourceMetrics, Temporality},
+    data::{
+        self, ExponentialBucket, ExponentialHistogramDataPoint, Metric, ResourceMetrics,
+        ScopeMetrics, Temporality,
+    },
     exporter::PushMetricsExporter,
     reader::{AggregationSelector, DefaultAggregationSelector, TemporalitySelector},
     Aggregation, InstrumentKind,
@@ -71,67 +66,267 @@ impl Debug for MetricsExporter {
 #[async_trait]
 impl PushMetricsExporter for MetricsExporter {
     async fn export(&self, metrics: &mut ResourceMetrics) -> Result<()> {
-        let proto_message: ExportMetricsServiceRequest = (&*metrics).into();
-
-        // ExportMetricsServiceRequest -> ResourceMetrics -> ScopeMetrics -> Metric -> Aggregation -> Gauge
-
         for scope_metric in &metrics.scope_metrics {
             for metric in &scope_metric.metrics {
+                let mut resource_metrics = Vec::new();
+
                 let data = &metric.data.as_any();
                 if let Some(hist) = data.downcast_ref::<data::Histogram<u64>>() {
-                    println!("u64 Histogram");
+                    for data_point in &hist.data_points {
+                        let resource_metric = ResourceMetrics {
+                            resource: metrics.resource.clone(),
+                            scope_metrics: vec![ScopeMetrics {
+                                scope: scope_metric.scope.clone(),
+                                metrics: vec![Metric {
+                                    name: metric.name.clone(),
+                                    description: metric.description.clone(),
+                                    unit: metric.unit.clone(),
+                                    data: Box::new(data::Histogram {
+                                        temporality: hist.temporality,
+                                        data_points: vec![data_point.clone()],
+                                    }),
+                                }],
+                            }],
+                        };
+                        resource_metrics.push(resource_metric);
+                    }
                 } else if let Some(hist) = data.downcast_ref::<data::Histogram<f64>>() {
-                    println!("f64 Histogram");
-                } else if let Some(_hist) = data.downcast_ref::<data::ExponentialHistogram<u64>>() {
-                    println!("Exponential Histogram");
-                } else if let Some(_hist) = data.downcast_ref::<data::ExponentialHistogram<f64>>() {
-                    println!("Exponential Histogram");
+                    for data_point in &hist.data_points {
+                        let resource_metric = ResourceMetrics {
+                            resource: metrics.resource.clone(),
+                            scope_metrics: vec![ScopeMetrics {
+                                scope: scope_metric.scope.clone(),
+                                metrics: vec![Metric {
+                                    name: metric.name.clone(),
+                                    description: metric.description.clone(),
+                                    unit: metric.unit.clone(),
+                                    data: Box::new(data::Histogram {
+                                        temporality: hist.temporality,
+                                        data_points: vec![data_point.clone()],
+                                    }),
+                                }],
+                            }],
+                        };
+                        resource_metrics.push(resource_metric);
+                    }
+                } else if let Some(hist) = data.downcast_ref::<data::ExponentialHistogram<u64>>() {
+                    for data_point in &hist.data_points {
+                        let resource_metric = ResourceMetrics {
+                            resource: metrics.resource.clone(),
+                            scope_metrics: vec![ScopeMetrics {
+                                scope: scope_metric.scope.clone(),
+                                metrics: vec![Metric {
+                                    name: metric.name.clone(),
+                                    description: metric.description.clone(),
+                                    unit: metric.unit.clone(),
+                                    data: Box::new(data::ExponentialHistogram {
+                                        temporality: hist.temporality,
+                                        data_points: vec![ExponentialHistogramDataPoint {
+                                            attributes: data_point.attributes.clone(),
+                                            count: data_point.count,
+                                            start_time: data_point.start_time,
+                                            time: data_point.time,
+                                            min: data_point.min,
+                                            max: data_point.max,
+                                            sum: data_point.sum,
+                                            scale: data_point.scale,
+                                            zero_count: data_point.zero_count,
+                                            zero_threshold: data_point.zero_threshold,
+                                            positive_bucket: ExponentialBucket {
+                                                offset: data_point.positive_bucket.offset,
+                                                counts: data_point.positive_bucket.counts.clone(),
+                                            },
+                                            negative_bucket: ExponentialBucket {
+                                                offset: data_point.negative_bucket.offset,
+                                                counts: data_point.negative_bucket.counts.clone(),
+                                            },
+                                            exemplars: data_point.exemplars.clone(),
+                                        }],
+                                    }),
+                                }],
+                            }],
+                        };
+                        resource_metrics.push(resource_metric);
+                    }
+                } else if let Some(hist) = data.downcast_ref::<data::ExponentialHistogram<f64>>() {
+                    for data_point in &hist.data_points {
+                        let resource_metric = ResourceMetrics {
+                            resource: metrics.resource.clone(),
+                            scope_metrics: vec![ScopeMetrics {
+                                scope: scope_metric.scope.clone(),
+                                metrics: vec![Metric {
+                                    name: metric.name.clone(),
+                                    description: metric.description.clone(),
+                                    unit: metric.unit.clone(),
+                                    data: Box::new(data::ExponentialHistogram {
+                                        temporality: hist.temporality,
+                                        data_points: vec![ExponentialHistogramDataPoint {
+                                            attributes: data_point.attributes.clone(),
+                                            count: data_point.count,
+                                            start_time: data_point.start_time,
+                                            time: data_point.time,
+                                            min: data_point.min,
+                                            max: data_point.max,
+                                            sum: data_point.sum,
+                                            scale: data_point.scale,
+                                            zero_count: data_point.zero_count,
+                                            zero_threshold: data_point.zero_threshold,
+                                            positive_bucket: ExponentialBucket {
+                                                offset: data_point.positive_bucket.offset,
+                                                counts: data_point.positive_bucket.counts.clone(),
+                                            },
+                                            negative_bucket: ExponentialBucket {
+                                                offset: data_point.negative_bucket.offset,
+                                                counts: data_point.negative_bucket.counts.clone(),
+                                            },
+                                            exemplars: data_point.exemplars.clone(),
+                                        }],
+                                    }),
+                                }],
+                            }],
+                        };
+                        resource_metrics.push(resource_metric);
+                    }
                 } else if let Some(sum) = data.downcast_ref::<data::Sum<u64>>() {
-                    println!("u64 Sum");
+                    for data_point in &sum.data_points {
+                        let resource_metric = ResourceMetrics {
+                            resource: metrics.resource.clone(),
+                            scope_metrics: vec![ScopeMetrics {
+                                scope: scope_metric.scope.clone(),
+                                metrics: vec![Metric {
+                                    name: metric.name.clone(),
+                                    description: metric.description.clone(),
+                                    unit: metric.unit.clone(),
+                                    data: Box::new(data::Sum {
+                                        temporality: sum.temporality,
+                                        data_points: vec![data_point.clone()],
+                                        is_monotonic: sum.is_monotonic,
+                                    }),
+                                }],
+                            }],
+                        };
+                        resource_metrics.push(resource_metric);
+                    }
                 } else if let Some(sum) = data.downcast_ref::<data::Sum<i64>>() {
-                    println!("i64 Sum");
+                    for data_point in &sum.data_points {
+                        let resource_metric = ResourceMetrics {
+                            resource: metrics.resource.clone(),
+                            scope_metrics: vec![ScopeMetrics {
+                                scope: scope_metric.scope.clone(),
+                                metrics: vec![Metric {
+                                    name: metric.name.clone(),
+                                    description: metric.description.clone(),
+                                    unit: metric.unit.clone(),
+                                    data: Box::new(data::Sum {
+                                        temporality: sum.temporality,
+                                        data_points: vec![data_point.clone()],
+                                        is_monotonic: sum.is_monotonic,
+                                    }),
+                                }],
+                            }],
+                        };
+                        resource_metrics.push(resource_metric);
+                    }
                 } else if let Some(sum) = data.downcast_ref::<data::Sum<f64>>() {
-                    println!("f64 Sum");
-
-                    let tonic_sum: TonicSum = (&*sum).into();
-                    for data_point in tonic_sum.data_points {
-                        let export_metric_service_request = ExportMetricsServiceRequest {
-                            resource_metrics: vec![TonicMetrics {
-                                resource: Some((&metrics.resource).into()),
-                                scope_metrics: vec![TonicScopeMetrics {
-                                    scope: Some((scope_metric.scope.clone(), None).into()),
-                                    metrics: vec![TonicMetric {
-                                        name: metric.name.to_string(),
-                                        description: metric.description.to_string(),
-                                        unit: metric.unit.to_string(),
-                                        metadata: vec![],
-                                        data: Some(TonicMetricData::Sum(TonicSum {
-                                            aggregation_temporality: tonic_sum
-                                                .aggregation_temporality,
-                                            data_points: vec![data_point],
-                                            is_monotonic: tonic_sum.is_monotonic,
-                                        })),
-                                    }],
-                                    schema_url: scope_metric
-                                        .scope
-                                        .schema_url
-                                        .as_ref()
-                                        .map(ToString::to_string)
-                                        .unwrap_or_default(),
+                    for data_point in &sum.data_points {
+                        let resource_metric = ResourceMetrics {
+                            resource: metrics.resource.clone(),
+                            scope_metrics: vec![ScopeMetrics {
+                                scope: scope_metric.scope.clone(),
+                                metrics: vec![Metric {
+                                    name: metric.name.clone(),
+                                    description: metric.description.clone(),
+                                    unit: metric.unit.clone(),
+                                    data: Box::new(data::Sum {
+                                        temporality: sum.temporality,
+                                        data_points: vec![data_point.clone()],
+                                        is_monotonic: sum.is_monotonic,
+                                    }),
+                                }],
+                            }],
+                        };
+                        resource_metrics.push(resource_metric);
+                    }
+                } else if let Some(gauge) = data.downcast_ref::<data::Gauge<u64>>() {
+                    for data_point in &gauge.data_points {
+                        let resource_metric = ResourceMetrics {
+                            resource: metrics.resource.clone(),
+                            scope_metrics: vec![ScopeMetrics {
+                                scope: scope_metric.scope.clone(),
+                                metrics: vec![Metric {
+                                    name: metric.name.clone(),
+                                    description: metric.description.clone(),
+                                    unit: metric.unit.clone(),
+                                    data: Box::new(data::Gauge {
+                                        data_points: vec![data_point.clone()],
+                                    }),
+                                }],
+                            }],
+                        };
+                        resource_metrics.push(resource_metric);
+                    }
+                } else if let Some(gauge) = data.downcast_ref::<data::Gauge<i64>>() {
+                    for data_point in &gauge.data_points {
+                        let resource_metric = ResourceMetrics {
+                            resource: metrics.resource.clone(),
+                            scope_metrics: vec![ScopeMetrics {
+                                scope: scope_metric.scope.clone(),
+                                metrics: vec![Metric {
+                                    name: metric.name.clone(),
+                                    description: metric.description.clone(),
+                                    unit: metric.unit.clone(),
+                                    data: Box::new(data::Gauge {
+                                        data_points: vec![data_point.clone()],
+                                    }),
+                                }],
-                                schema_url: metrics
-                                    .resource
-                                    .schema_url()
-                                    .map(Into::into)
-                                    .unwrap_or_default(),
                             }],
                         };
+                        resource_metrics.push(resource_metric);
+                    }
+                } else if let Some(gauge) = data.downcast_ref::<data::Gauge<f64>>() {
+                    for data_point in &gauge.data_points {
+                        let resource_metric = ResourceMetrics {
+                            resource: metrics.resource.clone(),
+                            scope_metrics: vec![ScopeMetrics {
+                                scope: scope_metric.scope.clone(),
+                                metrics: vec![Metric {
+                                    name: metric.name.clone(),
+                                    description: metric.description.clone(),
+                                    unit: metric.unit.clone(),
+                                    data: Box::new(data::Gauge {
+                                        data_points: vec![data_point.clone()],
+                                    }),
+                                }],
+                            }],
+                        };
+                        resource_metrics.push(resource_metric);
+                    }
+                } else {
+                    global::handle_error(MetricsError::Other(format!(
+                        "Unsupported aggregation type: {:?}",
+                        data
+                    )));
+                }
-
-                        let mut byte_array = Vec::new();
-                        export_metric_service_request
-                            .encode(&mut byte_array)
-                            .map_err(|err| MetricsError::Other(err.to_string()))?;
+
+                for resource_metric in resource_metrics {
+                    let mut byte_array = Vec::new();
+                    let proto_message: ExportMetricsServiceRequest = (&resource_metric).into();
+                    proto_message
+                        .encode(&mut byte_array)
+                        .map_err(|err| MetricsError::Other(err.to_string()))?;
+
+                    if (byte_array.len()) > etw::MAX_EVENT_SIZE {
+                        global::handle_error(MetricsError::Other(format!(
+                            "Exporting failed due to event size {} exceeding the maximum size of {} bytes",
+                            byte_array.len(),
+                            etw::MAX_EVENT_SIZE
+                        )));
+                    } else {
                         let result = etw::write(&byte_array);
+                        // TODO: Better logging/internal metrics needed here for non-failure
+                        // case Uncomment the line below to see the exported bytes until a
+                        // better logging solution is implemented
+                        // println!("Exported {} bytes to ETW", byte_array.len());
                         if result != 0 {
                             global::handle_error(MetricsError::Other(format!(
                                 "Failed to write ETW event with error code: {}",
@@ -139,42 +334,10 @@ impl PushMetricsExporter for MetricsExporter {
                             )));
                         }
                     }
-                } else if let Some(gauge) = data.downcast_ref::<data::Gauge<u64>>() {
-                    println!("u64 Gauge");
-                } else if let Some(gauge) = data.downcast_ref::<data::Gauge<i64>>() {
-                    println!("i64 Gauge");
-                } else if let Some(gauge) = data.downcast_ref::<data::Gauge<f64>>() {
-                    println!("f64 Gauge");
-                } else {
-                    println!("Unsupported data type");
                 }
             }
         }
 
-        // let mut byte_array = Vec::new();
-        // proto_message
-        //     .encode(&mut byte_array)
-        //     .map_err(|err| MetricsError::Other(err.to_string()))?;
-
-        // if (byte_array.len()) > etw::MAX_EVENT_SIZE {
-        //     global::handle_error(MetricsError::Other(format!(
-        //         "Exporting failed due to event size {} exceeding the maximum size of {} bytes",
-        //         byte_array.len(),
-        //         etw::MAX_EVENT_SIZE
-        //     )));
-        // } else {
-        //     let result = etw::write(&byte_array);
-        //     // TODO: Better logging/internal metrics needed here for non-failure
-        //     // case Uncomment the line below to see the exported bytes until a
-        //     // better logging solution is implemented
-        //     // println!("Exported {} bytes to ETW", byte_array.len());
-        //     if result != 0 {
-        //         global::handle_error(MetricsError::Other(format!(
-        //             "Failed to write ETW event with error code: {}",
-        //             result
-        //         )));
-        //     }
-        // }
 
         Ok(())
     }
@@ -218,8 +381,15 @@ mod tests {
             .with_unit("test_unit")
             .init();
 
-        for index in 0..etw::MAX_EVENT_SIZE {
-            c.add(1.0, [KeyValue::new("index", format!("{index}"))].as_ref());
+        // Create a key that is 1/10th the size of the MAX_EVENT_SIZE
+        let key_size = etw::MAX_EVENT_SIZE / 10;
+        let large_key = "a".repeat(key_size);
+
+        for index in 0..11 {
+            c.add(
+                1.0,
+                [KeyValue::new(large_key.clone(), format!("{index}"))].as_ref(),
+            );
         }
     }
 }