Generator performance (#4232)
* todos

* more todos and print inuse stats

* Benchmark report heapinuse, ensure cleanup between benchmarks

* Improve memory usage by changing histograms to precompute all labels for all sub-series instead of during each collection

* changelog
mdisibio authored Oct 25, 2024
1 parent de5cb00 commit 2f2a35e
Showing 10 changed files with 228 additions and 67 deletions.
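The central change is described in the fourth commit bullet above: classic histograms now precompute the label sets for all of their sub-series (_count, _sum, and one _bucket per bound) when a series is first created, instead of rebuilding them during every collection. A minimal sketch of that idea, using illustrative names and plain maps rather than Tempo's actual registry types:

package histsketch

import "strconv"

// series holds one classic-histogram series: the label sets for _count, _sum,
// and every _bucket sub-series are built once, at creation time.
type series struct {
	countLabels  map[string]string
	sumLabels    map[string]string
	bucketLabels []map[string]string // one per bound, including "le"
	count        uint64
	sum          float64
	buckets      []uint64
}

func withName(base map[string]string, name string, extra ...string) map[string]string {
	out := make(map[string]string, len(base)+1+len(extra)/2)
	for k, v := range base {
		out[k] = v
	}
	out["__name__"] = name
	for i := 0; i+1 < len(extra); i += 2 {
		out[extra[i]] = extra[i+1]
	}
	return out
}

// newSeries performs all label construction up front.
func newSeries(base map[string]string, name string, bounds []float64) *series {
	s := &series{
		countLabels: withName(base, name+"_count"),
		sumLabels:   withName(base, name+"_sum"),
		buckets:     make([]uint64, len(bounds)),
	}
	for _, le := range bounds {
		s.bucketLabels = append(s.bucketLabels,
			withName(base, name+"_bucket", "le", strconv.FormatFloat(le, 'f', -1, 64)))
	}
	return s
}

// collect reuses the precomputed labels; no label maps or strings are built here.
func (s *series) collect(ts int64, emit func(lbls map[string]string, ts int64, val float64)) {
	emit(s.countLabels, ts, float64(s.count))
	emit(s.sumLabels, ts, s.sum)
	for i, lbls := range s.bucketLabels {
		emit(lbls, ts, float64(s.buckets[i]))
	}
}

Series creation happens once per label combination while collection happens every interval, so moving the label work out of collect is what the new heap_in_use benchmark reporting below is meant to measure.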
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -26,6 +26,7 @@
* [ENHANCEMENT] Reduce allocs related to marshalling dedicated columns repeatedly in the query frontend. [#4007](https://github.com/grafana/tempo/pull/4007) (@joe-elliott)
* [ENHANCEMENT] Improve performance of TraceQL queries [#4114](https://github.com/grafana/tempo/pull/4114) (@mdisibio)
* [ENHANCEMENT] Improve performance of TraceQL queries [#4163](https://github.com/grafana/tempo/pull/4163) (@mdisibio)
* [ENHANCEMENT] Reduce memory usage of classic histograms in the span-metrics and service-graphs processors [#4232](https://github.com/grafana/tempo/pull/4232) (@mdisibio)
* [ENHANCEMENT] Implement simple Fetch by key for cache items [#4032](https://github.com/grafana/tempo/pull/4032) (@javiermolinar)
* [ENHANCEMENT] Replace Grafana Agent example by Grafana Alloy[#4030](https://github.com/grafana/tempo/pull/4030) (@javiermolinar)
* [ENHANCEMENT] Support exporting internal Tempo traces via OTLP exporter when `use_otel_tracer` is enabled. Use the OpenTelemetry SDK environment variables to configure the span exporter. [#4028](https://github.com/grafana/tempo/pull/4028) (@andreasgerstmayr)
145 changes: 145 additions & 0 deletions modules/generator/generator_test.go
@@ -2,16 +2,24 @@ package generator

import (
"context"
"flag"
"fmt"
"os"
"path/filepath"
"runtime"
"strconv"
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/services"
"github.com/grafana/tempo/modules/generator/processor/spanmetrics"
"github.com/grafana/tempo/modules/generator/storage"
"github.com/grafana/tempo/modules/overrides"
"github.com/grafana/tempo/pkg/tempopb"
common_v1 "github.com/grafana/tempo/pkg/tempopb/common/v1"
trace_v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
"github.com/grafana/tempo/pkg/util/test"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
@@ -129,3 +137,140 @@ func (l testLogger) Log(keyvals ...interface{}) error {
l.t.Log(keyvals...)
return nil
}

func BenchmarkPushSpans(b *testing.B) {
var (
tenant = "test-tenant"
reg = prometheus.NewRegistry()
ctx = context.Background()
log = log.NewNopLogger()
cfg = &Config{}

walcfg = &storage.Config{
Path: b.TempDir(),
}

o = &mockOverrides{
processors: map[string]struct{}{
"span-metrics": {},
"service-graphs": {},
},
spanMetricsEnableTargetInfo: true,
spanMetricsTargetInfoExcludedDimensions: []string{"excluded"},
}
)

cfg.RegisterFlagsAndApplyDefaults("", &flag.FlagSet{})

wal, err := storage.New(walcfg, o, tenant, reg, log)
require.NoError(b, err)

inst, err := newInstance(cfg, tenant, o, wal, reg, log, nil, nil)
require.NoError(b, err)
defer inst.shutdown()

req := &tempopb.PushSpansRequest{
Batches: []*trace_v1.ResourceSpans{
test.MakeBatch(100, nil),
test.MakeBatch(100, nil),
test.MakeBatch(100, nil),
test.MakeBatch(100, nil),
},
}

// Add more resource attributes to get closer to real data
// Add integer to increase cardinality.
// Currently this is about 80 active series
// TODO - Get more series
for i, b := range req.Batches {
b.Resource.Attributes = append(b.Resource.Attributes, []*common_v1.KeyValue{
{Key: "k8s.cluster.name", Value: &common_v1.AnyValue{Value: &common_v1.AnyValue_StringValue{StringValue: "test" + strconv.Itoa(i)}}},
{Key: "k8s.namespace.name", Value: &common_v1.AnyValue{Value: &common_v1.AnyValue_StringValue{StringValue: "test" + strconv.Itoa(i)}}},
{Key: "k8s.node.name", Value: &common_v1.AnyValue{Value: &common_v1.AnyValue_StringValue{StringValue: "test" + strconv.Itoa(i)}}},
{Key: "k8s.pod.ip", Value: &common_v1.AnyValue{Value: &common_v1.AnyValue_StringValue{StringValue: "test" + strconv.Itoa(i)}}},
{Key: "k8s.pod.name", Value: &common_v1.AnyValue{Value: &common_v1.AnyValue_StringValue{StringValue: "test" + strconv.Itoa(i)}}},
{Key: "excluded", Value: &common_v1.AnyValue{Value: &common_v1.AnyValue_StringValue{StringValue: "test" + strconv.Itoa(i)}}},
}...)
}

b.ResetTimer()
for i := 0; i < b.N; i++ {
inst.pushSpans(ctx, req)
}

b.StopTimer()
runtime.GC()
mem := runtime.MemStats{}
runtime.ReadMemStats(&mem)
b.ReportMetric(float64(mem.HeapInuse), "heap_in_use")
}

func BenchmarkCollect(b *testing.B) {
var (
tenant = "test-tenant"
reg = prometheus.NewRegistry()
ctx = context.Background()
log = log.NewNopLogger()
cfg = &Config{}

walcfg = &storage.Config{
Path: b.TempDir(),
}

o = &mockOverrides{
processors: map[string]struct{}{
"span-metrics": {},
"service-graphs": {},
},
spanMetricsDimensions: []string{"k8s.cluster.name", "k8s.namespace.name"},
spanMetricsEnableTargetInfo: true,
spanMetricsTargetInfoExcludedDimensions: []string{"excluded"},
// nativeHistograms: overrides.HistogramMethodBoth,
}
)

cfg.RegisterFlagsAndApplyDefaults("", &flag.FlagSet{})

wal, err := storage.New(walcfg, o, tenant, reg, log)
require.NoError(b, err)

inst, err := newInstance(cfg, tenant, o, wal, reg, log, nil, nil)
require.NoError(b, err)
defer inst.shutdown()

req := &tempopb.PushSpansRequest{
Batches: []*trace_v1.ResourceSpans{
test.MakeBatch(100, nil),
test.MakeBatch(100, nil),
test.MakeBatch(100, nil),
test.MakeBatch(100, nil),
},
}

// Add more resource attributes to get closer to real data
// Add integer to increase cardinality.
// Currently this is about 80 active series
// TODO - Get more series
for i, b := range req.Batches {
b.Resource.Attributes = append(b.Resource.Attributes, []*common_v1.KeyValue{
{Key: "k8s.cluster.name", Value: &common_v1.AnyValue{Value: &common_v1.AnyValue_StringValue{StringValue: "test" + strconv.Itoa(i)}}},
{Key: "k8s.namespace.name", Value: &common_v1.AnyValue{Value: &common_v1.AnyValue_StringValue{StringValue: "test" + strconv.Itoa(i)}}},
{Key: "k8s.node.name", Value: &common_v1.AnyValue{Value: &common_v1.AnyValue_StringValue{StringValue: "test" + strconv.Itoa(i)}}},
{Key: "k8s.pod.ip", Value: &common_v1.AnyValue{Value: &common_v1.AnyValue_StringValue{StringValue: "test" + strconv.Itoa(i)}}},
{Key: "k8s.pod.name", Value: &common_v1.AnyValue{Value: &common_v1.AnyValue_StringValue{StringValue: "test" + strconv.Itoa(i)}}},
{Key: "excluded", Value: &common_v1.AnyValue{Value: &common_v1.AnyValue_StringValue{StringValue: "test" + strconv.Itoa(i)}}},
}...)
}
inst.pushSpans(ctx, req)

b.ResetTimer()
for i := 0; i < b.N; i++ {
inst.registry.CollectMetrics(ctx)
}

b.StopTimer()
runtime.GC()
mem := runtime.MemStats{}
runtime.ReadMemStats(&mem)
b.ReportMetric(float64(mem.HeapInuse), "heap_in_use")
}
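Both benchmarks end with the same GC-then-measure sequence; a shared helper could keep them in sync. This is only a sketch of that pattern — reportHeapInuse is not a name introduced by this commit:

package generator

import (
	"runtime"
	"testing"
)

// reportHeapInuse forces a GC so HeapInuse reflects live memory only, then
// attaches it to the benchmark output as a custom metric next to ns/op.
func reportHeapInuse(b *testing.B) {
	b.StopTimer()
	runtime.GC()
	var mem runtime.MemStats
	runtime.ReadMemStats(&mem)
	b.ReportMetric(float64(mem.HeapInuse), "heap_in_use")
}

Run with the standard tooling, for example go test -bench 'BenchmarkPushSpans|BenchmarkCollect' -benchmem ./modules/generator/, and the heap_in_use column appears alongside the usual allocation metrics.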
2 changes: 2 additions & 0 deletions modules/generator/instance.go
@@ -357,6 +357,8 @@ func (i *instance) pushSpans(ctx context.Context, req *tempopb.PushSpansRequest)
}

func (i *instance) preprocessSpans(req *tempopb.PushSpansRequest) {
// TODO - uniqify all strings?
// Doesn't help allocs, but should greatly reduce inuse space
size := 0
spanCount := 0
expiredSpanCount := 0
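The TODO added in this hunk suggests interning (uniquifying) repeated attribute strings. A rough sketch of what that could look like — not part of this commit, and the type name is illustrative:

package generator

// interner returns a canonical copy of each string it has seen, so spans that
// repeat the same attribute keys and values all share one backing string.
// This does not reduce allocation counts during unmarshalling, but it can
// shrink the retained (in-use) heap between collections.
type interner struct {
	seen map[string]string
}

func newInterner() *interner {
	return &interner{seen: make(map[string]string)}
}

func (in *interner) intern(s string) string {
	if v, ok := in.seen[s]; ok {
		return v
	}
	in.seen[s] = s
	return s
}

preprocessSpans could then walk resource and span attributes and replace each key and string value with in.intern(...).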
8 changes: 6 additions & 2 deletions modules/generator/processor/spanmetrics/spanmetrics.go
@@ -121,8 +121,8 @@ func (p *Processor) aggregateMetrics(resourceSpans []*v1_trace.ResourceSpans) {
svcName, _ := processor_util.FindServiceName(rs.Resource.Attributes)
jobName := processor_util.GetJobValue(rs.Resource.Attributes)
instanceID, _ := processor_util.FindInstanceID(rs.Resource.Attributes)
resourceLabels := make([]string, 0)
resourceValues := make([]string, 0)
resourceLabels := make([]string, 0) // TODO move outside the loop and reuse?
resourceValues := make([]string, 0) // TODO don't allocate unless needed?

if p.Cfg.EnableTargetInfo {
resourceLabels, resourceValues = processor_util.GetTargetInfoAttributesValues(rs.Resource.Attributes, p.Cfg.TargetInfoExcludedDimensions)
@@ -219,6 +219,9 @@ func (p *Processor) aggregateMetricsForSpan(svcName string, jobName string, inst

// update target_info label values
if p.Cfg.EnableTargetInfo {
// TODO - The resource labels only need to be sanitized once
// TODO - attribute names are stable across applications
// so let's cache the result of previous sanitizations
resourceAttributesCount := len(targetInfoLabels)
for index, label := range targetInfoLabels {
// sanitize label name
@@ -239,6 +242,7 @@
targetInfoRegistryLabelValues := p.registry.NewLabelValueCombo(targetInfoLabels, targetInfoLabelValues)

// only register target info if at least (job or instance) AND one other attribute are present
// TODO - We can move this check to the top
if resourceAttributesCount > 0 && len(targetInfoLabels) > resourceAttributesCount {
p.spanMetricsTargetInfo.SetForTargetInfo(targetInfoRegistryLabelValues, 1)
}
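The TODOs in this hunk point at memoizing label-name sanitization, since attribute names repeat heavily across spans. A possible shape for that cache — illustrative only, and the sanitization rule here is a simplified stand-in for whatever the registry actually applies:

package spanmetrics

import "strings"

// sanitizeCache memoizes the sanitized form of attribute names so the work is
// done once per distinct name instead of once per span.
type sanitizeCache struct {
	cache map[string]string
}

func newSanitizeCache() *sanitizeCache {
	return &sanitizeCache{cache: make(map[string]string)}
}

// sanitizeLabelName replaces anything outside [a-zA-Z0-9_] with '_' (a digit
// is not allowed in the first position) and caches the result.
func (c *sanitizeCache) sanitizeLabelName(name string) string {
	if v, ok := c.cache[name]; ok {
		return v
	}
	var sb strings.Builder
	sb.Grow(len(name))
	for i, r := range name {
		if r == '_' || (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || (i > 0 && r >= '0' && r <= '9') {
			sb.WriteRune(r)
		} else {
			sb.WriteRune('_')
		}
	}
	out := sb.String()
	c.cache[name] = out
	return out
}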
1 change: 1 addition & 0 deletions modules/generator/processor/util/util.go
@@ -69,6 +69,7 @@ func GetJobValue(attributes []*v1_common.KeyValue) string {
}

func GetTargetInfoAttributesValues(attributes []*v1_common.KeyValue, exclude []string) ([]string, []string) {
// TODO allocate with known length, or take new params for existing buffers
keys := make([]string, 0)
values := make([]string, 0)
for _, attrs := range attributes {
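One way to act on the TODO added here is to size the slices from len(attributes), or to let callers pass in reusable buffers. A simplified sketch — not the commit's code, and it keeps only string-valued attributes for brevity, whereas the real function also filters excluded dimensions and formats other value types:

package util

import (
	"slices"

	v1_common "github.com/grafana/tempo/pkg/tempopb/common/v1"
)

// getTargetInfoAttributesValuesInto reuses caller-owned buffers when they are
// large enough, so repeated calls per resource stop allocating.
func getTargetInfoAttributesValuesInto(keys, values []string, attributes []*v1_common.KeyValue, exclude []string) ([]string, []string) {
	if cap(keys) < len(attributes) {
		keys = make([]string, 0, len(attributes))
	}
	if cap(values) < len(attributes) {
		values = make([]string, 0, len(attributes))
	}
	keys, values = keys[:0], values[:0]
	for _, attr := range attributes {
		if slices.Contains(exclude, attr.Key) {
			continue
		}
		keys = append(keys, attr.Key)
		values = append(values, attr.GetValue().GetStringValue())
	}
	return keys, values
}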