Skip to content

Commit

Permalink
[feat] Deduplicate spans based on their hashcode (#6009)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
- Resolves #6001 

## Description of the changes
- Rename the Zipkin-legacy span "deduper" to a less confusing name
- Add a real span deduper that deduplicates based on span hashcodes
- Sort tags in spans before we attempt deduplication, so hashcode is
deterministic

## How was this change tested?
- Unit tested

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `yarn lint` and `yarn test`

---------

Signed-off-by: Chris Danis <cdanis@wikimedia.org>
Signed-off-by: Chris Danis <cdanis@gmail.com>
Co-authored-by: Yuri Shkuro <yurishkuro@users.noreply.github.com>
  • Loading branch information
cdanis and yurishkuro committed Sep 24, 2024
1 parent d243452 commit 7c6ed87
Show file tree
Hide file tree
Showing 10 changed files with 355 additions and 110 deletions.
5 changes: 3 additions & 2 deletions cmd/query/app/querysvc/adjusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ import (
// before returning the data to the API clients.
func StandardAdjusters(maxClockSkewAdjust time.Duration) []adjuster.Adjuster {
return []adjuster.Adjuster{
adjuster.SpanIDDeduper(),
adjuster.ZipkinSpanIDUniquifier(),
adjuster.ClockSkew(maxClockSkewAdjust),
adjuster.IPTagAdjuster(),
adjuster.OTelTagAdjuster(),
adjuster.SortLogFields(),
adjuster.SortTagsAndLogFields(),
adjuster.DedupeBySpanHash(), // requires SortTagsAndLogFields for deterministic results
adjuster.SpanReferences(),
adjuster.ParentReference(),
}
Expand Down
2 changes: 1 addition & 1 deletion model/adjuster/clockskew.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
// child spans do not start before or end after their parent spans.
//
// The algorithm assumes that all spans have unique IDs, so the trace may need
// to go through another adjuster first, such as SpanIDDeduper.
// to go through another adjuster first, such as ZipkinSpanIDUniquifier.
//
// This adjuster never returns any errors. Instead it records any issues
// it encounters in Span.Warnings.
Expand Down
86 changes: 0 additions & 86 deletions model/adjuster/sort_log_fields_test.go

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,25 @@ import (
"github.com/jaegertracing/jaeger/model"
)

// SortLogFields returns an Adjuster that sorts the fields in the span logs.
// It puts the `event` field in the first position (if present), and sorts
// all other fields lexicographically.
// SortTagsAndLogFields returns an Adjuster that sorts the fields in the tags and
// span logs.
//
// For span logs, it puts the `event` field in the first position (if present), and
// sorts all other fields lexicographically.
//
// TODO: it should also do something about the "msg" field, maybe replace it
// with "event" field.
// TODO: we may also want to move "level" field (as in logging level) to an earlier
// place in the list. This adjuster needs some sort of config describing predefined
// field names/types and their relative order.
func SortLogFields() Adjuster {
func SortTagsAndLogFields() Adjuster {
return Func(func(trace *model.Trace) (*model.Trace, error) {
for _, span := range trace.Spans {
model.KeyValues(span.Tags).Sort()
if span.Process != nil {
model.KeyValues(span.Process.Tags).Sort()
}

for _, log := range span.Logs {
// first move "event" field into the first position
offset := 0
Expand Down
155 changes: 155 additions & 0 deletions model/adjuster/sort_tags_and_log_fields_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Copyright (c) 2019 The Jaeger Authors.
// Copyright (c) 2017 Uber Technologies, Inc.
// SPDX-License-Identifier: Apache-2.0

package adjuster

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/jaegertracing/jaeger/model"
)

func TestSortTagsAndLogFieldsDoesSortFields(t *testing.T) {
testCases := []struct {
fields model.KeyValues
expected model.KeyValues
}{
{
fields: model.KeyValues{
model.String("event", "some event"), // event already in the first position, and no other fields
},
expected: model.KeyValues{
model.String("event", "some event"),
},
},
{
fields: model.KeyValues{
model.Int64("event", 42), // non-string event field
model.Int64("a", 41),
},
expected: model.KeyValues{
model.Int64("a", 41),
model.Int64("event", 42),
},
},
{
fields: model.KeyValues{
model.String("nonsense", "42"), // no event field
},
expected: model.KeyValues{
model.String("nonsense", "42"),
},
},
{
fields: model.KeyValues{
model.String("event", "some event"),
model.Int64("a", 41),
},
expected: model.KeyValues{
model.String("event", "some event"),
model.Int64("a", 41),
},
},
{
fields: model.KeyValues{
model.Int64("x", 1),
model.Int64("a", 2),
model.String("event", "some event"),
},
expected: model.KeyValues{
model.String("event", "some event"),
model.Int64("a", 2),
model.Int64("x", 1),
},
},
}
for _, testCase := range testCases {
trace := &model.Trace{
Spans: []*model.Span{
{
Logs: []model.Log{
{
Fields: testCase.fields,
},
},
},
},
}
trace, err := SortTagsAndLogFields().Adjust(trace)
require.NoError(t, err)
assert.Equal(t, testCase.expected, model.KeyValues(trace.Spans[0].Logs[0].Fields))
}
}

func TestSortTagsAndLogFieldsDoesSortTags(t *testing.T) {
testCases := []model.KeyValues{
{
model.String("http.method", "GET"),
model.String("http.url", "http://wikipedia.org"),
model.Int64("http.status_code", 200),
model.String("guid:x-request-id", "f61defd2-7a77-11ef-b54f-4fbb67a6d181"),
},
{
// empty
},
{
model.String("argv", "mkfs.ext4 /dev/sda1"),
},
}
for _, tags := range testCases {
trace := &model.Trace{
Spans: []*model.Span{
{
Tags: tags,
},
},
}
sorted, err := SortTagsAndLogFields().Adjust(trace)
require.NoError(t, err)
assert.ElementsMatch(t, tags, sorted.Spans[0].Tags)
adjustedKeys := make([]string, len(sorted.Spans[0].Tags))
for i, kv := range sorted.Spans[0].Tags {
adjustedKeys[i] = kv.Key
}
assert.IsIncreasing(t, adjustedKeys)
}
}

func TestSortTagsAndLogFieldsDoesSortProcessTags(t *testing.T) {
trace := &model.Trace{
Spans: []*model.Span{
{
Process: &model.Process{
Tags: model.KeyValues{
model.String("process.argv", "mkfs.ext4 /dev/sda1"),
model.Int64("process.pid", 42),
model.Int64("process.uid", 0),
model.String("k8s.node.roles", "control-plane,etcd,master"),
},
},
},
},
}
sorted, err := SortTagsAndLogFields().Adjust(trace)
require.NoError(t, err)
assert.ElementsMatch(t, trace.Spans[0].Process.Tags, sorted.Spans[0].Process.Tags)
adjustedKeys := make([]string, len(sorted.Spans[0].Process.Tags))
for i, kv := range sorted.Spans[0].Process.Tags {
adjustedKeys[i] = kv.Key
}
assert.IsIncreasing(t, adjustedKeys)
}

func TestSortTagsAndLogFieldsHandlesNilProcessTags(t *testing.T) {
trace := &model.Trace{
Spans: []*model.Span{
{},
},
}
_, err := SortTagsAndLogFields().Adjust(trace)
require.NoError(t, err)
}
40 changes: 40 additions & 0 deletions model/adjuster/span_hash_deduper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package adjuster

import (
"github.com/jaegertracing/jaeger/model"
)

// DedupeBySpanHash returns an adjuster that removes all but one span with the same hashcode.
// This is useful for when spans are duplicated in archival storage, as happens with
// ElasticSearch archival.
func DedupeBySpanHash() Adjuster {
return Func(func(trace *model.Trace) (*model.Trace, error) {
deduper := &spanHashDeduper{trace: trace}
deduper.groupSpansByHash()
return deduper.trace, nil
})
}

type spanHashDeduper struct {
trace *model.Trace
spansByHash map[uint64]*model.Span
}

func (d *spanHashDeduper) groupSpansByHash() {
spansByHash := make(map[uint64]*model.Span)
for _, span := range d.trace.Spans {
hash, _ := model.HashCode(span)
if _, ok := spansByHash[hash]; !ok {
spansByHash[hash] = span
}
}
d.spansByHash = spansByHash

d.trace.Spans = nil
for _, span := range d.spansByHash {
d.trace.Spans = append(d.trace.Spans, span)
}
}
Loading

0 comments on commit 7c6ed87

Please sign in to comment.