Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: limit tags and tag values search #4320

Merged
merged 27 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
9f36136
limit tags and tag values search
javiermolinar Nov 13, 2024
573695f
added documentation
javiermolinar Nov 13, 2024
5e43c17
Update docs/sources/tempo/api_docs/_index.md
javiermolinar Nov 26, 2024
70637ce
Update docs/sources/tempo/api_docs/_index.md
javiermolinar Nov 26, 2024
a40ee1a
Update docs/sources/tempo/api_docs/_index.md
javiermolinar Nov 26, 2024
2775ce1
stop when one scope exceed the limit
javiermolinar Nov 26, 2024
ecbcd98
Merge branch 'main' into search-tags-limit
javiermolinar Nov 26, 2024
cd2977a
update proto
javiermolinar Nov 26, 2024
4f24ce9
update documentation
javiermolinar Nov 26, 2024
6f33cba
added maxStaleValues parameter
javiermolinar Nov 27, 2024
d52e494
fix failing test
javiermolinar Nov 27, 2024
581dfd4
fix another failing test
javiermolinar Nov 27, 2024
a51c71e
typo
javiermolinar Nov 27, 2024
674ba0f
inject logger to collectors
javiermolinar Nov 27, 2024
cf9c7a3
use context for instrument logger
javiermolinar Nov 27, 2024
59aeee0
Update docs/sources/tempo/api_docs/_index.md
javiermolinar Nov 28, 2024
d510bd2
Revert "use context for instrument logger"
javiermolinar Nov 28, 2024
5d44f64
Revert "Update docs/sources/tempo/api_docs/_index.md"
javiermolinar Nov 28, 2024
80ae1dc
Reapply "Update docs/sources/tempo/api_docs/_index.md"
javiermolinar Nov 28, 2024
ef319ed
Revert "inject logger to collectors"
javiermolinar Nov 28, 2024
9e603ee
fix proto index
javiermolinar Nov 28, 2024
dd6edcf
fix test
javiermolinar Nov 28, 2024
3a64f9a
update logs
javiermolinar Dec 4, 2024
dc682f3
fix tests
javiermolinar Dec 4, 2024
8c3fc92
pass request from querier to tempodb@
javiermolinar Dec 4, 2024
6c03c10
Merge branch 'main' into search-tags-limit
javiermolinar Dec 5, 2024
ab5f3e0
changelog
javiermolinar Dec 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion docs/sources/tempo/api_docs/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,10 @@ Parameters:
Optional. Along with `end`, defines a time range from which tags should be returned.
- `end = (unix epoch seconds)`
Optional. Along with `start`, defines a time range from which tags should be returned. Providing both `start` and `end` includes blocks for the specified time range only.
- `limit = (integer)`
Optional. Limits the maximum number of tags values.
- `maxStaleValues = (integer)`
Optional. Limits the search for tags names. If the number of stale (already known) values reaches or exceeds this limit, the search stops.
javiermolinar marked this conversation as resolved.
Show resolved Hide resolved


### Search tags V2
Expand All @@ -385,6 +389,10 @@ Parameters:
Optional. Along with `end` define a time range from which tags should be returned.
- `end = (unix epoch seconds)`
Optional. Along with `start` define a time range from which tags should be returned. Providing both `start` and `end` includes blocks for the specified time range only.
- `limit = (integer)`
Optional. Sets the maximum number of tags names allowed per scope. The query stops once this limit is reached for any scope.
- `maxStaleValues = (integer)`
Optional. Limits the search for tag values. If the number of stale (already known) values reaches or exceeds this limit, the search stops.

#### Example

Expand Down Expand Up @@ -515,6 +523,10 @@ Parameters:
Optional. Along with `end`, defines a time range from which tags should be returned.
- `end = (unix epoch seconds)`
Optional. Along with `start`, defines a time range from which tags should be returned. Providing both `start` and `end` includes blocks for the specified time range only.
- `limit = (integer)`
Optional. Limits the maximum number of tags values.
- `maxStaleValues = (integer)`
Optional. Limits the search for tag values. If the number of stale (already known) values reaches or exceeds this limit, the search stops.


