Skip to content

Commit

Permalink
Ingester memory improvements by adjusting prealloc (#4344)
Browse files Browse the repository at this point in the history
* remove trace ids

Signed-off-by: Joe Elliott <number101010@gmail.com>

* linear buckets

Signed-off-by: Joe Elliott <number101010@gmail.com>

* changelog

Signed-off-by: Joe Elliott <number101010@gmail.com>

* tuney tune

Signed-off-by: Joe Elliott <number101010@gmail.com>

* metric misses and increase pool size

Signed-off-by: Joe Elliott <number101010@gmail.com>

* lint

Signed-off-by: Joe Elliott <number101010@gmail.com>

---------

Signed-off-by: Joe Elliott <number101010@gmail.com>
  • Loading branch information
joe-elliott authored Nov 19, 2024
1 parent e6d1e07 commit f71c4c6
Show file tree
Hide file tree
Showing 10 changed files with 275 additions and 321 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
* [ENHANCEMENT] Chore: delete spanlogger. [4312](https://github.com/grafana/tempo/pull/4312) (@javiermolinar)
* [ENHANCEMENT] Add `invalid_utf8` to reasons spanmetrics will discard spans. [#4293](https://github.com/grafana/tempo/pull/4293) (@zalegrala)
* [ENHANCEMENT] Reduce frontend and querier allocations by dropping HTTP headers early in the pipeline. [#4298](https://github.com/grafana/tempo/pull/4298) (@joe-elliott)
* [ENHANCEMENT] Reduce ingester working set by improving prelloc behavior. [#4344](https://github.com/grafana/tempo/pull/4344) (@joe-elliott)
* [BUGFIX] Replace hedged requests roundtrips total with a counter. [#4063](https://github.com/grafana/tempo/pull/4063) [#4078](https://github.com/grafana/tempo/pull/4078) (@galalen)
* [BUGFIX] Metrics generators: Correctly drop from the ring before stopping ingestion to reduce drops during a rollout. [#4101](https://github.com/grafana/tempo/pull/4101) (@joe-elliott)
* [BUGFIX] Correctly handle 400 Bad Request and 404 Not Found in gRPC streaming [#4144](https://github.com/grafana/tempo/pull/4144) (@mapno)
Expand Down
7 changes: 3 additions & 4 deletions modules/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,14 +430,13 @@ func (d *Distributor) sendToIngestersViaBytes(ctx context.Context, userID string
localCtx = user.InjectOrgID(localCtx, userID)

req := tempopb.PushBytesRequest{
Traces: make([]tempopb.PreallocBytes, len(indexes)),
Ids: make([]tempopb.PreallocBytes, len(indexes)),
SearchData: nil, // support for flatbuffer/v2 search has been removed. todo: cleanup the proto
Traces: make([]tempopb.PreallocBytes, len(indexes)),
Ids: make([][]byte, len(indexes)),
}

for i, j := range indexes {
req.Traces[i].Slice = marshalledTraces[j][0:]
req.Ids[i].Slice = traces[j].id
req.Ids[i] = traces[j].id
}

c, err := d.pool.GetClientFor(ingester.Addr)
Expand Down
6 changes: 2 additions & 4 deletions modules/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,10 +644,8 @@ func pushBatchV1(t testing.TB, i *Ingester, batch *v1.ResourceSpans, id []byte)
Slice: buffer,
},
},
Ids: []tempopb.PreallocBytes{
{
Slice: id,
},
Ids: [][]byte{
id,
},
})
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion modules/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (i *instance) PushBytesRequest(ctx context.Context, req *tempopb.PushBytesR
pr := &tempopb.PushResponse{}

for j := range req.Traces {
err := i.PushBytes(ctx, req.Ids[j].Slice, req.Traces[j].Slice)
err := i.PushBytes(ctx, req.Ids[j], req.Traces[j].Slice)
pr.ErrorsByTrace = i.addTraceError(pr.ErrorsByTrace, err, len(req.Traces), j)
}

Expand Down
14 changes: 6 additions & 8 deletions modules/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,7 @@ func BenchmarkInstancePush(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
// Rotate trace ID
binary.LittleEndian.PutUint32(request.Ids[0].Slice, uint32(i))
binary.LittleEndian.PutUint32(request.Ids[0], uint32(i))
response := instance.PushBytesRequest(context.Background(), request)
errored, _, _ := CheckPushBytesError(response)
require.False(b, errored, "push failed: %w", response.ErrorsByTrace)
Expand Down Expand Up @@ -825,9 +825,9 @@ func makePushBytesRequest(traceID []byte, batch *v1_trace.ResourceSpans) *tempop
}

return &tempopb.PushBytesRequest{
Ids: []tempopb.PreallocBytes{{
Slice: traceID,
}},
Ids: [][]byte{
traceID,
},
Traces: []tempopb.PreallocBytes{{
Slice: buffer,
}},
Expand Down Expand Up @@ -965,7 +965,7 @@ func makePushBytesRequestMultiTraces(traceIDs [][]byte, maxBytes []int) *tempopb
}
traces := makeTraces(batches)

byteIDs := make([]tempopb.PreallocBytes, 0, len(traceIDs))
byteIDs := make([][]byte, 0, len(traceIDs))
byteTraces := make([]tempopb.PreallocBytes, 0, len(traceIDs))

for index, id := range traceIDs {
Expand All @@ -974,9 +974,7 @@ func makePushBytesRequestMultiTraces(traceIDs [][]byte, maxBytes []int) *tempopb
panic(err)
}

byteIDs = append(byteIDs, tempopb.PreallocBytes{
Slice: id,
})
byteIDs = append(byteIDs, id)
byteTraces = append(byteTraces, tempopb.PreallocBytes{
Slice: buffer,
})
Expand Down
83 changes: 49 additions & 34 deletions pkg/tempopb/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,38 +6,42 @@ package pool

import (
"sync"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

// Pool is a bucketed pool for variably sized byte slices.
var metricAllocOutPool = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "tempo",
Name: "ingester_prealloc_miss_bytes_total",
Help: "The total number of alloc'ed bytes that missed the sync pools.",
})

// Pool is a linearly bucketed pool for variably sized byte slices.
type Pool struct {
buckets []sync.Pool
sizes []int
bktSize int
// make is the function used to create an empty slice when none exist yet.
make func(int) []byte
}

// New returns a new Pool with size buckets for minSize to maxSize
// increasing by the given factor.
func New(minSize, maxSize int, factor float64, makeFunc func(int) []byte) *Pool {
if minSize < 1 {
panic("invalid minimum pool size")
}
func New(maxSize, bktSize int, makeFunc func(int) []byte) *Pool {
if maxSize < 1 {
panic("invalid maximum pool size")
}
if factor < 1 {
if bktSize < 1 {
panic("invalid factor")
}

var sizes []int

for s := minSize; s <= maxSize; s = int(float64(s) * factor) {
sizes = append(sizes, s)
if maxSize%bktSize != 0 {
panic("invalid bucket size")
}

bkts := maxSize / bktSize

p := &Pool{
buckets: make([]sync.Pool, len(sizes)),
sizes: sizes,
buckets: make([]sync.Pool, bkts),
bktSize: bktSize,
make: makeFunc,
}

Expand All @@ -46,29 +50,40 @@ func New(minSize, maxSize int, factor float64, makeFunc func(int) []byte) *Pool

// Get returns a new byte slices that fits the given size.
func (p *Pool) Get(sz int) []byte {
for i, bktSize := range p.sizes {
if sz > bktSize {
continue
}
b := p.buckets[i].Get()
if b == nil {
b = p.make(bktSize)
}
return b.([]byte)
if sz < 0 {
sz = 0 // just panic?
}
return p.make(sz)

// Find the right bucket.
bkt := sz / p.bktSize

if bkt >= len(p.buckets) {
metricAllocOutPool.Add(float64(sz)) // track the number of bytes alloc'ed outside the pool for future tuning
return p.make(sz)
}

b := p.buckets[bkt].Get()
if b == nil {
sz := (bkt + 1) * p.bktSize
b = p.make(sz)
}
return b.([]byte)
}

// Put adds a slice to the right bucket in the pool. This method has been adjusted from its initial
// implementation to ignore byte slices that dont have the correct size
// Put adds a slice to the right bucket in the pool.
func (p *Pool) Put(s []byte) {
c := cap(s)
for i, size := range p.sizes {
if c == size {
p.buckets[i].Put(s) // nolint: staticcheck
}
if c < size {
return
}

if c%p.bktSize != 0 {
return
}
bkt := (c / p.bktSize) - 1
if bkt < 0 {
return
}
if bkt >= len(p.buckets) {
return
}

p.buckets[bkt].Put(s) // nolint: staticcheck
}
20 changes: 15 additions & 5 deletions pkg/tempopb/pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,30 @@ func makeFunc(size int) []byte {
}

func TestPool(t *testing.T) {
testPool := New(1, 8, 2, makeFunc)
testPool := New(20, 4, makeFunc)
cases := []struct {
size int
expectedCap int
}{
{
size: -1,
expectedCap: 1,
size: -5,
expectedCap: 4,
},
{
size: 0,
expectedCap: 4,
},
{
size: 3,
expectedCap: 4,
},
{
size: 10,
expectedCap: 10,
expectedCap: 12,
},
{
size: 23,
expectedCap: 23,
},
}
for _, c := range cases {
Expand All @@ -40,7 +48,7 @@ func TestPool(t *testing.T) {
}

func TestPoolSlicesAreAlwaysLargeEnough(t *testing.T) {
testPool := New(1, 1024, 2, makeFunc)
testPool := New(1025, 5, makeFunc)

for i := 0; i < 10000; i++ {
size := rand.Intn(1000)
Expand All @@ -51,5 +59,7 @@ func TestPoolSlicesAreAlwaysLargeEnough(t *testing.T) {
ret := testPool.Get(size)

require.True(t, cap(ret) >= size)

testPool.Put(ret)
}
}
3 changes: 1 addition & 2 deletions pkg/tempopb/prealloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ package tempopb

import "github.com/grafana/tempo/pkg/tempopb/pool"

// buckets: [0.5KiB, 1KiB, 2KiB, 4KiB, 8KiB, 16KiB] ...
var bytePool = pool.New(500, 64_000, 2, func(size int) []byte { return make([]byte, 0, size) })
var bytePool = pool.New(100_000, 400, func(size int) []byte { return make([]byte, 0, size) })

// PreallocBytes is a (repeated bytes slices) which preallocs slices on Unmarshal.
type PreallocBytes struct {
Expand Down
Loading

0 comments on commit f71c4c6

Please sign in to comment.