Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wrap mutex around combiner now that results are processed in parallel #4153

Merged
merged 2 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/model/trace/combine.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"hash"
"hash/fnv"
"sync"

"github.com/grafana/tempo/pkg/tempopb"
)
Expand Down Expand Up @@ -41,6 +42,7 @@ var ErrTraceTooLarge = fmt.Errorf("trace exceeds max size")
// * Only sort the final result once and if needed.
// * Don't scan/hash the spans for the last input (final=true).
type Combiner struct {
mtx sync.Mutex
result *tempopb.Trace
spans map[token]struct{}
combined bool
Expand All @@ -53,6 +55,7 @@ type Combiner struct {
// when allowPartialTrace is set to true a partial trace that exceed the max size may be returned
func NewCombiner(maxSizeBytes int, allowPartialTrace bool) *Combiner {
return &Combiner{
mtx: sync.Mutex{},
maxSizeBytes: maxSizeBytes,
allowPartialTrace: allowPartialTrace,
}
Expand All @@ -66,6 +69,9 @@ func (c *Combiner) Consume(tr *tempopb.Trace) (int, error) {
// ConsumeWithFinal consumes the trace, but allows for performance savings when
// it is known that this is the last expected input trace.
func (c *Combiner) ConsumeWithFinal(tr *tempopb.Trace, final bool) (int, error) {
c.mtx.Lock()
defer c.mtx.Unlock()

var spanCount int
if tr == nil || c.IsPartialTrace() {
return spanCount, nil
Expand Down
18 changes: 18 additions & 0 deletions pkg/model/trace/combine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"sort"
"strconv"
"sync"
"testing"

"github.com/grafana/tempo/pkg/tempopb"
Expand Down Expand Up @@ -104,6 +105,23 @@ func TestCombinerReturnsAPartialTrace(t *testing.T) {
}
}

func TestCombinerParallel(t *testing.T) {
// Ensure that the combiner is safe for parallel use.
c := NewCombiner(0, false)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 100; j++ {
_, err := c.Consume(test.MakeTraceWithSpanCount(1, 1, []byte{0x01}))
require.NoError(t, err)
}
}()
}
wg.Wait()
}

func TestTokenForIDCollision(t *testing.T) {
// Estimate the hash collision rate of tokenForID.

Expand Down