From 7c6ed872ad35f532b0c35022fbd1859e65566401 Mon Sep 17 00:00:00 2001 From: Chris Danis Date: Tue, 24 Sep 2024 12:26:33 -0400 Subject: [PATCH] [feat] Deduplicate spans based on their hashcode (#6009) ## Which problem is this PR solving? - Resolves #6001 ## Description of the changes - Rename the Zipkin-legacy span "deduper" to a less confusing name - Add a real span deduper that deduplicates based on span hashcodes - Sort tags in spans before we attempt deduplication, so hashcode is deterministic ## How was this change tested? - Unit tested ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [x] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `yarn lint` and `yarn test` --------- Signed-off-by: Chris Danis Signed-off-by: Chris Danis Co-authored-by: Yuri Shkuro --- cmd/query/app/querysvc/adjusters.go | 5 +- model/adjuster/clockskew.go | 2 +- model/adjuster/sort_log_fields_test.go | 86 ---------- ..._fields.go => sort_tags_and_log_fields.go} | 15 +- .../adjuster/sort_tags_and_log_fields_test.go | 155 ++++++++++++++++++ model/adjuster/span_hash_deduper.go | 40 +++++ model/adjuster/span_hash_deduper_test.go | 128 +++++++++++++++ ..._deduper.go => zipkin_span_id_uniquify.go} | 10 +- ...est.go => zipkin_span_id_uniquify_test.go} | 20 +-- plugin/storage/memory/memory.go | 4 +- 10 files changed, 355 insertions(+), 110 deletions(-) delete mode 100644 model/adjuster/sort_log_fields_test.go rename model/adjuster/{sort_log_fields.go => sort_tags_and_log_fields.go} (74%) create mode 100644 model/adjuster/sort_tags_and_log_fields_test.go create mode 100644 model/adjuster/span_hash_deduper.go create mode 100644 model/adjuster/span_hash_deduper_test.go rename model/adjuster/{span_id_deduper.go => zipkin_span_id_uniquify.go} (92%) rename model/adjuster/{span_id_deduper_test.go => zipkin_span_id_uniquify_test.go} (87%) diff --git a/cmd/query/app/querysvc/adjusters.go b/cmd/query/app/querysvc/adjusters.go index 346d0c0bca4..ef0e82cb0ac 100644 --- a/cmd/query/app/querysvc/adjusters.go +++ b/cmd/query/app/querysvc/adjusters.go @@ -14,11 +14,12 @@ import ( // before returning the data to the API clients. func StandardAdjusters(maxClockSkewAdjust time.Duration) []adjuster.Adjuster { return []adjuster.Adjuster{ - adjuster.SpanIDDeduper(), + adjuster.ZipkinSpanIDUniquifier(), adjuster.ClockSkew(maxClockSkewAdjust), adjuster.IPTagAdjuster(), adjuster.OTelTagAdjuster(), - adjuster.SortLogFields(), + adjuster.SortTagsAndLogFields(), + adjuster.DedupeBySpanHash(), // requires SortTagsAndLogFields for deterministic results adjuster.SpanReferences(), adjuster.ParentReference(), } diff --git a/model/adjuster/clockskew.go b/model/adjuster/clockskew.go index e7965f5a11b..30f922307dc 100644 --- a/model/adjuster/clockskew.go +++ b/model/adjuster/clockskew.go @@ -19,7 +19,7 @@ import ( // child spans do not start before or end after their parent spans. // // The algorithm assumes that all spans have unique IDs, so the trace may need -// to go through another adjuster first, such as SpanIDDeduper. +// to go through another adjuster first, such as ZipkinSpanIDUniquifier. // // This adjuster never returns any errors. Instead it records any issues // it encounters in Span.Warnings. diff --git a/model/adjuster/sort_log_fields_test.go b/model/adjuster/sort_log_fields_test.go deleted file mode 100644 index dafbcb7249d..00000000000 --- a/model/adjuster/sort_log_fields_test.go +++ /dev/null @@ -1,86 +0,0 @@ -// Copyright (c) 2019 The Jaeger Authors. -// Copyright (c) 2017 Uber Technologies, Inc. -// SPDX-License-Identifier: Apache-2.0 - -package adjuster - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/jaegertracing/jaeger/model" -) - -func TestSortLogFields(t *testing.T) { - testCases := []struct { - fields model.KeyValues - expected model.KeyValues - }{ - { - fields: model.KeyValues{ - model.String("event", "some event"), // event already in the first position, and no other fields - }, - expected: model.KeyValues{ - model.String("event", "some event"), - }, - }, - { - fields: model.KeyValues{ - model.Int64("event", 42), // non-string event field - model.Int64("a", 41), - }, - expected: model.KeyValues{ - model.Int64("a", 41), - model.Int64("event", 42), - }, - }, - { - fields: model.KeyValues{ - model.String("nonsense", "42"), // no event field - }, - expected: model.KeyValues{ - model.String("nonsense", "42"), - }, - }, - { - fields: model.KeyValues{ - model.String("event", "some event"), - model.Int64("a", 41), - }, - expected: model.KeyValues{ - model.String("event", "some event"), - model.Int64("a", 41), - }, - }, - { - fields: model.KeyValues{ - model.Int64("x", 1), - model.Int64("a", 2), - model.String("event", "some event"), - }, - expected: model.KeyValues{ - model.String("event", "some event"), - model.Int64("a", 2), - model.Int64("x", 1), - }, - }, - } - for _, testCase := range testCases { - trace := &model.Trace{ - Spans: []*model.Span{ - { - Logs: []model.Log{ - { - Fields: testCase.fields, - }, - }, - }, - }, - } - trace, err := SortLogFields().Adjust(trace) - require.NoError(t, err) - assert.Equal(t, testCase.expected, model.KeyValues(trace.Spans[0].Logs[0].Fields)) - } -} diff --git a/model/adjuster/sort_log_fields.go b/model/adjuster/sort_tags_and_log_fields.go similarity index 74% rename from model/adjuster/sort_log_fields.go rename to model/adjuster/sort_tags_and_log_fields.go index df16d446394..0d6028a2d0c 100644 --- a/model/adjuster/sort_log_fields.go +++ b/model/adjuster/sort_tags_and_log_fields.go @@ -8,18 +8,25 @@ import ( "github.com/jaegertracing/jaeger/model" ) -// SortLogFields returns an Adjuster that sorts the fields in the span logs. -// It puts the `event` field in the first position (if present), and sorts -// all other fields lexicographically. +// SortTagsAndLogFields returns an Adjuster that sorts the fields in the tags and +// span logs. +// +// For span logs, it puts the `event` field in the first position (if present), and +// sorts all other fields lexicographically. // // TODO: it should also do something about the "msg" field, maybe replace it // with "event" field. // TODO: we may also want to move "level" field (as in logging level) to an earlier // place in the list. This adjuster needs some sort of config describing predefined // field names/types and their relative order. -func SortLogFields() Adjuster { +func SortTagsAndLogFields() Adjuster { return Func(func(trace *model.Trace) (*model.Trace, error) { for _, span := range trace.Spans { + model.KeyValues(span.Tags).Sort() + if span.Process != nil { + model.KeyValues(span.Process.Tags).Sort() + } + for _, log := range span.Logs { // first move "event" field into the first position offset := 0 diff --git a/model/adjuster/sort_tags_and_log_fields_test.go b/model/adjuster/sort_tags_and_log_fields_test.go new file mode 100644 index 00000000000..4d31c7248b6 --- /dev/null +++ b/model/adjuster/sort_tags_and_log_fields_test.go @@ -0,0 +1,155 @@ +// Copyright (c) 2019 The Jaeger Authors. +// Copyright (c) 2017 Uber Technologies, Inc. +// SPDX-License-Identifier: Apache-2.0 + +package adjuster + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/jaegertracing/jaeger/model" +) + +func TestSortTagsAndLogFieldsDoesSortFields(t *testing.T) { + testCases := []struct { + fields model.KeyValues + expected model.KeyValues + }{ + { + fields: model.KeyValues{ + model.String("event", "some event"), // event already in the first position, and no other fields + }, + expected: model.KeyValues{ + model.String("event", "some event"), + }, + }, + { + fields: model.KeyValues{ + model.Int64("event", 42), // non-string event field + model.Int64("a", 41), + }, + expected: model.KeyValues{ + model.Int64("a", 41), + model.Int64("event", 42), + }, + }, + { + fields: model.KeyValues{ + model.String("nonsense", "42"), // no event field + }, + expected: model.KeyValues{ + model.String("nonsense", "42"), + }, + }, + { + fields: model.KeyValues{ + model.String("event", "some event"), + model.Int64("a", 41), + }, + expected: model.KeyValues{ + model.String("event", "some event"), + model.Int64("a", 41), + }, + }, + { + fields: model.KeyValues{ + model.Int64("x", 1), + model.Int64("a", 2), + model.String("event", "some event"), + }, + expected: model.KeyValues{ + model.String("event", "some event"), + model.Int64("a", 2), + model.Int64("x", 1), + }, + }, + } + for _, testCase := range testCases { + trace := &model.Trace{ + Spans: []*model.Span{ + { + Logs: []model.Log{ + { + Fields: testCase.fields, + }, + }, + }, + }, + } + trace, err := SortTagsAndLogFields().Adjust(trace) + require.NoError(t, err) + assert.Equal(t, testCase.expected, model.KeyValues(trace.Spans[0].Logs[0].Fields)) + } +} + +func TestSortTagsAndLogFieldsDoesSortTags(t *testing.T) { + testCases := []model.KeyValues{ + { + model.String("http.method", "GET"), + model.String("http.url", "http://wikipedia.org"), + model.Int64("http.status_code", 200), + model.String("guid:x-request-id", "f61defd2-7a77-11ef-b54f-4fbb67a6d181"), + }, + { + // empty + }, + { + model.String("argv", "mkfs.ext4 /dev/sda1"), + }, + } + for _, tags := range testCases { + trace := &model.Trace{ + Spans: []*model.Span{ + { + Tags: tags, + }, + }, + } + sorted, err := SortTagsAndLogFields().Adjust(trace) + require.NoError(t, err) + assert.ElementsMatch(t, tags, sorted.Spans[0].Tags) + adjustedKeys := make([]string, len(sorted.Spans[0].Tags)) + for i, kv := range sorted.Spans[0].Tags { + adjustedKeys[i] = kv.Key + } + assert.IsIncreasing(t, adjustedKeys) + } +} + +func TestSortTagsAndLogFieldsDoesSortProcessTags(t *testing.T) { + trace := &model.Trace{ + Spans: []*model.Span{ + { + Process: &model.Process{ + Tags: model.KeyValues{ + model.String("process.argv", "mkfs.ext4 /dev/sda1"), + model.Int64("process.pid", 42), + model.Int64("process.uid", 0), + model.String("k8s.node.roles", "control-plane,etcd,master"), + }, + }, + }, + }, + } + sorted, err := SortTagsAndLogFields().Adjust(trace) + require.NoError(t, err) + assert.ElementsMatch(t, trace.Spans[0].Process.Tags, sorted.Spans[0].Process.Tags) + adjustedKeys := make([]string, len(sorted.Spans[0].Process.Tags)) + for i, kv := range sorted.Spans[0].Process.Tags { + adjustedKeys[i] = kv.Key + } + assert.IsIncreasing(t, adjustedKeys) +} + +func TestSortTagsAndLogFieldsHandlesNilProcessTags(t *testing.T) { + trace := &model.Trace{ + Spans: []*model.Span{ + {}, + }, + } + _, err := SortTagsAndLogFields().Adjust(trace) + require.NoError(t, err) +} diff --git a/model/adjuster/span_hash_deduper.go b/model/adjuster/span_hash_deduper.go new file mode 100644 index 00000000000..5feb79e1769 --- /dev/null +++ b/model/adjuster/span_hash_deduper.go @@ -0,0 +1,40 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package adjuster + +import ( + "github.com/jaegertracing/jaeger/model" +) + +// DedupeBySpanHash returns an adjuster that removes all but one span with the same hashcode. +// This is useful for when spans are duplicated in archival storage, as happens with +// ElasticSearch archival. +func DedupeBySpanHash() Adjuster { + return Func(func(trace *model.Trace) (*model.Trace, error) { + deduper := &spanHashDeduper{trace: trace} + deduper.groupSpansByHash() + return deduper.trace, nil + }) +} + +type spanHashDeduper struct { + trace *model.Trace + spansByHash map[uint64]*model.Span +} + +func (d *spanHashDeduper) groupSpansByHash() { + spansByHash := make(map[uint64]*model.Span) + for _, span := range d.trace.Spans { + hash, _ := model.HashCode(span) + if _, ok := spansByHash[hash]; !ok { + spansByHash[hash] = span + } + } + d.spansByHash = spansByHash + + d.trace.Spans = nil + for _, span := range d.spansByHash { + d.trace.Spans = append(d.trace.Spans, span) + } +} diff --git a/model/adjuster/span_hash_deduper_test.go b/model/adjuster/span_hash_deduper_test.go new file mode 100644 index 00000000000..56ca280eeb7 --- /dev/null +++ b/model/adjuster/span_hash_deduper_test.go @@ -0,0 +1,128 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package adjuster + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/trace" + + "github.com/jaegertracing/jaeger/model" +) + +func newDuplicatedSpansTrace() *model.Trace { + traceID := model.NewTraceID(0, 42) + return &model.Trace{ + Spans: []*model.Span{ + { + TraceID: traceID, + SpanID: clientSpanID, + Tags: model.KeyValues{ + // span.kind = server + model.String(keySpanKind, trace.SpanKindServer.String()), + }, + }, + { + TraceID: traceID, + SpanID: clientSpanID, // shared span ID + Tags: model.KeyValues{ + // span.kind = server + model.String(keySpanKind, trace.SpanKindServer.String()), + }, + }, + { + // some other span, child of server span + TraceID: traceID, + SpanID: anotherSpanID, + References: []model.SpanRef{model.NewChildOfRef(traceID, clientSpanID)}, + }, + }, + } +} + +func newUniqueSpansTrace() *model.Trace { + traceID := model.NewTraceID(0, 42) + return &model.Trace{ + Spans: []*model.Span{ + { + TraceID: traceID, + SpanID: anotherSpanID, + Tags: model.KeyValues{ + // span.kind = server + model.String(keySpanKind, trace.SpanKindServer.String()), + }, + }, + { + TraceID: traceID, + SpanID: anotherSpanID, // same ID as before, but different metadata + References: []model.SpanRef{model.NewChildOfRef(traceID, clientSpanID)}, + }, + }, + } +} + +func getSpanIDs(spans []*model.Span) []int { + ids := make([]int, len(spans)) + for i, span := range spans { + ids[i] = int(span.SpanID) + } + return ids +} + +func TestDedupeBySpanHashTriggers(t *testing.T) { + trace := newDuplicatedSpansTrace() + deduper := DedupeBySpanHash() + trace, err := deduper.Adjust(trace) + require.NoError(t, err) + + assert.Len(t, trace.Spans, 2, "should dedupe spans") + + ids := getSpanIDs(trace.Spans) + assert.ElementsMatch(t, []int{int(clientSpanID), int(anotherSpanID)}, ids, "should keep unique span IDs") +} + +func TestDedupeBySpanHashNotTriggered(t *testing.T) { + trace := newUniqueSpansTrace() + deduper := DedupeBySpanHash() + trace, err := deduper.Adjust(trace) + require.NoError(t, err) + + assert.Len(t, trace.Spans, 2, "should not dedupe spans") + + ids := getSpanIDs(trace.Spans) + assert.ElementsMatch(t, []int{int(anotherSpanID), int(anotherSpanID)}, ids, "should keep unique span IDs") + assert.NotEqual(t, trace.Spans[0], trace.Spans[1], "should keep unique hashcodes") +} + +func TestDedupeBySpanHashEmpty(t *testing.T) { + trace := &model.Trace{} + deduper := DedupeBySpanHash() + trace, err := deduper.Adjust(trace) + require.NoError(t, err) + + assert.Empty(t, trace.Spans, "should be empty") +} + +func TestDedupeBySpanHashManyManySpans(t *testing.T) { + traceID := model.NewTraceID(0, 42) + spans := make([]*model.Span, 0, 100) + const distinctSpanIDs = 10 + for i := 0; i < 100; i++ { + spans = append(spans, &model.Span{ + TraceID: traceID, + SpanID: model.SpanID(i % distinctSpanIDs), + }) + } + trace := &model.Trace{Spans: spans} + deduper := DedupeBySpanHash() + trace, err := deduper.Adjust(trace) + require.NoError(t, err) + + assert.Len(t, trace.Spans, distinctSpanIDs, "should dedupe spans") + + ids := getSpanIDs(trace.Spans) + assert.ElementsMatch(t, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, ids, "should keep unique span IDs") +} diff --git a/model/adjuster/span_id_deduper.go b/model/adjuster/zipkin_span_id_uniquify.go similarity index 92% rename from model/adjuster/span_id_deduper.go rename to model/adjuster/zipkin_span_id_uniquify.go index 9b669a42932..5f8a86a32cc 100644 --- a/model/adjuster/span_id_deduper.go +++ b/model/adjuster/zipkin_span_id_uniquify.go @@ -11,7 +11,7 @@ import ( "github.com/jaegertracing/jaeger/model" ) -// SpanIDDeduper returns an adjuster that changes span ids for server +// ZipkinSpanIDUniquifier returns an adjuster that changes span ids for server // spans (i.e. spans with tag: span.kind == server) if there is another // client span that shares the same span ID. This is needed to deal with // Zipkin-style clients that reuse the same span ID for both client and server @@ -19,11 +19,11 @@ import ( // // This adjuster never returns any errors. Instead it records any issues // it encounters in Span.Warnings. -func SpanIDDeduper() Adjuster { +func ZipkinSpanIDUniquifier() Adjuster { return Func(func(trace *model.Trace) (*model.Trace, error) { deduper := &spanIDDeduper{trace: trace} deduper.groupSpansByID() - deduper.dedupeSpanIDs() + deduper.uniquifyServerSpanIDs() return deduper.trace, nil }) } @@ -63,7 +63,7 @@ func (d *spanIDDeduper) isSharedWithClientSpan(spanID model.SpanID) bool { return false } -func (d *spanIDDeduper) dedupeSpanIDs() { +func (d *spanIDDeduper) uniquifyServerSpanIDs() { oldToNewSpanIDs := make(map[model.SpanID]model.SpanID) for _, span := range d.trace.Spans { // only replace span IDs for server-side spans that share the ID with something else @@ -82,7 +82,7 @@ func (d *spanIDDeduper) dedupeSpanIDs() { } // swapParentIDs corrects ParentSpanID of all spans that are children of the server -// spans whose IDs we deduped. +// spans whose IDs we made unique. func (d *spanIDDeduper) swapParentIDs(oldToNewSpanIDs map[model.SpanID]model.SpanID) { for _, span := range d.trace.Spans { if parentID, ok := oldToNewSpanIDs[span.ParentSpanID()]; ok { diff --git a/model/adjuster/span_id_deduper_test.go b/model/adjuster/zipkin_span_id_uniquify_test.go similarity index 87% rename from model/adjuster/span_id_deduper_test.go rename to model/adjuster/zipkin_span_id_uniquify_test.go index 503d9e388db..1ad0895985f 100644 --- a/model/adjuster/span_id_deduper_test.go +++ b/model/adjuster/zipkin_span_id_uniquify_test.go @@ -20,7 +20,7 @@ var ( keySpanKind = "span.kind" ) -func newTrace() *model.Trace { +func newZipkinTrace() *model.Trace { traceID := model.NewTraceID(0, 42) return &model.Trace{ Spans: []*model.Span{ @@ -52,9 +52,9 @@ func newTrace() *model.Trace { } } -func TestSpanIDDeduperTriggered(t *testing.T) { - trace := newTrace() - deduper := SpanIDDeduper() +func TestZipkinSpanIDUniquifierTriggered(t *testing.T) { + trace := newZipkinTrace() + deduper := ZipkinSpanIDUniquifier() trace, err := deduper.Adjust(trace) require.NoError(t, err) @@ -70,11 +70,11 @@ func TestSpanIDDeduperTriggered(t *testing.T) { assert.Equal(t, serverSpan.SpanID, thirdSpan.ParentSpanID(), "server span should be 3rd span's parent") } -func TestSpanIDDeduperNotTriggered(t *testing.T) { - trace := newTrace() +func TestZipkinSpanIDUniquifierNotTriggered(t *testing.T) { + trace := newZipkinTrace() trace.Spans = trace.Spans[1:] // remove client span - deduper := SpanIDDeduper() + deduper := ZipkinSpanIDUniquifier() trace, err := deduper.Adjust(trace) require.NoError(t, err) @@ -87,8 +87,8 @@ func TestSpanIDDeduperNotTriggered(t *testing.T) { assert.Equal(t, serverSpan.SpanID, thirdSpan.ParentSpanID(), "server span should be 3rd span's parent") } -func TestSpanIDDeduperError(t *testing.T) { - trace := newTrace() +func TestZipkinSpanIDUniquifierError(t *testing.T) { + trace := newZipkinTrace() maxID := int64(-1) assert.Equal(t, maxSpanID, model.NewSpanID(uint64(maxID)), "maxSpanID must be 2^64-1") @@ -96,7 +96,7 @@ func TestSpanIDDeduperError(t *testing.T) { deduper := &spanIDDeduper{trace: trace} deduper.groupSpansByID() deduper.maxUsedID = maxSpanID - 1 - deduper.dedupeSpanIDs() + deduper.uniquifyServerSpanIDs() if assert.Len(t, trace.Spans[1].Warnings, 1) { assert.Equal(t, "cannot assign unique span ID, too many spans in the trace", trace.Spans[1].Warnings[0]) } diff --git a/plugin/storage/memory/memory.go b/plugin/storage/memory/memory.go index d590f1ad1ce..48276aabffb 100644 --- a/plugin/storage/memory/memory.go +++ b/plugin/storage/memory/memory.go @@ -59,7 +59,7 @@ func newTenant(cfg Configuration) *Tenant { traces: map[model.TraceID]*model.Trace{}, services: map[string]struct{}{}, operations: map[string]map[spanstore.Operation]struct{}{}, - deduper: adjuster.SpanIDDeduper(), + deduper: adjuster.ZipkinSpanIDUniquifier(), config: cfg, } } @@ -90,7 +90,7 @@ func (st *Store) GetDependencies(ctx context.Context, endTs time.Time, lookback deps := map[string]*model.DependencyLink{} startTs := endTs.Add(-1 * lookback) for _, orig := range m.traces { - // SpanIDDeduper never returns an err + // ZipkinSpanIDUniquifier never returns an err trace, _ := m.deduper.Adjust(orig) if traceIsBetweenStartAndEnd(startTs, endTs, trace) { for _, s := range trace.Spans {