diff --git a/opentelemetry-datadog/Cargo.toml b/opentelemetry-datadog/Cargo.toml index 5f9aa32c..e2d6c435 100644 --- a/opentelemetry-datadog/Cargo.toml +++ b/opentelemetry-datadog/Cargo.toml @@ -19,10 +19,13 @@ all-features = true rustdoc-args = ["--cfg", "docsrs"] [features] +default = ["intern-ahash"] agent-sampling = [] reqwest-blocking-client = ["reqwest/blocking", "opentelemetry-http/reqwest"] reqwest-client = ["reqwest", "opentelemetry-http/reqwest"] surf-client = ["dep:surf"] +intern-ahash = ["ahash"] +intern-std = [] [dependencies] indexmap = "2.0" @@ -39,6 +42,9 @@ thiserror = "1.0" itertools = "0.11" http = "0.2" futures-core = "0.3" +ryu = "1" +itoa = "1" +ahash = { version = "0.8", optional = true } [dev-dependencies] async-trait = "0.1" diff --git a/opentelemetry-datadog/src/exporter/intern.rs b/opentelemetry-datadog/src/exporter/intern.rs index 4a483c8d..d620f82b 100644 --- a/opentelemetry-datadog/src/exporter/intern.rs +++ b/opentelemetry-datadog/src/exporter/intern.rs @@ -1,34 +1,213 @@ use indexmap::set::IndexSet; +use opentelemetry::{StringValue, Value}; +use rmp::encode::{RmpWrite, ValueWriteError}; +use std::{ + cell::RefCell, + hash::{BuildHasherDefault, Hash}, +}; -pub(crate) struct StringInterner { - data: IndexSet, +#[cfg(feature = "intern-ahash")] +type InternHasher = ahash::AHasher; + +#[cfg(all(feature = "intern-std", not(feature = "intern-ahash")))] +type InternHasher = std::hash::DefaultHasher; + +#[derive(PartialEq)] +pub(crate) enum InternValue<'a> { + RegularString(&'a str), + OpenTelemetryValue(&'a Value), +} + +impl<'a> Hash for InternValue<'a> { + fn hash(&self, state: &mut H) { + match &self { + InternValue::RegularString(s) => s.hash(state), + InternValue::OpenTelemetryValue(v) => match v { + Value::Bool(x) => x.hash(state), + Value::I64(x) => x.hash(state), + Value::String(x) => x.hash(state), + Value::F64(x) => x.to_bits().hash(state), + Value::Array(a) => match a { + opentelemetry::Array::Bool(x) => x.hash(state), + opentelemetry::Array::I64(x) => x.hash(state), + opentelemetry::Array::F64(floats) => { + for f in floats { + f.to_bits().hash(state); + } + } + opentelemetry::Array::String(x) => x.hash(state), + }, + }, + } + } +} + +impl<'a> Eq for InternValue<'a> {} + +const BOOLEAN_TRUE: &str = "true"; +const BOOLEAN_FALSE: &str = "false"; +const LEFT_SQUARE_BRACKET: u8 = b'['; +const RIGHT_SQUARE_BRACKET: u8 = b']'; +const COMMA: u8 = b','; +const DOUBLE_QUOTE: u8 = b'"'; +const EMPTY_ARRAY: &str = "[]"; + +trait WriteAsLiteral { + fn write_to(&self, buffer: &mut Vec); +} + +impl WriteAsLiteral for bool { + fn write_to(&self, buffer: &mut Vec) { + buffer.extend_from_slice(if *self { BOOLEAN_TRUE } else { BOOLEAN_FALSE }.as_bytes()); + } +} + +impl WriteAsLiteral for i64 { + fn write_to(&self, buffer: &mut Vec) { + buffer.extend_from_slice(itoa::Buffer::new().format(*self).as_bytes()); + } +} + +impl WriteAsLiteral for f64 { + fn write_to(&self, buffer: &mut Vec) { + buffer.extend_from_slice(ryu::Buffer::new().format(*self).as_bytes()); + } +} + +impl WriteAsLiteral for StringValue { + fn write_to(&self, buffer: &mut Vec) { + buffer.push(DOUBLE_QUOTE); + buffer.extend_from_slice(self.as_str().as_bytes()); + buffer.push(DOUBLE_QUOTE); + } +} + +impl<'a> InternValue<'a> { + pub(crate) fn write_as_str( + &self, + payload: &mut W, + reusable_buffer: &mut Vec, + ) -> Result<(), ValueWriteError> { + match self { + InternValue::RegularString(x) => rmp::encode::write_str(payload, x), + InternValue::OpenTelemetryValue(v) => match v { + Value::Bool(x) => { + rmp::encode::write_str(payload, if *x { BOOLEAN_TRUE } else { BOOLEAN_FALSE }) + } + Value::I64(x) => rmp::encode::write_str(payload, itoa::Buffer::new().format(*x)), + Value::F64(x) => rmp::encode::write_str(payload, ryu::Buffer::new().format(*x)), + Value::String(x) => rmp::encode::write_str(payload, x.as_ref()), + Value::Array(array) => match array { + opentelemetry::Array::Bool(x) => { + Self::write_generic_array(payload, reusable_buffer, x) + } + opentelemetry::Array::I64(x) => { + Self::write_generic_array(payload, reusable_buffer, x) + } + opentelemetry::Array::F64(x) => { + Self::write_generic_array(payload, reusable_buffer, x) + } + opentelemetry::Array::String(x) => { + Self::write_generic_array(payload, reusable_buffer, x) + } + }, + }, + } + } + + fn write_empty_array(payload: &mut W) -> Result<(), ValueWriteError> { + rmp::encode::write_str(payload, EMPTY_ARRAY) + } + + fn write_buffer_as_string( + payload: &mut W, + reusable_buffer: &[u8], + ) -> Result<(), ValueWriteError> { + rmp::encode::write_str_len(payload, reusable_buffer.len() as u32)?; + payload + .write_bytes(reusable_buffer) + .map_err(ValueWriteError::InvalidDataWrite) + } + + fn write_generic_array( + payload: &mut W, + reusable_buffer: &mut Vec, + array: &[T], + ) -> Result<(), ValueWriteError> { + if array.is_empty() { + return Self::write_empty_array(payload); + } + + reusable_buffer.clear(); + reusable_buffer.push(LEFT_SQUARE_BRACKET); + + array[0].write_to(reusable_buffer); + + for value in array[1..].iter() { + reusable_buffer.push(COMMA); + value.write_to(reusable_buffer); + } + + reusable_buffer.push(RIGHT_SQUARE_BRACKET); + + Self::write_buffer_as_string(payload, reusable_buffer) + } +} + +pub(crate) struct StringInterner<'a> { + data: IndexSet, BuildHasherDefault>, } -impl StringInterner { - pub(crate) fn new() -> StringInterner { +impl<'a> StringInterner<'a> { + pub(crate) fn new() -> StringInterner<'a> { StringInterner { - data: Default::default(), + data: IndexSet::with_capacity_and_hasher(128, BuildHasherDefault::default()), } } - pub(crate) fn intern(&mut self, data: &str) -> u32 { - if let Some(idx) = self.data.get_index_of(data) { + pub(crate) fn intern(&mut self, data: &'a str) -> u32 { + if let Some(idx) = self.data.get_index_of(&InternValue::RegularString(data)) { return idx as u32; } - self.data.insert_full(data.to_string()).0 as u32 + self.data.insert_full(InternValue::RegularString(data)).0 as u32 } - pub(crate) fn iter(&self) -> impl Iterator { - self.data.iter() + pub(crate) fn intern_value(&mut self, data: &'a Value) -> u32 { + if let Some(idx) = self + .data + .get_index_of(&InternValue::OpenTelemetryValue(data)) + { + return idx as u32; + } + self.data + .insert_full(InternValue::OpenTelemetryValue(data)) + .0 as u32 } - pub(crate) fn len(&self) -> u32 { - self.data.len() as u32 + pub(crate) fn write_dictionary( + &self, + payload: &mut W, + ) -> Result<(), ValueWriteError> { + thread_local! { + static BUFFER: RefCell> = RefCell::new(Vec::with_capacity(4096)); + } + + BUFFER.with(|cell| { + let reusable_buffer = &mut cell.borrow_mut(); + rmp::encode::write_array_len(payload, self.data.len() as u32)?; + for data in self.data.iter() { + data.write_as_str(payload, reusable_buffer)?; + } + + Ok(()) + }) } } #[cfg(test)] mod tests { + use opentelemetry::Array; + use super::*; #[test] @@ -50,4 +229,291 @@ mod tests { assert_eq!(d_idx, a_idx); assert_eq!(e_idx, c_idx); } + + #[test] + fn test_intern_bool() { + let a = Value::Bool(true); + let b = Value::Bool(false); + let c = "c"; + + let mut intern = StringInterner::new(); + let a_idx = intern.intern_value(&a); + let b_idx = intern.intern_value(&b); + let c_idx = intern.intern(c); + let d_idx = intern.intern_value(&a); + let e_idx = intern.intern(c); + + assert_eq!(a_idx, 0); + assert_eq!(b_idx, 1); + assert_eq!(c_idx, 2); + assert_eq!(d_idx, a_idx); + assert_eq!(e_idx, c_idx); + } + + #[test] + fn test_intern_i64() { + let a = Value::I64(1234567890); + let b = Value::I64(-1234567890); + let c = "c"; + let d = Value::I64(1234567890); + + let mut intern = StringInterner::new(); + let a_idx = intern.intern_value(&a); + let b_idx = intern.intern_value(&b); + let c_idx = intern.intern(c); + let d_idx = intern.intern_value(&a); + let e_idx = intern.intern(c); + let f_idx = intern.intern_value(&d); + + assert_eq!(a_idx, 0); + assert_eq!(b_idx, 1); + assert_eq!(c_idx, 2); + assert_eq!(d_idx, a_idx); + assert_eq!(e_idx, c_idx); + assert_eq!(f_idx, a_idx); + } + + #[test] + fn test_intern_f64() { + let a = Value::F64(123456.7890); + let b = Value::F64(-1234567.890); + let c = "c"; + let d = Value::F64(-1234567.890); + + let mut intern = StringInterner::new(); + let a_idx = intern.intern_value(&a); + let b_idx = intern.intern_value(&b); + let c_idx = intern.intern(c); + let d_idx = intern.intern_value(&a); + let e_idx = intern.intern(c); + let f_idx = intern.intern_value(&d); + + assert_eq!(a_idx, 0); + assert_eq!(b_idx, 1); + assert_eq!(c_idx, 2); + assert_eq!(d_idx, a_idx); + assert_eq!(e_idx, c_idx); + assert_eq!(b_idx, f_idx); + } + + #[test] + fn test_intern_array_of_booleans() { + let a = Value::Array(Array::Bool(vec![true, false])); + let b = Value::Array(Array::Bool(vec![false, true])); + let c = "c"; + let d = Value::Array(Array::Bool(vec![])); + let f = Value::Array(Array::Bool(vec![false, true])); + + let mut intern = StringInterner::new(); + let a_idx = intern.intern_value(&a); + let b_idx = intern.intern_value(&b); + let c_idx = intern.intern(c); + let d_idx = intern.intern_value(&a); + let e_idx = intern.intern(c); + let f_idx = intern.intern_value(&d); + let g_idx = intern.intern_value(&f); + + assert_eq!(a_idx, 0); + assert_eq!(b_idx, 1); + assert_eq!(c_idx, 2); + assert_eq!(d_idx, a_idx); + assert_eq!(e_idx, c_idx); + assert_eq!(f_idx, 3); + assert_eq!(g_idx, b_idx); + } + + #[test] + fn test_intern_array_of_i64() { + let a = Value::Array(Array::I64(vec![123, -123])); + let b = Value::Array(Array::I64(vec![-123, 123])); + let c = "c"; + let d = Value::Array(Array::I64(vec![])); + let f = Value::Array(Array::I64(vec![-123, 123])); + + let mut intern = StringInterner::new(); + let a_idx = intern.intern_value(&a); + let b_idx = intern.intern_value(&b); + let c_idx = intern.intern(c); + let d_idx = intern.intern_value(&a); + let e_idx = intern.intern(c); + let f_idx = intern.intern_value(&d); + let g_idx = intern.intern_value(&f); + + assert_eq!(a_idx, 0); + assert_eq!(b_idx, 1); + assert_eq!(c_idx, 2); + assert_eq!(d_idx, a_idx); + assert_eq!(e_idx, c_idx); + assert_eq!(f_idx, 3); + assert_eq!(g_idx, b_idx); + } + + #[test] + fn test_intern_array_of_f64() { + let f1 = 123.0f64; + let f2 = 0f64; + + let a = Value::Array(Array::F64(vec![f1, f2])); + let b = Value::Array(Array::F64(vec![f2, f1])); + let c = "c"; + let d = Value::Array(Array::F64(vec![])); + let f = Value::Array(Array::F64(vec![f2, f1])); + + let mut intern = StringInterner::new(); + let a_idx = intern.intern_value(&a); + let b_idx = intern.intern_value(&b); + let c_idx = intern.intern(c); + let d_idx = intern.intern_value(&a); + let e_idx = intern.intern(c); + let f_idx = intern.intern_value(&d); + let g_idx = intern.intern_value(&f); + + assert_eq!(a_idx, 0); + assert_eq!(b_idx, 1); + assert_eq!(c_idx, 2); + assert_eq!(d_idx, a_idx); + assert_eq!(e_idx, c_idx); + assert_eq!(f_idx, 3); + assert_eq!(g_idx, b_idx); + } + + #[test] + fn test_intern_array_of_string() { + let s1 = "a"; + let s2 = "b"; + + let a = Value::Array(Array::String(vec![ + StringValue::from(s1), + StringValue::from(s2), + ])); + let b = Value::Array(Array::String(vec![ + StringValue::from(s2), + StringValue::from(s1), + ])); + let c = "c"; + let d = Value::Array(Array::String(vec![])); + let f = Value::Array(Array::String(vec![ + StringValue::from(s2), + StringValue::from(s1), + ])); + + let mut intern = StringInterner::new(); + let a_idx = intern.intern_value(&a); + let b_idx = intern.intern_value(&b); + let c_idx = intern.intern(c); + let d_idx = intern.intern_value(&a); + let e_idx = intern.intern(c); + let f_idx = intern.intern_value(&d); + let g_idx = intern.intern_value(&f); + + assert_eq!(a_idx, 0); + assert_eq!(b_idx, 1); + assert_eq!(c_idx, 2); + assert_eq!(d_idx, a_idx); + assert_eq!(e_idx, c_idx); + assert_eq!(f_idx, 3); + assert_eq!(g_idx, b_idx); + } + + #[test] + fn test_write_boolean_literal() { + let mut buffer: Vec = vec![]; + + true.write_to(&mut buffer); + + assert_eq!(&buffer[..], b"true"); + + buffer.clear(); + + false.write_to(&mut buffer); + + assert_eq!(&buffer[..], b"false"); + } + + #[test] + fn test_write_i64_literal() { + let mut buffer: Vec = vec![]; + + 1234567890i64.write_to(&mut buffer); + + assert_eq!(&buffer[..], b"1234567890"); + + buffer.clear(); + + (-1234567890i64).write_to(&mut buffer); + + assert_eq!(&buffer[..], b"-1234567890"); + } + + #[test] + fn test_write_f64_literal() { + let mut buffer: Vec = vec![]; + + let f1 = 12345.678f64; + let f2 = -12345.678f64; + + f1.write_to(&mut buffer); + + assert_eq!(&buffer[..], format!("{}", f1).as_bytes()); + + buffer.clear(); + + f2.write_to(&mut buffer); + + assert_eq!(&buffer[..], format!("{}", f2).as_bytes()); + } + + #[test] + fn test_write_string_literal() { + let mut buffer: Vec = vec![]; + + let s1 = StringValue::from("abc"); + let s2 = StringValue::from(""); + + s1.write_to(&mut buffer); + + assert_eq!(&buffer[..], format!("\"{}\"", s1).as_bytes()); + + buffer.clear(); + + s2.write_to(&mut buffer); + + assert_eq!(&buffer[..], format!("\"{}\"", s2).as_bytes()); + } + + fn test_encoding_intern_value(value: InternValue<'_>) { + let mut expected: Vec = vec![]; + let mut actual: Vec = vec![]; + + let mut buffer = vec![]; + + value.write_as_str(&mut actual, &mut buffer).unwrap(); + + let InternValue::OpenTelemetryValue(value) = value else { + return; + }; + + rmp::encode::write_str(&mut expected, value.as_str().as_ref()).unwrap(); + + assert_eq!(expected, actual); + } + + #[test] + fn test_encode_boolean() { + test_encoding_intern_value(InternValue::OpenTelemetryValue(&Value::Bool(true))); + test_encoding_intern_value(InternValue::OpenTelemetryValue(&Value::Bool(false))); + } + + #[test] + fn test_encode_i64() { + test_encoding_intern_value(InternValue::OpenTelemetryValue(&Value::I64(123))); + test_encoding_intern_value(InternValue::OpenTelemetryValue(&Value::I64(0))); + test_encoding_intern_value(InternValue::OpenTelemetryValue(&Value::I64(-123))); + } + + #[test] + fn test_encode_f64() { + test_encoding_intern_value(InternValue::OpenTelemetryValue(&Value::F64(123.456f64))); + test_encoding_intern_value(InternValue::OpenTelemetryValue(&Value::F64(-123.456f64))); + } } diff --git a/opentelemetry-datadog/src/exporter/mod.rs b/opentelemetry-datadog/src/exporter/mod.rs index d6516e37..5f98df72 100644 --- a/opentelemetry-datadog/src/exporter/mod.rs +++ b/opentelemetry-datadog/src/exporter/mod.rs @@ -8,7 +8,6 @@ pub use model::FieldMappingFn; use crate::exporter::model::FieldMapping; use futures_core::future::BoxFuture; use http::{Method, Request, Uri}; -use itertools::Itertools; use opentelemetry::{global, trace::TraceError, KeyValue}; use opentelemetry_http::{HttpClient, ResponseExt}; use opentelemetry_sdk::{ @@ -90,8 +89,11 @@ impl DatadogExporter { } } - fn build_request(&self, batch: Vec) -> Result>, TraceError> { - let traces: Vec> = group_into_traces(batch); + fn build_request( + &self, + mut batch: Vec, + ) -> Result>, TraceError> { + let traces: Vec<&[SpanData]> = group_into_traces(&mut batch); let trace_count = traces.len(); let data = self.api_version.encode( &self.model_config, @@ -394,12 +396,27 @@ impl DatadogPipelineBuilder { } } -fn group_into_traces(spans: Vec) -> Vec> { - spans - .into_iter() - .into_group_map_by(|span_data| span_data.span_context.trace_id()) - .into_values() - .collect() +fn group_into_traces(spans: &mut [SpanData]) -> Vec<&[SpanData]> { + if spans.is_empty() { + return vec![]; + } + + spans.sort_by_key(|x| x.span_context.trace_id().to_bytes()); + + let mut traces: Vec<&[SpanData]> = Vec::with_capacity(spans.len()); + + let mut start = 0; + let mut start_trace_id = spans[start].span_context.trace_id(); + for (idx, span) in spans.iter().enumerate() { + let current_trace_id = span.span_context.trace_id(); + if start_trace_id != current_trace_id { + traces.push(&spans[start..idx]); + start = idx; + start_trace_id = current_trace_id; + } + } + traces.push(&spans[start..]); + traces } async fn send_request( @@ -450,13 +467,13 @@ mod tests { #[test] fn test_out_of_order_group() { - let batch = vec![get_span(1, 1, 1), get_span(2, 2, 2), get_span(1, 1, 3)]; + let mut batch = vec![get_span(1, 1, 1), get_span(2, 2, 2), get_span(1, 1, 3)]; let expected = vec![ vec![get_span(1, 1, 1), get_span(1, 1, 3)], vec![get_span(2, 2, 2)], ]; - let mut traces = group_into_traces(batch); + let mut traces = group_into_traces(&mut batch); // We need to sort the output in order to compare, but this is not required by the Datadog agent traces.sort_by_key(|t| u128::from_be_bytes(t[0].span_context.trace_id().to_bytes())); diff --git a/opentelemetry-datadog/src/exporter/model/mod.rs b/opentelemetry-datadog/src/exporter/model/mod.rs index a37018d0..715580b5 100644 --- a/opentelemetry-datadog/src/exporter/model/mod.rs +++ b/opentelemetry-datadog/src/exporter/model/mod.rs @@ -147,7 +147,7 @@ impl ApiVersion { pub(crate) fn encode( self, model_config: &ModelConfig, - traces: Vec>, + traces: Vec<&[trace::SpanData]>, mapping: &Mapping, unified_tags: &UnifiedTags, ) -> Result, Error> { @@ -252,7 +252,7 @@ pub(crate) mod tests { }; let encoded = base64::encode(ApiVersion::Version03.encode( &model_config, - traces, + traces.iter().map(|x| &x[..]).collect(), &Mapping::empty(), &UnifiedTags::new(), )?); @@ -280,7 +280,7 @@ pub(crate) mod tests { let _encoded = base64::encode(ApiVersion::Version05.encode( &model_config, - traces, + traces.iter().map(|x| &x[..]).collect(), &Mapping::empty(), &unified_tags, )?); diff --git a/opentelemetry-datadog/src/exporter/model/v03.rs b/opentelemetry-datadog/src/exporter/model/v03.rs index 8f27dce7..5def2ef8 100644 --- a/opentelemetry-datadog/src/exporter/model/v03.rs +++ b/opentelemetry-datadog/src/exporter/model/v03.rs @@ -6,7 +6,7 @@ use std::time::SystemTime; pub(crate) fn encode( model_config: &ModelConfig, - traces: Vec>, + traces: Vec<&[SpanData]>, get_service_name: S, get_name: N, get_resource: R, @@ -22,7 +22,7 @@ where for trace in traces.into_iter() { rmp::encode::write_array_len(&mut encoded, trace.len() as u32)?; - for span in trace.into_iter() { + for span in trace { // Safe until the year 2262 when Datadog will need to change their API let start = span .start_time @@ -53,13 +53,13 @@ where // Datadog span name is OpenTelemetry component name - see module docs for more information rmp::encode::write_str(&mut encoded, "service")?; - rmp::encode::write_str(&mut encoded, get_service_name(&span, model_config))?; + rmp::encode::write_str(&mut encoded, get_service_name(span, model_config))?; rmp::encode::write_str(&mut encoded, "name")?; - rmp::encode::write_str(&mut encoded, get_name(&span, model_config))?; + rmp::encode::write_str(&mut encoded, get_name(span, model_config))?; rmp::encode::write_str(&mut encoded, "resource")?; - rmp::encode::write_str(&mut encoded, get_resource(&span, model_config))?; + rmp::encode::write_str(&mut encoded, get_resource(span, model_config))?; rmp::encode::write_str(&mut encoded, "trace_id")?; rmp::encode::write_u64( diff --git a/opentelemetry-datadog/src/exporter/model/v05.rs b/opentelemetry-datadog/src/exporter/model/v05.rs index 9a0decaf..68b603cd 100644 --- a/opentelemetry-datadog/src/exporter/model/v05.rs +++ b/opentelemetry-datadog/src/exporter/model/v05.rs @@ -69,7 +69,7 @@ const GIT_META_TAGS_COUNT: u32 = if matches!( // pub(crate) fn encode( model_config: &ModelConfig, - traces: Vec>, + traces: Vec<&[SpanData]>, get_service_name: S, get_name: N, get_resource: R, @@ -87,27 +87,24 @@ where get_service_name, get_name, get_resource, - traces, + &traces, unified_tags, )?; - let mut payload = Vec::new(); + let mut payload = Vec::with_capacity(traces.len() * 512); rmp::encode::write_array_len(&mut payload, 2)?; - rmp::encode::write_array_len(&mut payload, interner.len())?; - for data in interner.iter() { - rmp::encode::write_str(&mut payload, data)?; - } + interner.write_dictionary(&mut payload)?; payload.append(&mut encoded_traces); Ok(payload) } -fn write_unified_tags( +fn write_unified_tags<'a>( encoded: &mut Vec, - interner: &mut StringInterner, - unified_tags: &UnifiedTags, + interner: &mut StringInterner<'a>, + unified_tags: &'a UnifiedTags, ) -> Result<(), Error> { write_unified_tag(encoded, interner, &unified_tags.service)?; write_unified_tag(encoded, interner, &unified_tags.env)?; @@ -115,10 +112,10 @@ fn write_unified_tags( Ok(()) } -fn write_unified_tag( +fn write_unified_tag<'a>( encoded: &mut Vec, - interner: &mut StringInterner, - tag: &UnifiedTagField, + interner: &mut StringInterner<'a>, + tag: &'a UnifiedTagField, ) -> Result<(), Error> { if let Some(tag_value) = &tag.value { rmp::encode::write_u32(encoded, interner.intern(tag.get_tag_name()))?; @@ -149,14 +146,14 @@ fn get_measuring(span: &SpanData) -> f64 { } } -fn encode_traces( - interner: &mut StringInterner, - model_config: &ModelConfig, +fn encode_traces<'interner, S, N, R>( + interner: &mut StringInterner<'interner>, + model_config: &'interner ModelConfig, get_service_name: S, get_name: N, get_resource: R, - traces: Vec>, - unified_tags: &UnifiedTags, + traces: &'interner [&[SpanData]], + unified_tags: &'interner UnifiedTags, ) -> Result, Error> where for<'a> S: Fn(&'a SpanData, &'a ModelConfig) -> &'a str, @@ -166,10 +163,10 @@ where let mut encoded = Vec::new(); rmp::encode::write_array_len(&mut encoded, traces.len() as u32)?; - for trace in traces.into_iter() { + for trace in traces.iter() { rmp::encode::write_array_len(&mut encoded, trace.len() as u32)?; - for span in trace.into_iter() { + for span in trace.iter() { // Safe until the year 2262 when Datadog will need to change their API let start = span .start_time @@ -186,7 +183,7 @@ where let mut span_type = interner.intern(""); for kv in &span.attributes { if kv.key.as_str() == "span.type" { - span_type = interner.intern(kv.value.as_str().as_ref()); + span_type = interner.intern_value(&kv.value); break; } } @@ -195,12 +192,12 @@ where rmp::encode::write_array_len(&mut encoded, SPAN_NUM_ELEMENTS)?; rmp::encode::write_u32( &mut encoded, - interner.intern(get_service_name(&span, model_config)), + interner.intern(get_service_name(span, model_config)), )?; - rmp::encode::write_u32(&mut encoded, interner.intern(get_name(&span, model_config)))?; + rmp::encode::write_u32(&mut encoded, interner.intern(get_name(span, model_config)))?; rmp::encode::write_u32( &mut encoded, - interner.intern(get_resource(&span, model_config)), + interner.intern(get_resource(span, model_config)), )?; rmp::encode::write_u64( &mut encoded, @@ -232,14 +229,14 @@ where )?; for (key, value) in span.resource.iter() { rmp::encode::write_u32(&mut encoded, interner.intern(key.as_str()))?; - rmp::encode::write_u32(&mut encoded, interner.intern(value.as_str().as_ref()))?; + rmp::encode::write_u32(&mut encoded, interner.intern_value(value))?; } write_unified_tags(&mut encoded, interner, unified_tags)?; for kv in span.attributes.iter() { rmp::encode::write_u32(&mut encoded, interner.intern(kv.key.as_str()))?; - rmp::encode::write_u32(&mut encoded, interner.intern(kv.value.as_str().as_ref()))?; + rmp::encode::write_u32(&mut encoded, interner.intern_value(&kv.value))?; } if let (Some(repository_url), Some(commit_sha)) = ( @@ -254,11 +251,11 @@ where rmp::encode::write_map_len(&mut encoded, METRICS_LEN)?; rmp::encode::write_u32(&mut encoded, interner.intern(SAMPLING_PRIORITY_KEY))?; - let sampling_priority = get_sampling_priority(&span); + let sampling_priority = get_sampling_priority(span); rmp::encode::write_f64(&mut encoded, sampling_priority)?; rmp::encode::write_u32(&mut encoded, interner.intern(DD_MEASURED_KEY))?; - let measuring = get_measuring(&span); + let measuring = get_measuring(span); rmp::encode::write_f64(&mut encoded, measuring)?; rmp::encode::write_u32(&mut encoded, span_type)?; }