feat: limit tags and tag values search (#4320)
This PR introduces two new optional parameters that give finer control over tag and tag value lookups:

limit:
Specifies the maximum number of items to retrieve. For the api/v2/tags endpoint, the search stops if any scope reaches this limit.

maxStaleValues:
Restricts the search for tag values. If the number of stale (already known) values meets or exceeds this limit, the search halts. This is useful when the set of tag values is small.
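
The two stopping conditions described above can be sketched with a small distinct-value collector. This is a minimal illustration, not Tempo's `collector` package: the type `distinctCollector`, its fields, and the choice to reset the stale count whenever a new value appears are all assumptions made for the example.

```go
package main

import "fmt"

// distinctCollector is a hypothetical, simplified stand-in for a
// distinct-value collector; it is not Tempo's collector package.
type distinctCollector struct {
	seen        map[string]struct{}
	limit       int // maximum distinct values to keep; 0 disables the check
	maxStale    int // stale values tolerated in a row; 0 disables the check
	staleStreak int // already-known values seen since the last new value
}

func newDistinctCollector(limit, maxStale int) *distinctCollector {
	return &distinctCollector{seen: make(map[string]struct{}), limit: limit, maxStale: maxStale}
}

// Exceeded reports whether either stopping condition has been met.
func (c *distinctCollector) Exceeded() bool {
	if c.limit > 0 && len(c.seen) >= c.limit {
		return true
	}
	return c.maxStale > 0 && c.staleStreak >= c.maxStale
}

// Collect records v and reports whether the search should stop.
func (c *distinctCollector) Collect(v string) bool {
	if c.Exceeded() {
		return true
	}
	if _, known := c.seen[v]; known {
		c.staleStreak++
	} else {
		c.seen[v] = struct{}{}
		c.staleStreak = 0 // assumption: a new value resets the stale count
	}
	return c.Exceeded()
}

func main() {
	c := newDistinctCollector(0, 3) // no limit, stop after 3 stale values in a row
	processed := 0
	for _, v := range []string{"a", "a", "b", "b", "b", "b", "c"} {
		processed++
		if c.Collect(v) {
			break
		}
	}
	fmt.Println(len(c.seen), processed) // prints: 2 6
}
```

In this sketch the search gives up after the third repeated `b`, so the trailing `c` is never inspected. That is the trade-off `maxStaleValues` makes: less work in exchange for possibly missing rare values.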


---------

Co-authored-by: Kim Nylander <104772500+knylander-grafana@users.noreply.github.com>
Co-authored-by: Joe Elliott <joe.elliott@grafana.com>
3 people authored Dec 5, 2024
1 parent df6fe37 commit 6c9dc98
Showing 30 changed files with 1,069 additions and 517 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
@@ -1,5 +1,5 @@
## main / unreleased
* [FEATURE] tempo-cli: support dropping multiple traces in a single operation [#4266](https://github.com/grafana/tempo/pull/4266) (@ndk)

* [CHANGE] **BREAKING CHANGE** Add maximum spans per span set. Users can set `max_spans_per_span_set` to 0 to obtain the old behavior. [#4275](https://github.com/grafana/tempo/pull/4383) (@carles-grafana)
* [CHANGE] slo: include request cancellations within SLO [#4355](https://github.com/grafana/tempo/pull/4355) (@electron0zero)
request cancellations are exposed under `result` label in `tempo_query_frontend_queries_total` and `tempo_query_frontend_queries_within_slo_total` with `completed` or `canceled` values to differentiate between completed and canceled requests.
@@ -22,10 +22,12 @@
* [CHANGE] Return 422 for TRACE_TOO_LARGE queries [#4160](https://github.com/grafana/tempo/pull/4160) (@zalegrala)
* [CHANGE] Upgrade OTEL sdk to reduce allocs [#4243](https://github.com/grafana/tempo/pull/4243) (@joe-elliott)
* [CHANGE] Tighten file permissions [#4251](https://github.com/grafana/tempo/pull/4251) (@zalegrala)
* [FEATURE] tempo-cli: support dropping multiple traces in a single operation [#4266](https://github.com/grafana/tempo/pull/4266) (@ndk)
* [FEATURE] Discarded span logging `log_discarded_spans` [#3957](https://github.com/grafana/tempo/issues/3957) (@dastrobu)
* [FEATURE] TraceQL support for instrumentation scope [#3967](https://github.com/grafana/tempo/pull/3967) (@ie-pham)
* [FEATURE] Export cost attribution usage metrics from distributor [#4162](https://github.com/grafana/tempo/pull/4162) (@mdisibio)
* [FEATURE] TraceQL metrics: avg_over_time [#4073](https://github.com/grafana/tempo/pull/4073) (@javiermolinar)
* [FEATURE] Limit tags and tag values search [#4320](https://github.com/grafana/tempo/pull/4320) (@javiermolinar)
* [ENHANCEMENT] distributor: return trace id length when it is invalid [#4407](https://github.com/grafana/tempo/pull/4407) (@carles-grafana)
* [ENHANCEMENT] Update to the latest dskit [#4341](https://github.com/grafana/tempo/pull/4341) (@dastrobu)
* [ENHANCEMENT] Changed log level from INFO to DEBUG for the TempoDB Find operation using traceId to reduce excessive/unwanted logs in log search. [#4179](https://github.com/grafana/tempo/pull/4179) (@Aki0x137)
24 changes: 23 additions & 1 deletion docs/sources/tempo/api_docs/_index.md
@@ -360,6 +360,10 @@ Parameters:
Optional. Along with `end`, defines a time range from which tags should be returned.
- `end = (unix epoch seconds)`
Optional. Along with `start`, defines a time range from which tags should be returned. Providing both `start` and `end` includes blocks for the specified time range only.
- `limit = (integer)`
Optional. Limits the maximum number of tag names returned.
- `maxStaleValues = (integer)`
Optional. Limits the search for tag names. If the number of stale (already known) values reaches or exceeds this limit, the search stops. That is, if Tempo processes `maxStaleValues` matches without finding a new tag name, the search returns early.
### Search tags V2
@@ -385,6 +389,10 @@ Parameters:
Optional. Along with `end` define a time range from which tags should be returned.
- `end = (unix epoch seconds)`
Optional. Along with `start` define a time range from which tags should be returned. Providing both `start` and `end` includes blocks for the specified time range only.
- `limit = (integer)`
Optional. Sets the maximum number of tag names allowed per scope. The query stops once this limit is reached for any scope.
- `maxStaleValues = (integer)`
Optional. Limits the search for tag names. If the number of stale (already known) values reaches or exceeds this limit, the search stops.
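
The per-scope behavior of `limit` can be illustrated with a short sketch. It assumes a hypothetical `scopedCollector` type (not the collector Tempo uses) that keeps one set of tag names per scope and reports that the search should stop as soon as any single scope holds `limit` names.

```go
package main

import "fmt"

// scopedCollector is a hypothetical sketch, not Tempo's implementation.
type scopedCollector struct {
	tags        map[string]map[string]struct{} // scope -> distinct tag names
	maxPerScope int                            // 0 disables the limit
	full        bool                           // set once any scope reaches maxPerScope
}

func newScopedCollector(maxPerScope int) *scopedCollector {
	return &scopedCollector{tags: make(map[string]map[string]struct{}), maxPerScope: maxPerScope}
}

// Collect records tag under scope and reports whether the search should stop.
func (s *scopedCollector) Collect(scope, tag string) bool {
	if s.full {
		return true
	}
	set, ok := s.tags[scope]
	if !ok {
		set = make(map[string]struct{})
		s.tags[scope] = set
	}
	set[tag] = struct{}{}
	if s.maxPerScope > 0 && len(set) >= s.maxPerScope {
		s.full = true
	}
	return s.full
}

func main() {
	s := newScopedCollector(2)
	inputs := [][2]string{
		{"resource", "service.name"},
		{"span", "http.method"},
		{"span", "http.status_code"}, // the span scope reaches the limit here
		{"resource", "cluster"},      // never collected
	}
	for _, in := range inputs {
		if s.Collect(in[0], in[1]) {
			break
		}
	}
	fmt.Println(len(s.tags["resource"]), len(s.tags["span"])) // prints: 1 2
}
```

Note that in this sketch a scope that fills up early ends the whole query, so other scopes may come back with fewer names than `limit`.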
#### Example
@@ -515,6 +523,10 @@ Parameters:
Optional. Along with `end`, defines a time range from which tags should be returned.
- `end = (unix epoch seconds)`
Optional. Along with `start`, defines a time range from which tags should be returned. Providing both `start` and `end` includes blocks for the specified time range only.
- `limit = (integer)`
Optional. Limits the maximum number of tag values returned.
- `maxStaleValues = (integer)`
Optional. Limits the search for tag values. If the number of stale (already known) values reaches or exceeds this limit, the search stops. That is, if Tempo processes `maxStaleValues` matches without finding a new tag value, the search returns early.
### Search tag values V2
@@ -561,7 +573,17 @@ $ curl -G -s http://localhost:3200/api/v2/search/tag/.service.name/values | jq
}
}
```
This endpoint can also receive `start` and `end` optional parameters. These parameters define the time range from which the tags are fetched
Parameters:
- `start = (unix epoch seconds)`
Optional. Along with `end`, defines a time range from which tag values should be returned.
- `end = (unix epoch seconds)`
Optional. Along with `start`, defines a time range from which tag values should be returned. Providing both `start` and `end` includes blocks for the specified time range only.
- `q = (traceql query)`
Optional. A TraceQL query to filter tag values by. Currently only works for a single spanset of `&&`ed conditions. For example: `{ span.foo = "bar" && resource.baz = "bat" ...}`. See also [Filtered tag values](#filtered-tag-values).
- `limit = (integer)`
Optional. Limits the maximum number of tag values returned.
- `maxStaleValues = (integer)`
Optional. Limits the search for tag values. If the number of stale (already known) values reaches or exceeds this limit, the search stops. That is, if Tempo processes `maxStaleValues` matches without finding a new tag value, the search returns early.
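
As a rough illustration of how a client might pass the new parameters, the sketch below builds a tag values V2 request URL with Go's standard `net/url` package. The helper `tagValuesURL` is hypothetical, and the host, tag, and numeric values are placeholders taken from or modeled on the `curl` example above.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// tagValuesURL is a hypothetical helper that appends the optional limit and
// maxStaleValues query parameters when they are greater than zero.
func tagValuesURL(base, tag string, limit, maxStaleValues int) string {
	q := url.Values{}
	if limit > 0 {
		q.Set("limit", strconv.Itoa(limit))
	}
	if maxStaleValues > 0 {
		q.Set("maxStaleValues", strconv.Itoa(maxStaleValues))
	}
	u := base + "/api/v2/search/tag/" + url.PathEscape(tag) + "/values"
	if enc := q.Encode(); enc != "" {
		u += "?" + enc
	}
	return u
}

func main() {
	// Placeholder values: at most 100 tag values, stop after 10 stale matches.
	fmt.Println(tagValuesURL("http://localhost:3200", ".service.name", 100, 10))
}
```

Leaving both values at zero produces the same URL as the unparameterized request, which matches the parameters being optional.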
#### Filtered tag values
6 changes: 3 additions & 3 deletions integration/e2e/multi_tenant_test.go
@@ -207,9 +207,9 @@ func assertRequestCountMetric(t *testing.T, s *e2e.HTTPService, route string, re

// getAttrsAndSpanNames returns trace attrs and span names
func getAttrsAndSpanNames(trace *tempopb.Trace) traceStringsMap {
rAttrsKeys := collector.NewDistinctString(0)
rAttrsValues := collector.NewDistinctString(0)
spanNames := collector.NewDistinctString(0)
rAttrsKeys := collector.NewDistinctString(0, 0, 0)
rAttrsValues := collector.NewDistinctString(0, 0, 0)
spanNames := collector.NewDistinctString(0, 0, 0)

for _, b := range trace.ResourceSpans {
for _, ss := range b.ScopeSpans {
16 changes: 8 additions & 8 deletions modules/frontend/combiner/search_tag_values.go
@@ -12,9 +12,9 @@ var (
_ GRPCCombiner[*tempopb.SearchTagValuesV2Response] = (*genericCombiner[*tempopb.SearchTagValuesV2Response])(nil)
)

func NewSearchTagValues(limitBytes int) Combiner {
func NewSearchTagValues(maxDataBytes int, maxTagsValues uint32, staleValueThreshold uint32) Combiner {
// Distinct collector with no limit
d := collector.NewDistinctStringWithDiff(limitBytes)
d := collector.NewDistinctStringWithDiff(maxDataBytes, maxTagsValues, staleValueThreshold)
inspectedBytes := atomic.NewUint64(0)

c := &genericCombiner[*tempopb.SearchTagValuesResponse]{
@@ -56,13 +56,13 @@ func NewSearchTagValues(limitBytes int) Combiner {
return c
}

func NewTypedSearchTagValues(limitBytes int) GRPCCombiner[*tempopb.SearchTagValuesResponse] {
return NewSearchTagValues(limitBytes).(GRPCCombiner[*tempopb.SearchTagValuesResponse])
func NewTypedSearchTagValues(maxDataBytes int, maxTagsValues uint32, staleValueThreshold uint32) GRPCCombiner[*tempopb.SearchTagValuesResponse] {
return NewSearchTagValues(maxDataBytes, maxTagsValues, staleValueThreshold).(GRPCCombiner[*tempopb.SearchTagValuesResponse])
}

func NewSearchTagValuesV2(limitBytes int) Combiner {
func NewSearchTagValuesV2(maxDataBytes int, maxTagsValues uint32, staleValueThreshold uint32) Combiner {
// Distinct collector with no limit and diff enabled
d := collector.NewDistinctValueWithDiff(limitBytes, func(tv tempopb.TagValue) int { return len(tv.Type) + len(tv.Value) })
d := collector.NewDistinctValueWithDiff(maxDataBytes, maxTagsValues, staleValueThreshold, func(tv tempopb.TagValue) int { return len(tv.Type) + len(tv.Value) })
inspectedBytes := atomic.NewUint64(0)

c := &genericCombiner[*tempopb.SearchTagValuesV2Response]{
@@ -113,6 +113,6 @@ func NewSearchTagValuesV2(limitBytes int) Combiner {
return c
}

func NewTypedSearchTagValuesV2(limitBytes int) GRPCCombiner[*tempopb.SearchTagValuesV2Response] {
return NewSearchTagValuesV2(limitBytes).(GRPCCombiner[*tempopb.SearchTagValuesV2Response])
func NewTypedSearchTagValuesV2(maxDataBytes int, maxTagsValues uint32, staleValueThreshold uint32) GRPCCombiner[*tempopb.SearchTagValuesV2Response] {
return NewSearchTagValuesV2(maxDataBytes, maxTagsValues, staleValueThreshold).(GRPCCombiner[*tempopb.SearchTagValuesV2Response])
}
16 changes: 8 additions & 8 deletions modules/frontend/combiner/search_tags.go
@@ -12,8 +12,8 @@ var (
_ GRPCCombiner[*tempopb.SearchTagsV2Response] = (*genericCombiner[*tempopb.SearchTagsV2Response])(nil)
)

func NewSearchTags(limitBytes int) Combiner {
d := collector.NewDistinctStringWithDiff(limitBytes)
func NewSearchTags(maxDataBytes int, maxTagsPerScope uint32, staleValueThreshold uint32) Combiner {
d := collector.NewDistinctStringWithDiff(maxDataBytes, maxTagsPerScope, staleValueThreshold)
inspectedBytes := atomic.NewUint64(0)

c := &genericCombiner[*tempopb.SearchTagsResponse]{
@@ -56,13 +56,13 @@ func NewSearchTags(limitBytes int) Combiner {
return c
}

func NewTypedSearchTags(limitBytes int) GRPCCombiner[*tempopb.SearchTagsResponse] {
return NewSearchTags(limitBytes).(GRPCCombiner[*tempopb.SearchTagsResponse])
func NewTypedSearchTags(maxDataBytes int, maxTagsPerScope uint32, staleValueThreshold uint32) GRPCCombiner[*tempopb.SearchTagsResponse] {
return NewSearchTags(maxDataBytes, maxTagsPerScope, staleValueThreshold).(GRPCCombiner[*tempopb.SearchTagsResponse])
}

func NewSearchTagsV2(limitBytes int) Combiner {
func NewSearchTagsV2(maxDataBytes int, maxTagsPerScope uint32, staleValueThreshold uint32) Combiner {
// Distinct collector map to collect scopes and scope values
distinctValues := collector.NewScopedDistinctStringWithDiff(limitBytes)
distinctValues := collector.NewScopedDistinctStringWithDiff(maxDataBytes, maxTagsPerScope, staleValueThreshold)
inspectedBytes := atomic.NewUint64(0)

c := &genericCombiner[*tempopb.SearchTagsV2Response]{
@@ -121,6 +121,6 @@ func NewSearchTagsV2(limitBytes int) Combiner {
return c
}

func NewTypedSearchTagsV2(limitBytes int) GRPCCombiner[*tempopb.SearchTagsV2Response] {
return NewSearchTagsV2(limitBytes).(GRPCCombiner[*tempopb.SearchTagsV2Response])
func NewTypedSearchTagsV2(maxDataBytes int, maxTagsPerScope uint32, staleValueThreshold uint32) GRPCCombiner[*tempopb.SearchTagsV2Response] {
return NewSearchTagsV2(maxDataBytes, maxTagsPerScope, staleValueThreshold).(GRPCCombiner[*tempopb.SearchTagsV2Response])
}
55 changes: 36 additions & 19 deletions modules/frontend/combiner/search_tags_test.go
@@ -13,8 +13,10 @@ import (
func TestTagsCombiner(t *testing.T) {
tests := []struct {
name string
factory func(int) Combiner
limit int
factory func(int, uint32, uint32) Combiner
limitBytes int
maxTagsValues uint32
maxCacheHits uint32
result1 proto.Message
result2 proto.Message
expectedResult proto.Message
@@ -31,7 +33,7 @@ func TestTagsCombiner(t *testing.T) {
expectedResult: &tempopb.SearchTagsResponse{TagNames: []string{"tag1", "tag2", "tag3"}, Metrics: &tempopb.MetadataMetrics{}},
actualResult: &tempopb.SearchTagsResponse{},
sort: func(m proto.Message) { sort.Strings(m.(*tempopb.SearchTagsResponse).TagNames) },
limit: 100,
limitBytes: 100,
},
{
name: "SearchTagsV2",
@@ -49,7 +51,7 @@ func TestTagsCombiner(t *testing.T) {
return scopes[i].Name < scopes[j].Name
})
},
limit: 100,
limitBytes: 100,
},
{
name: "SearchTagValues",
@@ -59,7 +61,7 @@ func TestTagsCombiner(t *testing.T) {
expectedResult: &tempopb.SearchTagValuesResponse{TagValues: []string{"tag1", "tag2", "tag3"}, Metrics: &tempopb.MetadataMetrics{}},
actualResult: &tempopb.SearchTagValuesResponse{},
sort: func(m proto.Message) { sort.Strings(m.(*tempopb.SearchTagValuesResponse).TagValues) },
limit: 100,
limitBytes: 100,
},
{
name: "SearchTagValuesV2",
@@ -73,7 +75,7 @@ func TestTagsCombiner(t *testing.T) {
return m.(*tempopb.SearchTagValuesV2Response).TagValues[i].Value < m.(*tempopb.SearchTagValuesV2Response).TagValues[j].Value
})
},
limit: 100,
limitBytes: 100,
},
// limits
{
@@ -85,7 +87,7 @@ func TestTagsCombiner(t *testing.T) {
actualResult: &tempopb.SearchTagsResponse{},
sort: func(m proto.Message) { sort.Strings(m.(*tempopb.SearchTagsResponse).TagNames) },
expectedShouldQuit: true,
limit: 5,
limitBytes: 5,
},
{
name: "SearchTagsV2 - limited",
@@ -104,7 +106,7 @@ func TestTagsCombiner(t *testing.T) {
})
},
expectedShouldQuit: true,
limit: 2,
limitBytes: 2,
},
{
name: "SearchTagValues - limited",
@@ -115,7 +117,7 @@ func TestTagsCombiner(t *testing.T) {
actualResult: &tempopb.SearchTagValuesResponse{},
sort: func(m proto.Message) { sort.Strings(m.(*tempopb.SearchTagValuesResponse).TagValues) },
expectedShouldQuit: true,
limit: 5,
limitBytes: 5,
},
{
name: "SearchTagValuesV2 - limited",
@@ -130,7 +132,22 @@ func TestTagsCombiner(t *testing.T) {
})
},
expectedShouldQuit: true,
limit: 10,
limitBytes: 10,
},
{
name: "SearchTagValuesV2 - max values limited",
factory: NewSearchTagValuesV2,
result1: &tempopb.SearchTagValuesV2Response{TagValues: []*tempopb.TagValue{{Value: "v1", Type: "string"}}},
result2: &tempopb.SearchTagValuesV2Response{TagValues: []*tempopb.TagValue{{Value: "v2", Type: "string"}, {Value: "v3", Type: "string"}}},
expectedResult: &tempopb.SearchTagValuesV2Response{TagValues: []*tempopb.TagValue{{Value: "v1", Type: "string"}}, Metrics: &tempopb.MetadataMetrics{}},
actualResult: &tempopb.SearchTagValuesV2Response{},
sort: func(m proto.Message) {
sort.Slice(m.(*tempopb.SearchTagValuesV2Response).TagValues, func(i, j int) bool {
return m.(*tempopb.SearchTagValuesV2Response).TagValues[i].Value < m.(*tempopb.SearchTagValuesV2Response).TagValues[j].Value
})
},
expectedShouldQuit: true,
maxTagsValues: 1,
},
// with metrics
{
@@ -141,7 +158,7 @@ func TestTagsCombiner(t *testing.T) {
expectedResult: &tempopb.SearchTagsResponse{TagNames: []string{"tag1", "tag2", "tag3"}, Metrics: &tempopb.MetadataMetrics{InspectedBytes: 2}},
actualResult: &tempopb.SearchTagsResponse{},
sort: func(m proto.Message) { sort.Strings(m.(*tempopb.SearchTagsResponse).TagNames) },
limit: 100,
limitBytes: 100,
},
{
name: "SearchTagsV2 - metrics",
@@ -159,7 +176,7 @@ func TestTagsCombiner(t *testing.T) {
return scopes[i].Name < scopes[j].Name
})
},
limit: 100,
limitBytes: 100,
},
{
name: "SearchTagValues - metrics",
@@ -169,7 +186,7 @@ func TestTagsCombiner(t *testing.T) {
expectedResult: &tempopb.SearchTagValuesResponse{TagValues: []string{"tag1", "tag2", "tag3"}, Metrics: &tempopb.MetadataMetrics{InspectedBytes: 2}},
actualResult: &tempopb.SearchTagValuesResponse{},
sort: func(m proto.Message) { sort.Strings(m.(*tempopb.SearchTagValuesResponse).TagValues) },
limit: 100,
limitBytes: 100,
},
{
name: "SearchTagValuesV2 - metrics",
@@ -183,12 +200,12 @@ func TestTagsCombiner(t *testing.T) {
return m.(*tempopb.SearchTagValuesV2Response).TagValues[i].Value < m.(*tempopb.SearchTagValuesV2Response).TagValues[j].Value
})
},
limit: 100,
limitBytes: 100,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
combiner := tc.factory(tc.limit)
combiner := tc.factory(tc.limitBytes, tc.maxTagsValues, tc.maxCacheHits)

err := combiner.AddResponse(toHTTPResponse(t, tc.result1, 200))
assert.NoError(t, err)
@@ -228,7 +245,7 @@ func metrics(message proto.Message) *tempopb.MetadataMetrics {
}

func TestTagsGRPCCombiner(t *testing.T) {
c := NewTypedSearchTags(0)
c := NewTypedSearchTags(0, 0, 0)
res1 := &tempopb.SearchTagsResponse{TagNames: []string{"tag1"}, Metrics: &tempopb.MetadataMetrics{InspectedBytes: 1}}
res2 := &tempopb.SearchTagsResponse{TagNames: []string{"tag1", "tag2"}, Metrics: &tempopb.MetadataMetrics{InspectedBytes: 1}}
diff1 := &tempopb.SearchTagsResponse{TagNames: []string{"tag1"}, Metrics: &tempopb.MetadataMetrics{InspectedBytes: 1}}
@@ -238,7 +255,7 @@ func TestTagsGRPCCombiner(t *testing.T) {
}

func TestTagsV2GRPCCombiner(t *testing.T) {
c := NewTypedSearchTagsV2(0)
c := NewTypedSearchTagsV2(0, 0, 0)
res1 := &tempopb.SearchTagsV2Response{Scopes: []*tempopb.SearchTagsV2Scope{{Name: "scope1", Tags: []string{"tag1"}}}, Metrics: &tempopb.MetadataMetrics{InspectedBytes: 1}}
res2 := &tempopb.SearchTagsV2Response{Scopes: []*tempopb.SearchTagsV2Scope{{Name: "scope1", Tags: []string{"tag1", "tag2"}}, {Name: "scope2", Tags: []string{"tag3"}}}, Metrics: &tempopb.MetadataMetrics{InspectedBytes: 1}}
diff1 := &tempopb.SearchTagsV2Response{Scopes: []*tempopb.SearchTagsV2Scope{{Name: "scope1", Tags: []string{"tag1"}}}, Metrics: &tempopb.MetadataMetrics{InspectedBytes: 1}}
@@ -255,7 +272,7 @@ func TestTagsV2GRPCCombiner(t *testing.T) {
}

func TestTagValuesGRPCCombiner(t *testing.T) {
c := NewTypedSearchTagValues(0)
c := NewTypedSearchTagValues(0, 0, 0)
res1 := &tempopb.SearchTagValuesResponse{TagValues: []string{"tag1"}, Metrics: &tempopb.MetadataMetrics{InspectedBytes: 1}}
res2 := &tempopb.SearchTagValuesResponse{TagValues: []string{"tag1", "tag2"}, Metrics: &tempopb.MetadataMetrics{InspectedBytes: 1}}
diff1 := &tempopb.SearchTagValuesResponse{TagValues: []string{"tag1"}, Metrics: &tempopb.MetadataMetrics{InspectedBytes: 1}}
@@ -265,7 +282,7 @@ func TestTagValuesGRPCCombiner(t *testing.T) {
}

func TestTagValuesV2GRPCCombiner(t *testing.T) {
c := NewTypedSearchTagValuesV2(0)
c := NewTypedSearchTagValuesV2(0, 0, 0)
res1 := &tempopb.SearchTagValuesV2Response{TagValues: []*tempopb.TagValue{{Value: "v1", Type: "string"}}, Metrics: &tempopb.MetadataMetrics{InspectedBytes: 1}}
res2 := &tempopb.SearchTagValuesV2Response{TagValues: []*tempopb.TagValue{{Value: "v1", Type: "string"}, {Value: "v2", Type: "string"}}, Metrics: &tempopb.MetadataMetrics{InspectedBytes: 1}}
diff1 := &tempopb.SearchTagValuesV2Response{TagValues: []*tempopb.TagValue{{Value: "v1", Type: "string"}}, Metrics: &tempopb.MetadataMetrics{InspectedBytes: 1}}
4 changes: 2 additions & 2 deletions modules/frontend/search_sharder_test.go
@@ -41,11 +41,11 @@ type mockReader struct {
metas []*backend.BlockMeta
}

func (m *mockReader) SearchTags(context.Context, *backend.BlockMeta, string, common.SearchOptions) (*tempopb.SearchTagsV2Response, error) {
func (m *mockReader) SearchTags(context.Context, *backend.BlockMeta, *tempopb.SearchTagsBlockRequest, common.SearchOptions) (*tempopb.SearchTagsV2Response, error) {
return nil, nil
}

func (m *mockReader) SearchTagValues(context.Context, *backend.BlockMeta, string, common.SearchOptions) (*tempopb.SearchTagValuesResponse, error) {
func (m *mockReader) SearchTagValues(context.Context, *backend.BlockMeta, *tempopb.SearchTagValuesBlockRequest, common.SearchOptions) (*tempopb.SearchTagValuesResponse, error) {
return nil, nil
}
