Skip to content

Commit

Permalink
Wrap mutex around combiner now that results are processed in parallel (
Browse files Browse the repository at this point in the history
…#4153) (#4154)

* Wrap mutex around combiner now that results are processed in parallel

* Add test for parallel combining

(cherry picked from commit 90e9b11)

Co-authored-by: Zach Leslie <zach.leslie@grafana.com>
  • Loading branch information
github-actions[bot] and zalegrala authored Oct 3, 2024
1 parent f852284 commit 69af9ca
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 0 deletions.
6 changes: 6 additions & 0 deletions pkg/model/trace/combine.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"hash"
"hash/fnv"
"sync"

"github.com/grafana/tempo/pkg/tempopb"
)
Expand Down Expand Up @@ -41,6 +42,7 @@ var ErrTraceTooLarge = fmt.Errorf("trace exceeds max size")
// * Only sort the final result once and if needed.
// * Don't scan/hash the spans for the last input (final=true).
type Combiner struct {
mtx sync.Mutex
result *tempopb.Trace
spans map[token]struct{}
combined bool
Expand All @@ -53,6 +55,7 @@ type Combiner struct {
// when allowPartialTrace is set to true a partial trace that exceed the max size may be returned
func NewCombiner(maxSizeBytes int, allowPartialTrace bool) *Combiner {
return &Combiner{
mtx: sync.Mutex{},
maxSizeBytes: maxSizeBytes,
allowPartialTrace: allowPartialTrace,
}
Expand All @@ -66,6 +69,9 @@ func (c *Combiner) Consume(tr *tempopb.Trace) (int, error) {
// ConsumeWithFinal consumes the trace, but allows for performance savings when
// it is known that this is the last expected input trace.
func (c *Combiner) ConsumeWithFinal(tr *tempopb.Trace, final bool) (int, error) {
c.mtx.Lock()
defer c.mtx.Unlock()

var spanCount int
if tr == nil || c.IsPartialTrace() {
return spanCount, nil
Expand Down
18 changes: 18 additions & 0 deletions pkg/model/trace/combine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"sort"
"strconv"
"sync"
"testing"

"github.com/grafana/tempo/pkg/tempopb"
Expand Down Expand Up @@ -104,6 +105,23 @@ func TestCombinerReturnsAPartialTrace(t *testing.T) {
}
}

func TestCombinerParallel(t *testing.T) {
// Ensure that the combiner is safe for parallel use.
c := NewCombiner(0, false)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 100; j++ {
_, err := c.Consume(test.MakeTraceWithSpanCount(1, 1, []byte{0x01}))
require.NoError(t, err)
}
}()
}
wg.Wait()
}

func TestTokenForIDCollision(t *testing.T) {
// Estimate the hash collision rate of tokenForID.

Expand Down

0 comments on commit 69af9ca

Please sign in to comment.