diff --git a/pkg/export/otel/traces.go b/pkg/export/otel/traces.go index ff824ab05..7587c1a23 100644 --- a/pkg/export/otel/traces.go +++ b/pkg/export/otel/traces.go @@ -169,6 +169,39 @@ func (tr *tracesOTELReceiver) spanDiscarded(span *request.Span) bool { return span.IgnoreTraces() || span.ServiceID.ExportsOTelTraces() || !tr.acceptSpan(span) } +func (tr *tracesOTELReceiver) processSpans(exp exporter.Traces, spans []request.Span, traceAttrs map[attr.Name]struct{}, sampler trace.Sampler) { + for i := range spans { + span := &spans[i] + if span.InternalSignal() { + continue + } + if tr.spanDiscarded(span) { + continue + } + + finalAttrs := traceAttributes(span, traceAttrs) + + sr := sampler.ShouldSample(trace.SamplingParameters{ + ParentContext: tr.ctx, + Name: span.TraceName(), + TraceID: span.TraceID, + Kind: spanKind(span), + Attributes: finalAttrs, + }) + + if sr.Decision == trace.Drop { + continue + } + + envResourceAttrs := ResourceAttrsFromEnv(&span.ServiceID) + traces := GenerateTracesWithAttributes(span, tr.ctxInfo.HostID, finalAttrs, envResourceAttrs) + err := exp.ConsumeTraces(tr.ctx, traces) + if err != nil { + slog.Error("error sending trace to consumer", "error", err) + } + } +} + func (tr *tracesOTELReceiver) provideLoop() (pipe.FinalFunc[[]request.Span], error) { if !tr.cfg.Enabled() { return pipe.IgnoreFinal[[]request.Span](), nil @@ -198,22 +231,10 @@ func (tr *tracesOTELReceiver) provideLoop() (pipe.FinalFunc[[]request.Span], err return } + sampler := tr.cfg.Sampler.Implementation() + for spans := range in { - for i := range spans { - span := &spans[i] - if span.InternalSignal() { - continue - } - if tr.spanDiscarded(span) { - continue - } - envResourceAttrs := ResourceAttrsFromEnv(&span.ServiceID) - traces := GenerateTraces(span, tr.ctxInfo.HostID, traceAttrs, envResourceAttrs) - err := exp.ConsumeTraces(tr.ctx, traces) - if err != nil { - slog.Error("error sending trace to consumer", "error", err) - } - } + tr.processSpans(exp, spans, traceAttrs, sampler) } }, nil } @@ -384,8 +405,7 @@ func traceAppResourceAttrs(hostID string, service *svc.ID) []attribute.KeyValue return attrs } -// GenerateTraces creates a ptrace.Traces from a request.Span -func GenerateTraces(span *request.Span, hostID string, userAttrs map[attr.Name]struct{}, envResourceAttrs []attribute.KeyValue) ptrace.Traces { +func GenerateTracesWithAttributes(span *request.Span, hostID string, attrs []attribute.KeyValue, envResourceAttrs []attribute.KeyValue) ptrace.Traces { t := span.Timings() start := spanStartTime(t) hasSubSpans := t.Start.After(start) @@ -424,7 +444,6 @@ func GenerateTraces(span *request.Span, hostID string, userAttrs map[attr.Name]s } // Set span attributes - attrs := traceAttributes(span, userAttrs) m := attrsToMap(attrs) m.CopyTo(s.Attributes()) @@ -435,6 +454,11 @@ func GenerateTraces(span *request.Span, hostID string, userAttrs map[attr.Name]s return traces } +// GenerateTraces creates a ptrace.Traces from a request.Span +func GenerateTraces(span *request.Span, hostID string, userAttrs map[attr.Name]struct{}, envResourceAttrs []attribute.KeyValue) ptrace.Traces { + return GenerateTracesWithAttributes(span, hostID, traceAttributes(span, userAttrs), envResourceAttrs) +} + // createSubSpans creates the internal spans for a request.Span func createSubSpans(span *request.Span, parentSpanID pcommon.SpanID, traceID pcommon.TraceID, ss *ptrace.ScopeSpans, t request.Timings) { // Create a child span showing the queue time diff --git a/pkg/export/otel/traces_test.go b/pkg/export/otel/traces_test.go index 60e3c4fc8..ce0af2584 100644 --- a/pkg/export/otel/traces_test.go +++ b/pkg/export/otel/traces_test.go @@ -6,6 +6,7 @@ import ( "net/http" "net/http/httptest" "os" + "strconv" "sync" "sync/atomic" "testing" @@ -15,10 +16,13 @@ import ( "github.com/mariomac/pipes/pipe" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" + sdktrace "go.opentelemetry.io/otel/sdk/trace" semconv "go.opentelemetry.io/otel/semconv/v1.25.0" "go.opentelemetry.io/otel/trace" @@ -600,6 +604,76 @@ func TestGenerateTracesAttributes(t *testing.T) { }) } +func TestTraceSampling(t *testing.T) { + spans := []request.Span{} + start := time.Now() + for i := 0; i < 10; i++ { + span := request.Span{Type: request.EventTypeHTTP, + RequestStart: start.UnixNano(), + Start: start.Add(time.Second).UnixNano(), + End: start.Add(3 * time.Second).UnixNano(), + Method: "GET", + Route: "/test" + strconv.Itoa(i), + Status: 200, + ServiceID: svc.ID{}, + TraceID: randomTraceID(), + } + spans = append(spans, span) + } + + receiver := makeTracesTestReceiver([]string{"http"}) + + t.Run("test sample all", func(t *testing.T) { + sampler := sdktrace.AlwaysSample() + attrs := make(map[attr.Name]struct{}) + + tr := []ptrace.Traces{} + + exporter := TestExporter{ + collector: func(td ptrace.Traces) { + tr = append(tr, td) + }, + } + + receiver.processSpans(exporter, spans, attrs, sampler) + assert.Equal(t, 10, len(tr)) + }) + + t.Run("test sample nothing", func(t *testing.T) { + sampler := sdktrace.NeverSample() + attrs := make(map[attr.Name]struct{}) + + tr := []ptrace.Traces{} + + exporter := TestExporter{ + collector: func(td ptrace.Traces) { + tr = append(tr, td) + }, + } + + receiver.processSpans(exporter, spans, attrs, sampler) + assert.Equal(t, 0, len(tr)) + }) + + t.Run("test sample 1/10th", func(t *testing.T) { + sampler := sdktrace.TraceIDRatioBased(0.1) + attrs := make(map[attr.Name]struct{}) + + tr := []ptrace.Traces{} + + exporter := TestExporter{ + collector: func(td ptrace.Traces) { + tr = append(tr, td) + }, + } + + receiver.processSpans(exporter, spans, attrs, sampler) + // The result is likely 0,1,2 with 1/10th, but we don't want + // to maybe fail if it accidentally it randomly becomes 3 + assert.GreaterOrEqual(t, 3, len(tr)) + }) +} + func TestAttrsToMap(t *testing.T) { t.Run("test with string attribute", func(t *testing.T) { attrs := []attribute.KeyValue{ @@ -1232,3 +1306,24 @@ func generateTracesForSpans(t *testing.T, tr *tracesOTELReceiver, spans []reques return res } + +type TestExporter struct { + collector func(td ptrace.Traces) +} + +func (e TestExporter) Start(_ context.Context, _ component.Host) error { + return nil +} + +func (e TestExporter) Shutdown(_ context.Context) error { + return nil +} + +func (e TestExporter) ConsumeTraces(_ context.Context, td ptrace.Traces) error { + e.collector(td) + return nil +} + +func (e TestExporter) Capabilities() consumer.Capabilities { + return consumer.Capabilities{} +} diff --git a/pkg/internal/ebpf/ktracer/bpf_arm64_bpfel.o b/pkg/internal/ebpf/ktracer/bpf_arm64_bpfel.o index 4d51385a0..575bcc92c 100644 --- a/pkg/internal/ebpf/ktracer/bpf_arm64_bpfel.o +++ b/pkg/internal/ebpf/ktracer/bpf_arm64_bpfel.o @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:e65386bef45dcce6b67ab5aa3f92d04a390291df34ea98f18f9d857cadd35ebe -size 355408 +oid sha256:a6ae04f97d30e3d9c5fce4275bcd243dabb39d91578d10145342592f8c3c8552 +size 355496 diff --git a/pkg/internal/ebpf/ktracer/bpf_debug_arm64_bpfel.o b/pkg/internal/ebpf/ktracer/bpf_debug_arm64_bpfel.o index ecef74e68..42c7a0104 100644 --- a/pkg/internal/ebpf/ktracer/bpf_debug_arm64_bpfel.o +++ b/pkg/internal/ebpf/ktracer/bpf_debug_arm64_bpfel.o @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:96a91823649748c856e4b86a288a3772278ea0b1d4ffcb765ca50fe674a47093 -size 532736 +oid sha256:3cfb1226360807de420ab981791c8fffcb4ffac092035cb330933a0db8d33146 +size 532824 diff --git a/pkg/internal/ebpf/ktracer/bpf_debug_x86_bpfel.o b/pkg/internal/ebpf/ktracer/bpf_debug_x86_bpfel.o index 06ac848c5..407a4ed93 100644 --- a/pkg/internal/ebpf/ktracer/bpf_debug_x86_bpfel.o +++ b/pkg/internal/ebpf/ktracer/bpf_debug_x86_bpfel.o @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:abf8b982af575d8e3d9ae0583af49e2cf3d12c92cd2032abc2b8bd4b0c729f2d -size 532584 +oid sha256:261ccfe170c57dc77945151610bd1f79d73d2b2f5c13dc651a9b25a5d5233e2e +size 532672 diff --git a/pkg/internal/ebpf/ktracer/bpf_tp_arm64_bpfel.o b/pkg/internal/ebpf/ktracer/bpf_tp_arm64_bpfel.o index 110c822c9..b16a5a5fe 100644 --- a/pkg/internal/ebpf/ktracer/bpf_tp_arm64_bpfel.o +++ b/pkg/internal/ebpf/ktracer/bpf_tp_arm64_bpfel.o @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:2394b1644379543bdff4b59dc77ff1d645b4392e3f4b6a3f736f5f150cecf3aa -size 370000 +oid sha256:e2eb5aa04d474c1b07384ff3d0d4ad116c1b444b03ce47f7a6d08898132d9908 +size 370096 diff --git a/pkg/internal/ebpf/ktracer/bpf_tp_debug_arm64_bpfel.o b/pkg/internal/ebpf/ktracer/bpf_tp_debug_arm64_bpfel.o index f3f26cd20..9aa5d21d2 100644 --- a/pkg/internal/ebpf/ktracer/bpf_tp_debug_arm64_bpfel.o +++ b/pkg/internal/ebpf/ktracer/bpf_tp_debug_arm64_bpfel.o @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:3b71cb95fa528e821b44b678ed9ad55e096ded53e8d1fc3f02409fdd32ddd83e -size 550936 +oid sha256:9fd1e4312cada04a41c275dd2d77cd2536cc0879b0db4c037bc20ae8bd4c2977 +size 551032 diff --git a/pkg/internal/ebpf/ktracer/bpf_tp_debug_x86_bpfel.o b/pkg/internal/ebpf/ktracer/bpf_tp_debug_x86_bpfel.o index 6f21a7f48..e72d98422 100644 --- a/pkg/internal/ebpf/ktracer/bpf_tp_debug_x86_bpfel.o +++ b/pkg/internal/ebpf/ktracer/bpf_tp_debug_x86_bpfel.o @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:56293f4f3563281134bde7d43701a0cc5490da349c6c07f057162379e9d6785c -size 550792 +oid sha256:999dfc4bb189ad2070d0fa60f749a49e42c5fa9c46f7640a7191e743ac872bbf +size 550880 diff --git a/pkg/internal/ebpf/ktracer/bpf_tp_x86_bpfel.o b/pkg/internal/ebpf/ktracer/bpf_tp_x86_bpfel.o index 42adee408..dc7647bfc 100644 --- a/pkg/internal/ebpf/ktracer/bpf_tp_x86_bpfel.o +++ b/pkg/internal/ebpf/ktracer/bpf_tp_x86_bpfel.o @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:6bb240ffbc2d5e102a56d1e88c45f4665ed0de89d46b6407dba6151851809a1c -size 369744 +oid sha256:16f05dda5a0ea3fc6bc9d4499c50df936031017f37ec605371bae620e982f951 +size 369832 diff --git a/pkg/internal/ebpf/ktracer/bpf_x86_bpfel.o b/pkg/internal/ebpf/ktracer/bpf_x86_bpfel.o index dd0a9225a..23c597474 100644 --- a/pkg/internal/ebpf/ktracer/bpf_x86_bpfel.o +++ b/pkg/internal/ebpf/ktracer/bpf_x86_bpfel.o @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:aadfc85ccf6ffc1ce702af4d3813f40a8d7aa73ad36c2b1db492d52bb4d1f9c8 -size 355152 +oid sha256:ddcc1954a4d5de08b9b67c617b87bbf01be920f1a05d564261da8ec050b6f7c4 +size 355248