diff --git a/opentelemetry-datadog/CHANGELOG.md b/opentelemetry-datadog/CHANGELOG.md index 03155991..72ebfd3a 100644 --- a/opentelemetry-datadog/CHANGELOG.md +++ b/opentelemetry-datadog/CHANGELOG.md @@ -12,6 +12,12 @@ - Bump opentelemetry version to 0.22, opentelemetry_sdk version to 0.22 +### Changed + +- allow send all traces to `datadog-agent` with `agent-sampling` feature. +- allow `datadog-agent` generate metrics from spans for [APM](https://docs.datadoghq.com/tracing/metrics/). + + ## v0.9.0 ### Changed diff --git a/opentelemetry-datadog/Cargo.toml b/opentelemetry-datadog/Cargo.toml index 10c37029..4e1d9837 100644 --- a/opentelemetry-datadog/Cargo.toml +++ b/opentelemetry-datadog/Cargo.toml @@ -19,6 +19,7 @@ all-features = true rustdoc-args = ["--cfg", "docsrs"] [features] +agent-sampling = [] reqwest-blocking-client = ["reqwest/blocking", "opentelemetry-http/reqwest"] reqwest-client = ["reqwest", "opentelemetry-http/reqwest"] @@ -49,3 +50,8 @@ opentelemetry_sdk = { workspace = true, features = ["trace", "testing"] } [[example]] name = "datadog" path = "examples/datadog.rs" + +[[example]] +name = "agent_sampling" +path = "examples/agent_sampling.rs" +required-features = ["agent-sampling"] diff --git a/opentelemetry-datadog/README.md b/opentelemetry-datadog/README.md index 4a80077a..f56c0b14 100644 --- a/opentelemetry-datadog/README.md +++ b/opentelemetry-datadog/README.md @@ -24,6 +24,7 @@ to [`Datadog`]. `opentelemetry-datadog` supports following features: +- `agent-sampling`: move decision making about sampling to `datadog-agent` (see `agent_sampling.rs` example). - `reqwest-blocking-client`: use `reqwest` blocking http client to send spans. - `reqwest-client`: use `reqwest` http client to send spans. - `surf-client`: use `surf` http client to send spans. diff --git a/opentelemetry-datadog/examples/agent_sampling.rs b/opentelemetry-datadog/examples/agent_sampling.rs new file mode 100644 index 00000000..4b0b6ea0 --- /dev/null +++ b/opentelemetry-datadog/examples/agent_sampling.rs @@ -0,0 +1,78 @@ +use opentelemetry::{ + global::{self, shutdown_tracer_provider}, + trace::{SamplingResult, Span, TraceContextExt, Tracer}, + Key, +}; +use opentelemetry_datadog::{new_pipeline, ApiVersion, DatadogTraceStateBuilder}; +use opentelemetry_sdk::trace::{self, RandomIdGenerator, ShouldSample}; +use std::thread; +use std::time::Duration; + +fn bar() { + let tracer = global::tracer("component-bar"); + let mut span = tracer.start("bar"); + span.set_attribute(Key::new("span.type").string("sql")); + span.set_attribute(Key::new("sql.query").string("SELECT * FROM table")); + thread::sleep(Duration::from_millis(6)); + span.end() +} + +#[derive(Debug, Clone)] +struct AgentBasedSampler; + +impl ShouldSample for AgentBasedSampler { + fn should_sample( + &self, + parent_context: Option<&opentelemetry::Context>, + _trace_id: opentelemetry::trace::TraceId, + _name: &str, + _span_kind: &opentelemetry::trace::SpanKind, + _attributes: &[opentelemetry::KeyValue], + _links: &[opentelemetry::trace::Link], + ) -> opentelemetry::trace::SamplingResult { + let trace_state = parent_context + .map( + |parent_context| parent_context.span().span_context().trace_state().clone(), // inherit sample decision from parent span + ) + .unwrap_or_else(|| { + DatadogTraceStateBuilder::default() + .with_priority_sampling(true) // always sample root span(span without remote or local parent) + .with_measuring(true) // datadog-agent will create metric for this span for APM + .build() + }); + + SamplingResult { + decision: opentelemetry::trace::SamplingDecision::RecordAndSample, // send all spans to datadog-agent + attributes: vec![], + trace_state, + } + } +} + +fn main() -> Result<(), Box> { + let tracer = new_pipeline() + .with_service_name("agent-sampling-demo") + .with_api_version(ApiVersion::Version05) + .with_trace_config( + trace::config() + .with_sampler(AgentBasedSampler) + .with_id_generator(RandomIdGenerator::default()), + ) + .install_simple()?; + + tracer.in_span("foo", |cx| { + let span = cx.span(); + span.set_attribute(Key::new("span.type").string("web")); + span.set_attribute(Key::new("http.url").string("http://localhost:8080/foo")); + span.set_attribute(Key::new("http.method").string("GET")); + span.set_attribute(Key::new("http.status_code").i64(200)); + + thread::sleep(Duration::from_millis(6)); + bar(); + thread::sleep(Duration::from_millis(6)); + }); + + shutdown_tracer_provider(); + + Ok(()) +} diff --git a/opentelemetry-datadog/src/exporter/model/mod.rs b/opentelemetry-datadog/src/exporter/model/mod.rs index f0b626a3..93b4d9ee 100644 --- a/opentelemetry-datadog/src/exporter/model/mod.rs +++ b/opentelemetry-datadog/src/exporter/model/mod.rs @@ -20,6 +20,9 @@ mod v05; // https://github.com/DataDog/dd-trace-js/blob/c89a35f7d27beb4a60165409376e170eacb194c5/packages/dd-trace/src/constants.js#L4 static SAMPLING_PRIORITY_KEY: &str = "_sampling_priority_v1"; +// https://github.com/DataDog/datadog-agent/blob/ec96f3c24173ec66ba235bda7710504400d9a000/pkg/trace/traceutil/span.go#L20 +static DD_MEASURED_KEY: &str = "_dd.measured"; + /// Custom mapping between opentelemetry spans and datadog spans. /// /// User can provide custom function to change the mapping. It currently supports customizing the following diff --git a/opentelemetry-datadog/src/exporter/model/v05.rs b/opentelemetry-datadog/src/exporter/model/v05.rs index 7f0738a6..9a0decaf 100644 --- a/opentelemetry-datadog/src/exporter/model/v05.rs +++ b/opentelemetry-datadog/src/exporter/model/v05.rs @@ -1,6 +1,7 @@ use crate::exporter::intern::StringInterner; -use crate::exporter::model::SAMPLING_PRIORITY_KEY; +use crate::exporter::model::{DD_MEASURED_KEY, SAMPLING_PRIORITY_KEY}; use crate::exporter::{Error, ModelConfig}; +use crate::propagator::DatadogTraceState; use opentelemetry::trace::Status; use opentelemetry_sdk::export::trace::SpanData; use std::time::SystemTime; @@ -8,6 +9,7 @@ use std::time::SystemTime; use super::unified_tags::{UnifiedTagField, UnifiedTags}; const SPAN_NUM_ELEMENTS: u32 = 12; +const METRICS_LEN: u32 = 2; const GIT_META_TAGS_COUNT: u32 = if matches!( ( option_env!("DD_GIT_REPOSITORY_URL"), @@ -125,6 +127,28 @@ fn write_unified_tag( Ok(()) } +#[cfg(not(feature = "agent-sampling"))] +fn get_sampling_priority(_span: &SpanData) -> f64 { + 1.0 +} + +#[cfg(feature = "agent-sampling")] +fn get_sampling_priority(span: &SpanData) -> f64 { + if span.span_context.trace_state().priority_sampling_enabled() { + 1.0 + } else { + 0.0 + } +} + +fn get_measuring(span: &SpanData) -> f64 { + if span.span_context.trace_state().measuring_enabled() { + 1.0 + } else { + 0.0 + } +} + fn encode_traces( interner: &mut StringInterner, model_config: &ModelConfig, @@ -228,16 +252,14 @@ where rmp::encode::write_u32(&mut encoded, interner.intern(commit_sha))?; } - rmp::encode::write_map_len(&mut encoded, 1)?; + rmp::encode::write_map_len(&mut encoded, METRICS_LEN)?; rmp::encode::write_u32(&mut encoded, interner.intern(SAMPLING_PRIORITY_KEY))?; - rmp::encode::write_f64( - &mut encoded, - if span.span_context.is_sampled() { - 1.0 - } else { - 0.0 - }, - )?; + let sampling_priority = get_sampling_priority(&span); + rmp::encode::write_f64(&mut encoded, sampling_priority)?; + + rmp::encode::write_u32(&mut encoded, interner.intern(DD_MEASURED_KEY))?; + let measuring = get_measuring(&span); + rmp::encode::write_f64(&mut encoded, measuring)?; rmp::encode::write_u32(&mut encoded, span_type)?; } } diff --git a/opentelemetry-datadog/src/lib.rs b/opentelemetry-datadog/src/lib.rs index 273b9bc8..371475c2 100644 --- a/opentelemetry-datadog/src/lib.rs +++ b/opentelemetry-datadog/src/lib.rs @@ -140,7 +140,7 @@ pub use exporter::{ new_pipeline, ApiVersion, DatadogExporter, DatadogPipelineBuilder, Error, FieldMappingFn, ModelConfig, }; -pub use propagator::DatadogPropagator; +pub use propagator::{DatadogPropagator, DatadogTraceState, DatadogTraceStateBuilder}; mod propagator { use once_cell::sync::Lazy; @@ -155,6 +155,11 @@ mod propagator { const DATADOG_SAMPLING_PRIORITY_HEADER: &str = "x-datadog-sampling-priority"; const TRACE_FLAG_DEFERRED: TraceFlags = TraceFlags::new(0x02); + #[cfg(feature = "agent-sampling")] + const TRACE_STATE_PRIORITY_SAMPLING: &str = "psr"; + const TRACE_STATE_MEASURE: &str = "m"; + const TRACE_STATE_TRUE_VALUE: &str = "1"; + const TRACE_STATE_FALSE_VALUE: &str = "0"; static DATADOG_HEADER_FIELDS: Lazy<[String; 3]> = Lazy::new(|| { [ @@ -164,6 +169,104 @@ mod propagator { ] }); + #[derive(Default)] + pub struct DatadogTraceStateBuilder { + #[cfg(feature = "agent-sampling")] + priority_sampling: bool, + measuring: bool, + } + + fn boolean_to_trace_state_flag(value: bool) -> &'static str { + if value { + TRACE_STATE_TRUE_VALUE + } else { + TRACE_STATE_FALSE_VALUE + } + } + + fn trace_flag_to_boolean(value: &str) -> bool { + value == TRACE_STATE_TRUE_VALUE + } + + impl DatadogTraceStateBuilder { + #[cfg(feature = "agent-sampling")] + pub fn with_priority_sampling(self, enabled: bool) -> Self { + Self { + priority_sampling: enabled, + ..self + } + } + + pub fn with_measuring(self, enabled: bool) -> Self { + Self { + measuring: enabled, + ..self + } + } + + pub fn build(self) -> TraceState { + #[cfg(not(feature = "agent-sampling"))] + let values = [( + TRACE_STATE_MEASURE, + boolean_to_trace_state_flag(self.measuring), + )]; + #[cfg(feature = "agent-sampling")] + let values = [ + ( + TRACE_STATE_MEASURE, + boolean_to_trace_state_flag(self.measuring), + ), + ( + TRACE_STATE_PRIORITY_SAMPLING, + boolean_to_trace_state_flag(self.priority_sampling), + ), + ]; + + TraceState::from_key_value(values).unwrap_or_default() + } + } + + pub trait DatadogTraceState { + fn with_measuring(&self, enabled: bool) -> TraceState; + + fn measuring_enabled(&self) -> bool; + + #[cfg(feature = "agent-sampling")] + fn with_priority_sampling(&self, enabled: bool) -> TraceState; + + #[cfg(feature = "agent-sampling")] + fn priority_sampling_enabled(&self) -> bool; + } + + impl DatadogTraceState for TraceState { + fn with_measuring(&self, enabled: bool) -> TraceState { + self.insert(TRACE_STATE_MEASURE, boolean_to_trace_state_flag(enabled)) + .unwrap_or_else(|_err| self.clone()) + } + + fn measuring_enabled(&self) -> bool { + self.get(TRACE_STATE_MEASURE) + .map(trace_flag_to_boolean) + .unwrap_or_default() + } + + #[cfg(feature = "agent-sampling")] + fn with_priority_sampling(&self, enabled: bool) -> TraceState { + self.insert( + TRACE_STATE_PRIORITY_SAMPLING, + boolean_to_trace_state_flag(enabled), + ) + .unwrap_or_else(|_err| self.clone()) + } + + #[cfg(feature = "agent-sampling")] + fn priority_sampling_enabled(&self) -> bool { + self.get(TRACE_STATE_PRIORITY_SAMPLING) + .map(trace_flag_to_boolean) + .unwrap_or_default() + } + } + enum SamplingPriority { UserReject = -1, AutoReject = 0, @@ -198,6 +301,25 @@ mod propagator { _private: (), } + #[cfg(not(feature = "agent-sampling"))] + fn create_trace_state_and_flags(trace_flags: TraceFlags) -> (TraceState, TraceFlags) { + (TraceState::default(), trace_flags) + } + + #[cfg(feature = "agent-sampling")] + fn create_trace_state_and_flags(trace_flags: TraceFlags) -> (TraceState, TraceFlags) { + if trace_flags & TRACE_FLAG_DEFERRED == TRACE_FLAG_DEFERRED { + (TraceState::default(), trace_flags) + } else { + ( + DatadogTraceStateBuilder::default() + .with_priority_sampling(trace_flags.is_sampled()) + .build(), + TraceFlags::SAMPLED, + ) + } + } + impl DatadogPropagator { /// Creates a new `DatadogPropagator`. pub fn new() -> Self { @@ -262,18 +384,36 @@ mod propagator { Err(_) => TRACE_FLAG_DEFERRED, }; - let trace_state = TraceState::default(); + let (trace_state, trace_flags) = create_trace_state_and_flags(sampled); Ok(SpanContext::new( trace_id, span_id, - sampled, + trace_flags, true, trace_state, )) } } + #[cfg(not(feature = "agent-sampling"))] + fn get_sampling_priority(span_context: &SpanContext) -> SamplingPriority { + if span_context.is_sampled() { + SamplingPriority::AutoKeep + } else { + SamplingPriority::AutoReject + } + } + + #[cfg(feature = "agent-sampling")] + fn get_sampling_priority(span_context: &SpanContext) -> SamplingPriority { + if span_context.trace_state().priority_sampling_enabled() { + SamplingPriority::AutoKeep + } else { + SamplingPriority::AutoReject + } + } + impl TextMapPropagator for DatadogPropagator { fn inject_context(&self, cx: &Context, injector: &mut dyn Injector) { let span = cx.span(); @@ -289,11 +429,7 @@ mod propagator { ); if span_context.trace_flags() & TRACE_FLAG_DEFERRED != TRACE_FLAG_DEFERRED { - let sampling_priority = if span_context.is_sampled() { - SamplingPriority::AutoKeep - } else { - SamplingPriority::AutoReject - }; + let sampling_priority = get_sampling_priority(span_context); injector.set( DATADOG_SAMPLING_PRIORITY_HEADER, @@ -323,7 +459,18 @@ mod propagator { #[rustfmt::skip] fn extract_test_data() -> Vec<(Vec<(&'static str, &'static str)>, SpanContext)> { - vec![ + #[cfg(feature = "agent-sampling")] + return vec![ + (vec![], SpanContext::empty_context()), + (vec![(DATADOG_SAMPLING_PRIORITY_HEADER, "0")], SpanContext::empty_context()), + (vec![(DATADOG_TRACE_ID_HEADER, "garbage")], SpanContext::empty_context()), + (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "garbage")], SpanContext::new(TraceId::from_u128(1234), SpanId::INVALID, TRACE_FLAG_DEFERRED, true, TraceState::default())), + (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TRACE_FLAG_DEFERRED, true, TraceState::default())), + (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "0")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::SAMPLED, true, DatadogTraceStateBuilder::default().with_priority_sampling(false).build())), + (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "1")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::SAMPLED, true, DatadogTraceStateBuilder::default().with_priority_sampling(true).build())), + ]; + #[cfg(not(feature = "agent-sampling"))] + return vec![ (vec![], SpanContext::empty_context()), (vec![(DATADOG_SAMPLING_PRIORITY_HEADER, "0")], SpanContext::empty_context()), (vec![(DATADOG_TRACE_ID_HEADER, "garbage")], SpanContext::empty_context()), @@ -331,12 +478,23 @@ mod propagator { (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TRACE_FLAG_DEFERRED, true, TraceState::default())), (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "0")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::default(), true, TraceState::default())), (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "1")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::SAMPLED, true, TraceState::default())), - ] + ]; } #[rustfmt::skip] fn inject_test_data() -> Vec<(Vec<(&'static str, &'static str)>, SpanContext)> { - vec![ + #[cfg(feature = "agent-sampling")] + return vec![ + (vec![], SpanContext::empty_context()), + (vec![], SpanContext::new(TraceId::INVALID, SpanId::INVALID, TRACE_FLAG_DEFERRED, true, TraceState::default())), + (vec![], SpanContext::new(TraceId::from_hex("1234").unwrap(), SpanId::INVALID, TRACE_FLAG_DEFERRED, true, TraceState::default())), + (vec![], SpanContext::new(TraceId::from_hex("1234").unwrap(), SpanId::INVALID, TraceFlags::SAMPLED, true, TraceState::default())), + (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TRACE_FLAG_DEFERRED, true, TraceState::default())), + (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "0")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::SAMPLED, true, DatadogTraceStateBuilder::default().with_priority_sampling(false).build())), + (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "1")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::SAMPLED, true, DatadogTraceStateBuilder::default().with_priority_sampling(true).build())), + ]; + #[cfg(not(feature = "agent-sampling"))] + return vec![ (vec![], SpanContext::empty_context()), (vec![], SpanContext::new(TraceId::INVALID, SpanId::INVALID, TRACE_FLAG_DEFERRED, true, TraceState::default())), (vec![], SpanContext::new(TraceId::from_hex("1234").unwrap(), SpanId::INVALID, TRACE_FLAG_DEFERRED, true, TraceState::default())), @@ -344,7 +502,7 @@ mod propagator { (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TRACE_FLAG_DEFERRED, true, TraceState::default())), (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "0")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::default(), true, TraceState::default())), (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "1")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::SAMPLED, true, TraceState::default())), - ] + ]; } #[test]