### Search tag values V2
Expand Down Expand Up @@ -561,7 +573,17 @@ $ curl -G -s http://localhost:3200/api/v2/search/tag/.service.name/values | jq
}
}
```
This endpoint can also receive `start` and `end` optional parameters. These parameters define the time range from which the tags are fetched
Parameters:
- `start = (unix epoch seconds)`
Optional. Along with `end`, defines a time range from which tags values should be returned.
- `end = (unix epoch seconds)`
Optional. Along with `start`, defines a time range from which tags values should be returned. Providing both `start` and `end` includes blocks for the specified time range only.
- `q = (traceql query)`
Optional. A TraceQL query to filter tag values by. Currently only works for a single spanset of `&&`ed conditions. For example: `{ span.foo = "bar" && resource.baz = "bat" ...}`. See also [Filtered tag values](#filtered-tag-values).
- `limit = (integer)`
Optional. Limits the maximum number of tags values
- `maxStaleValues = (integer)`
Optional. Limits the search for tags values. If the number of stale (already known) values reaches or exceeds this limit, the search stops.

#### Filtered tag values

Expand Down
8 changes: 5 additions & 3 deletions integration/e2e/multi_tenant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/user"
"github.com/grafana/e2e"
"github.com/grafana/tempo/integration/util"
Expand Down Expand Up @@ -207,9 +208,10 @@ func assertRequestCountMetric(t *testing.T, s *e2e.HTTPService, route string, re

// getAttrsAndSpanNames returns trace attrs and span names
func getAttrsAndSpanNames(trace *tempopb.Trace) traceStringsMap {
rAttrsKeys := collector.NewDistinctString(0)
rAttrsValues := collector.NewDistinctString(0)
spanNames := collector.NewDistinctString(0)
logger := log.NewNopLogger()
rAttrsKeys := collector.NewDistinctString(0, 0, 0, logger)
rAttrsValues := collector.NewDistinctString(0, 0, 0, logger)
spanNames := collector.NewDistinctString(0, 0, 0, logger)

for _, b := range trace.ResourceSpans {
for _, ss := range b.ScopeSpans {
Expand Down
17 changes: 9 additions & 8 deletions modules/frontend/combiner/search_tag_values.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package combiner

import (
"github.com/go-kit/log"
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/collector"
"github.com/grafana/tempo/pkg/tempopb"
Expand All @@ -12,9 +13,9 @@ var (
_ GRPCCombiner[*tempopb.SearchTagValuesV2Response] = (*genericCombiner[*tempopb.SearchTagValuesV2Response])(nil)
)

func NewSearchTagValues(limitBytes int) Combiner {
func NewSearchTagValues(maxDataBytes int, maxTagsValues uint32, staleValueThreshold uint32, logger log.Logger) Combiner {
// Distinct collector with no limit
d := collector.NewDistinctStringWithDiff(limitBytes)
d := collector.NewDistinctStringWithDiff(maxDataBytes, maxTagsValues, staleValueThreshold, logger)
inspectedBytes := atomic.NewUint64(0)

c := &genericCombiner[*tempopb.SearchTagValuesResponse]{
Expand Down Expand Up @@ -56,13 +57,13 @@ func NewSearchTagValues(limitBytes int) Combiner {
return c
}

func NewTypedSearchTagValues(limitBytes int) GRPCCombiner[*tempopb.SearchTagValuesResponse] {
return NewSearchTagValues(limitBytes).(GRPCCombiner[*tempopb.SearchTagValuesResponse])
func NewTypedSearchTagValues(maxDataBytes int, maxTagsValues uint32, staleValueThreshold uint32, logger log.Logger) GRPCCombiner[*tempopb.SearchTagValuesResponse] {
return NewSearchTagValues(maxDataBytes, maxTagsValues, staleValueThreshold, logger).(GRPCCombiner[*tempopb.SearchTagValuesResponse])
}

func NewSearchTagValuesV2(limitBytes int) Combiner {
func NewSearchTagValuesV2(maxDataBytes int, maxTagsValues uint32, staleValueThreshold uint32, logger log.Logger) Combiner {
// Distinct collector with no limit and diff enabled
d := collector.NewDistinctValueWithDiff(limitBytes, func(tv tempopb.TagValue) int { return len(tv.Type) + len(tv.Value) })
d := collector.NewDistinctValueWithDiff(maxDataBytes, maxTagsValues, staleValueThreshold, func(tv tempopb.TagValue) int { return len(tv.Type) + len(tv.Value) }, logger)
inspectedBytes := atomic.NewUint64(0)

c := &genericCombiner[*tempopb.SearchTagValuesV2Response]{
Expand Down Expand Up @@ -113,6 +114,6 @@ func NewSearchTagValuesV2(limitBytes int) Combiner {
return c
}

func NewTypedSearchTagValuesV2(limitBytes int) GRPCCombiner[*tempopb.SearchTagValuesV2Response] {
return NewSearchTagValuesV2(limitBytes).(GRPCCombiner[*tempopb.SearchTagValuesV2Response])
func NewTypedSearchTagValuesV2(maxDataBytes int, maxTagsValues uint32, staleValueThreshold uint32, logger log.Logger) GRPCCombiner[*tempopb.SearchTagValuesV2Response] {
return NewSearchTagValuesV2(maxDataBytes, maxTagsValues, staleValueThreshold, logger).(GRPCCombiner[*tempopb.SearchTagValuesV2Response])
}
17 changes: 9 additions & 8 deletions modules/frontend/combiner/search_tags.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package combiner

import (
"github.com/go-kit/log"
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/collector"
"github.com/grafana/tempo/pkg/tempopb"
Expand All @@ -12,8 +13,8 @@ var (
_ GRPCCombiner[*tempopb.SearchTagsV2Response] = (*genericCombiner[*tempopb.SearchTagsV2Response])(nil)
)

func NewSearchTags(limitBytes int) Combiner {
d := collector.NewDistinctStringWithDiff(limitBytes)
func NewSearchTags(maxDataBytes int, maxTagsPerScope uint32, staleValueThreshold uint32, logger log.Logger) Combiner {
d := collector.NewDistinctStringWithDiff(maxDataBytes, maxTagsPerScope, staleValueThreshold, logger)
inspectedBytes := atomic.NewUint64(0)

c := &genericCombiner[*tempopb.SearchTagsResponse]{
Expand Down Expand Up @@ -56,13 +57,13 @@ func NewSearchTags(limitBytes int) Combiner {
return c
}

func NewTypedSearchTags(limitBytes int) GRPCCombiner[*tempopb.SearchTagsResponse] {
return NewSearchTags(limitBytes).(GRPCCombiner[*tempopb.SearchTagsResponse])
func NewTypedSearchTags(maxDataBytes int, maxTagsPerScope uint32, staleValueThreshold uint32, logger log.Logger) GRPCCombiner[*tempopb.SearchTagsResponse] {
return NewSearchTags(maxDataBytes, maxTagsPerScope, staleValueThreshold, logger).(GRPCCombiner[*tempopb.SearchTagsResponse])
}

func NewSearchTagsV2(limitBytes int) Combiner {
func NewSearchTagsV2(maxDataBytes int, maxTagsPerScope uint32, staleValueThreshold uint32, logger log.Logger) Combiner {
// Distinct collector map to collect scopes and scope values
distinctValues := collector.NewScopedDistinctStringWithDiff(limitBytes)
distinctValues := collector.NewScopedDistinctStringWithDiff(maxDataBytes, maxTagsPerScope, staleValueThreshold, logger)
inspectedBytes := atomic.NewUint64(0)

c := &genericCombiner[*tempopb.SearchTagsV2Response]{
Expand Down Expand Up @@ -121,6 +122,6 @@ func NewSearchTagsV2(limitBytes int) Combiner {
return c
}

func NewTypedSearchTagsV2(limitBytes int) GRPCCombiner[*tempopb.SearchTagsV2Response] {
return NewSearchTagsV2(limitBytes).(GRPCCombiner[*tempopb.SearchTagsV2Response])
func NewTypedSearchTagsV2(maxDataBytes int, maxTagsPerScope uint32, staleValueThreshold uint32, logger log.Logger) GRPCCombiner[*tempopb.SearchTagsV2Response] {
return NewSearchTagsV2(maxDataBytes, maxTagsPerScope, staleValueThreshold, logger).(GRPCCombiner[*tempopb.SearchTagsV2Response])
}
62 changes: 43 additions & 19 deletions modules/frontend/combiner/search_tags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"sort"
"testing"

"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/stretchr/testify/assert"
Expand All @@ -13,8 +14,10 @@ import (
func TestTagsCombiner(t *testing.T) {
tests := []struct {
name string
factory func(int) Combiner
limit int
factory func(int, uint32, uint32, log.Logger) Combiner
limitBytes int
maxTagsValues uint32
maxCacheHits uint32
result1 proto.Message
result2 proto.Message
expectedResult proto.Message
Expand All @@ -31,7 +34,7 @@ func TestTagsCombiner(t *testing.T) {
expectedResult: &tempopb.SearchTagsResponse{TagNames: []string{"tag1", "tag2", "tag3"}, Metrics: &tempopb.MetadataMetrics{}},
actualResult: &tempopb.SearchTagsResponse{},
sort: func(m proto.Message) { sort.Strings(m.(*tempopb.SearchTagsResponse).TagNames) },
limit: 100,
limitBytes: 100,
},
{
name: "SearchTagsV2",
Expand All @@ -49,7 +52,7 @@ func TestTagsCombiner(t *testing.T) {
return scopes[i].Name < scopes[j].Name
})
},
limit: 100,
limitBytes: 100,
},
{
name: "SearchTagValues",
Expand All @@ -59,7 +62,7 @@ func TestTagsCombiner(t *testing.T) {
expectedResult: &tempopb.SearchTagValuesResponse{TagValues: []string{"tag1", "tag2", "tag3"}, Metrics: &tempopb.MetadataMetrics{}},
actualResult: &tempopb.SearchTagValuesResponse{},
sort: func(m proto.Message) { sort.Strings(m.(*tempopb.SearchTagValuesResponse).TagValues) },
limit: 100,
limitBytes: 100,
},
{
name: "SearchTagValuesV2",
Expand All @@ -73,7 +76,7 @@ func TestTagsCombiner(t *testing.T) {
return m.(*tempopb.SearchTagValuesV2Response).TagValues[i].Value < m.(*tempopb.SearchTagValuesV2Response).TagValues[j].Value
})
},
limit: 100,
limitBytes: 100,
},
// limits
{
Expand All @@ -85,7 +88,7 @@ func TestTagsCombiner(t *testing.T) {
actualResult: &tempopb.SearchTagsResponse{},
sort: func(m proto.Message) { sort.Strings(m.(*tempopb.SearchTagsResponse).TagNames) },
expectedShouldQuit: true,
limit: 5,
limitBytes: 5,
},
{
name: "SearchTagsV2 - limited",
Expand All @@ -104,7 +107,7 @@ func TestTagsCombiner(t *testing.T) {
})
},
expectedShouldQuit: true,
limit: 2,
limitBytes: 2,
},
{
name: "SearchTagValues - limited",
Expand All @@ -115,7 +118,7 @@ func TestTagsCombiner(t *testing.T) {
actualResult: &tempopb.SearchTagValuesResponse{},
sort: func(m proto.Message) { sort.Strings(m.(*tempopb.SearchTagValuesResponse).TagValues) },
expectedShouldQuit: true,
limit: 5,
limitBytes: 5,
},
{
name: "SearchTagValuesV2 - limited",
Expand All @@ -130,7 +133,22 @@ func TestTagsCombiner(t *testing.T) {
})
},
expectedShouldQuit: true,
limit: 10,
limitBytes: 10,
},
{
name: "SearchTagValuesV2 - max values limited",
factory: NewSearchTagValuesV2,
result1: &tempopb.SearchTagValuesV2Response{TagValues: []*tempopb.TagValue{{Value: "v1", Type: "string"}}},
result2: &tempopb.SearchTagValuesV2Response{TagValues: []*tempopb.TagValue{{Value: "v2", Type: "string"}, {Value: "v3", Type: "string"}}},
expectedResult: &tempopb.SearchTagValuesV2Response{TagValues: []*tempopb.TagValue{{Value: "v1", Type: "string"}}, Metrics: &tempopb.MetadataMetrics{}},
actualResult: &tempopb.SearchTagValuesV2Response{},
sort: func(m proto.Message) {
sort.Slice(m.(*tempopb.SearchTagValuesV2Response).TagValues, func(i, j int) bool {
return m.(*tempopb.SearchTagValuesV2Response).TagValues[i].Value < m.(*tempopb.SearchTagValuesV2Response).TagValues[j].Value
})
},
expectedShouldQuit: true,
maxTagsValues: 1,
},
// with metrics
{
Expand All @@ -141,7 +159,7 @@ func TestTagsCombiner(t *testing.T) {
expectedResult: &tempopb.SearchTagsResponse{TagNames: []string{"tag1", "tag2", "tag3"}, Metrics: &tempopb.MetadataMetrics{InspectedBytes: 2}},
actualResult: &tempopb.SearchTagsResponse{},
sort: func(m proto.Message) { sort.Strings(m.(*tempopb.SearchTagsResponse).TagNames) },
limit: 100,
limitBytes: 100,
},
{
name: "SearchTagsV2 - metrics",
Expand All @@ -159,7 +177,7 @@ func TestTagsCombiner(t *testing.T) {
return scopes[i].Name < scopes[j].Name
})
},
limit: 100,
limitBytes: 100,
},
{
name: "SearchTagValues - metrics",
Expand All @@ -169,7 +187,7 @@ func TestTagsCombiner(t *testing.T) {
expectedResult: &tempopb.SearchTagValuesResponse{TagValues: []string{"tag1", "tag2", "tag3"}, Metrics: &tempopb.MetadataMetrics{InspectedBytes: 2}},
actualResult: &tempopb.SearchTagValuesResponse{},
sort: func(m proto.Message) { sort.Strings(m.(*tempopb.SearchTagValuesResponse).TagValues) },
limit: 100,
limitBytes: 100,
},
{
name: "SearchTagValuesV2 - metrics",
Expand All @@ -183,12 +201,14 @@ func TestTagsCombiner(t *testing.T) {
return m.(*tempopb.SearchTagValuesV2Response).TagValues[i].Value < m.(*tempopb.SearchTagValuesV2Response).TagValues[j].Value
})
},
limit: 100,
limitBytes: 100,
},
}
logger := log.NewNopLogger()

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
combiner := tc.factory(tc.limit)
combiner := tc.factory(tc.limitBytes, tc.maxTagsValues, tc.maxCacheHits, logger)

err := combiner.AddResponse(toHTTPResponse(t, tc.result1, 200))
assert.NoError(t, err)
Expand Down Expand Up @@ -228,7 +248,8 @@ func metrics(message proto.Message) *tempopb.MetadataMetrics {
}

func TestTagsGRPCCombiner(t *testing.T) {
c := NewTypedSearchTags(0)
logger := log.NewNopLogger()
c := NewTypedSearchTags(0, 0, 0, logger)
res1 := &tempopb.SearchTagsResponse{TagNames: []string{"tag1"}, Metrics: &tempopb.MetadataMetrics{InspectedBytes: 1}}
res2 := &tempopb.SearchTagsResponse{TagNames: []string{"tag1", "tag2"}, Metrics: &tempopb.MetadataMetrics{InspectedBytes: 1}}
diff1 := &tempopb.SearchTagsResponse{TagNames: []string{"tag1"}, Metrics: &tempopb.MetadataMetrics{InspectedBytes: 1}}
Expand All @@ -238,7 +259,8 @@ func TestTagsGRPCCombiner(t *testing.T) {
}

func TestTagsV2GRPCCombiner(t *testing.T) {
c := NewTypedSearchTagsV2(0)
logger := log.NewNopLogger()
c := NewTypedSearchTagsV2(0, 0, 0, logger)
res1 := &tempopb.SearchTagsV2Response{Scopes: []*tempopb.SearchTagsV2Scope{{Name: "scope1", Tags: []string{"tag1"}}}, Metrics: &tempopb.MetadataMetrics{InspectedBytes: 1}}
res2 := &tempopb.SearchTagsV2Response{Scopes: []*tempopb.SearchTagsV2Scope{{Name: "scope1", Tags: []string{"tag1", "tag2"}}, {Name: "scope2", Tags: []string{"tag3"}}}, Metrics: &tempopb.MetadataMetrics{InspectedBytes: 1}}
diff1 := &tempopb.SearchTagsV2Response{Scopes: []*tempopb.SearchTagsV2Scope{{Name: "scope1", Tags: []string{"tag1"}}}, Metrics: &tempopb.MetadataMetrics{InspectedBytes: 1}}
Expand All @@ -255,7 +277,8 @@ func TestTagsV2GRPCCombiner(t *testing.T) {
}

func TestTagValuesGRPCCombiner(t *testing.T) {
c := NewTypedSearchTagValues(0)
logger := log.NewNopLogger()
c := NewTypedSearchTagValues(0, 0, 0, logger)
res1 := &tempopb.SearchTagValuesResponse{TagValues: []string{"tag1"}, Metrics: &tempopb.MetadataMetrics{InspectedBytes: 1}}
res2 := &tempopb.SearchTagValuesResponse{TagValues: []string{"tag1", "tag2"}, Metrics: &tempopb.MetadataMetrics{InspectedBytes: 1}}
diff1 := &tempopb.SearchTagValuesResponse{TagValues: []string{"tag1"}, Metrics: &tempopb.MetadataMetrics{InspectedBytes: 1}}
Expand All @@ -265,7 +288,8 @@ func TestTagValuesGRPCCombiner(t *testing.T) {
}

func TestTagValuesV2GRPCCombiner(t *testing.T) {
c := NewTypedSearchTagValuesV2(0)
logger := log.NewNopLogger()
c := NewTypedSearchTagValuesV2(0, 0, 0, logger)
res1 := &tempopb.SearchTagValuesV2Response{TagValues: []*tempopb.TagValue{{Value: "v1", Type: "string"}}, Metrics: &tempopb.MetadataMetrics{InspectedBytes: 1}}
res2 := &tempopb.SearchTagValuesV2Response{TagValues: []*tempopb.TagValue{{Value: "v1", Type: "string"}, {Value: "v2", Type: "string"}}, Metrics: &tempopb.MetadataMetrics{InspectedBytes: 1}}
diff1 := &tempopb.SearchTagValuesV2Response{TagValues: []*tempopb.TagValue{{Value: "v1", Type: "string"}}, Metrics: &tempopb.MetadataMetrics{InspectedBytes: 1}}
Expand Down
Loading