Skip to content

Commit

Permalink
Fix traces sampler (#1240)
Browse files Browse the repository at this point in the history
  • Loading branch information
grcevski authored Oct 9, 2024
1 parent 84767c1 commit 8c3c9db
Show file tree
Hide file tree
Showing 10 changed files with 153 additions and 34 deletions.
60 changes: 42 additions & 18 deletions pkg/export/otel/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,39 @@ func (tr *tracesOTELReceiver) spanDiscarded(span *request.Span) bool {
return span.IgnoreTraces() || span.ServiceID.ExportsOTelTraces() || !tr.acceptSpan(span)
}

func (tr *tracesOTELReceiver) processSpans(exp exporter.Traces, spans []request.Span, traceAttrs map[attr.Name]struct{}, sampler trace.Sampler) {
for i := range spans {
span := &spans[i]
if span.InternalSignal() {
continue
}
if tr.spanDiscarded(span) {
continue
}

finalAttrs := traceAttributes(span, traceAttrs)

sr := sampler.ShouldSample(trace.SamplingParameters{
ParentContext: tr.ctx,
Name: span.TraceName(),
TraceID: span.TraceID,
Kind: spanKind(span),
Attributes: finalAttrs,
})

if sr.Decision == trace.Drop {
continue
}

envResourceAttrs := ResourceAttrsFromEnv(&span.ServiceID)
traces := GenerateTracesWithAttributes(span, tr.ctxInfo.HostID, finalAttrs, envResourceAttrs)
err := exp.ConsumeTraces(tr.ctx, traces)
if err != nil {
slog.Error("error sending trace to consumer", "error", err)
}
}
}

func (tr *tracesOTELReceiver) provideLoop() (pipe.FinalFunc[[]request.Span], error) {
if !tr.cfg.Enabled() {
return pipe.IgnoreFinal[[]request.Span](), nil
Expand Down Expand Up @@ -198,22 +231,10 @@ func (tr *tracesOTELReceiver) provideLoop() (pipe.FinalFunc[[]request.Span], err
return
}

sampler := tr.cfg.Sampler.Implementation()

for spans := range in {
for i := range spans {
span := &spans[i]
if span.InternalSignal() {
continue
}
if tr.spanDiscarded(span) {
continue
}
envResourceAttrs := ResourceAttrsFromEnv(&span.ServiceID)
traces := GenerateTraces(span, tr.ctxInfo.HostID, traceAttrs, envResourceAttrs)
err := exp.ConsumeTraces(tr.ctx, traces)
if err != nil {
slog.Error("error sending trace to consumer", "error", err)
}
}
tr.processSpans(exp, spans, traceAttrs, sampler)
}
}, nil
}
Expand Down Expand Up @@ -384,8 +405,7 @@ func traceAppResourceAttrs(hostID string, service *svc.ID) []attribute.KeyValue
return attrs
}

// GenerateTraces creates a ptrace.Traces from a request.Span
func GenerateTraces(span *request.Span, hostID string, userAttrs map[attr.Name]struct{}, envResourceAttrs []attribute.KeyValue) ptrace.Traces {
func GenerateTracesWithAttributes(span *request.Span, hostID string, attrs []attribute.KeyValue, envResourceAttrs []attribute.KeyValue) ptrace.Traces {
t := span.Timings()
start := spanStartTime(t)
hasSubSpans := t.Start.After(start)
Expand Down Expand Up @@ -424,7 +444,6 @@ func GenerateTraces(span *request.Span, hostID string, userAttrs map[attr.Name]s
}

// Set span attributes
attrs := traceAttributes(span, userAttrs)
m := attrsToMap(attrs)
m.CopyTo(s.Attributes())

Expand All @@ -435,6 +454,11 @@ func GenerateTraces(span *request.Span, hostID string, userAttrs map[attr.Name]s
return traces
}

// GenerateTraces creates a ptrace.Traces from a request.Span
func GenerateTraces(span *request.Span, hostID string, userAttrs map[attr.Name]struct{}, envResourceAttrs []attribute.KeyValue) ptrace.Traces {
return GenerateTracesWithAttributes(span, hostID, traceAttributes(span, userAttrs), envResourceAttrs)
}

// createSubSpans creates the internal spans for a request.Span
func createSubSpans(span *request.Span, parentSpanID pcommon.SpanID, traceID pcommon.TraceID, ss *ptrace.ScopeSpans, t request.Timings) {
// Create a child span showing the queue time
Expand Down
95 changes: 95 additions & 0 deletions pkg/export/otel/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"
"net/http/httptest"
"os"
"strconv"
"sync"
"sync/atomic"
"testing"
Expand All @@ -15,10 +16,13 @@ import (
"github.com/mariomac/pipes/pipe"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.25.0"
"go.opentelemetry.io/otel/trace"

Expand Down Expand Up @@ -600,6 +604,76 @@ func TestGenerateTracesAttributes(t *testing.T) {
})
}

func TestTraceSampling(t *testing.T) {
spans := []request.Span{}
start := time.Now()
for i := 0; i < 10; i++ {
span := request.Span{Type: request.EventTypeHTTP,
RequestStart: start.UnixNano(),
Start: start.Add(time.Second).UnixNano(),
End: start.Add(3 * time.Second).UnixNano(),
Method: "GET",
Route: "/test" + strconv.Itoa(i),
Status: 200,
ServiceID: svc.ID{},
TraceID: randomTraceID(),
}
spans = append(spans, span)
}

receiver := makeTracesTestReceiver([]string{"http"})

t.Run("test sample all", func(t *testing.T) {
sampler := sdktrace.AlwaysSample()
attrs := make(map[attr.Name]struct{})

tr := []ptrace.Traces{}

exporter := TestExporter{
collector: func(td ptrace.Traces) {
tr = append(tr, td)
},
}

receiver.processSpans(exporter, spans, attrs, sampler)
assert.Equal(t, 10, len(tr))
})

t.Run("test sample nothing", func(t *testing.T) {
sampler := sdktrace.NeverSample()
attrs := make(map[attr.Name]struct{})

tr := []ptrace.Traces{}

exporter := TestExporter{
collector: func(td ptrace.Traces) {
tr = append(tr, td)
},
}

receiver.processSpans(exporter, spans, attrs, sampler)
assert.Equal(t, 0, len(tr))
})

t.Run("test sample 1/10th", func(t *testing.T) {
sampler := sdktrace.TraceIDRatioBased(0.1)
attrs := make(map[attr.Name]struct{})

tr := []ptrace.Traces{}

exporter := TestExporter{
collector: func(td ptrace.Traces) {
tr = append(tr, td)
},
}

receiver.processSpans(exporter, spans, attrs, sampler)
// The result is likely 0,1,2 with 1/10th, but we don't want
// to maybe fail if it accidentally it randomly becomes 3
assert.GreaterOrEqual(t, 3, len(tr))
})
}

func TestAttrsToMap(t *testing.T) {
t.Run("test with string attribute", func(t *testing.T) {
attrs := []attribute.KeyValue{
Expand Down Expand Up @@ -1232,3 +1306,24 @@ func generateTracesForSpans(t *testing.T, tr *tracesOTELReceiver, spans []reques

return res
}

type TestExporter struct {
collector func(td ptrace.Traces)
}

func (e TestExporter) Start(_ context.Context, _ component.Host) error {
return nil
}

func (e TestExporter) Shutdown(_ context.Context) error {
return nil
}

func (e TestExporter) ConsumeTraces(_ context.Context, td ptrace.Traces) error {
e.collector(td)
return nil
}

func (e TestExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{}
}
4 changes: 2 additions & 2 deletions pkg/internal/ebpf/ktracer/bpf_arm64_bpfel.o
Git LFS file not shown
4 changes: 2 additions & 2 deletions pkg/internal/ebpf/ktracer/bpf_debug_arm64_bpfel.o
Git LFS file not shown
4 changes: 2 additions & 2 deletions pkg/internal/ebpf/ktracer/bpf_debug_x86_bpfel.o
Git LFS file not shown
4 changes: 2 additions & 2 deletions pkg/internal/ebpf/ktracer/bpf_tp_arm64_bpfel.o
Git LFS file not shown
4 changes: 2 additions & 2 deletions pkg/internal/ebpf/ktracer/bpf_tp_debug_arm64_bpfel.o
Git LFS file not shown
4 changes: 2 additions & 2 deletions pkg/internal/ebpf/ktracer/bpf_tp_debug_x86_bpfel.o
Git LFS file not shown
4 changes: 2 additions & 2 deletions pkg/internal/ebpf/ktracer/bpf_tp_x86_bpfel.o
Git LFS file not shown
4 changes: 2 additions & 2 deletions pkg/internal/ebpf/ktracer/bpf_x86_bpfel.o
Git LFS file not shown

0 comments on commit 8c3c9db

Please sign in to comment.