From b80ad572becc42a33be7576c7179cfa85189bc96 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Fri, 20 Sep 2024 03:10:10 +0530 Subject: [PATCH] chore: update tests --- pkg/query-service/app/querier/helper.go | 7 + pkg/query-service/app/querier/querier.go | 2 + pkg/query-service/app/querier/querier_test.go | 170 +++++++++++------ pkg/query-service/app/querier/v2/helper.go | 5 + pkg/query-service/app/querier/v2/querier.go | 2 + .../app/querier/v2/querier_test.go | 174 ++++++++++++------ .../querycache/query_range_cache.go | 23 ++- 7 files changed, 267 insertions(+), 116 deletions(-) diff --git a/pkg/query-service/app/querier/helper.go b/pkg/query-service/app/querier/helper.go index f6608ab124..00b287ce8e 100644 --- a/pkg/query-service/app/querier/helper.go +++ b/pkg/query-service/app/querier/helper.go @@ -15,6 +15,7 @@ import ( v3 "go.signoz.io/signoz/pkg/query-service/model/v3" "go.signoz.io/signoz/pkg/query-service/postprocess" "go.signoz.io/signoz/pkg/query-service/querycache" + "go.uber.org/zap" ) func prepareLogsQuery(_ context.Context, @@ -106,6 +107,7 @@ func (q *querier) runBuilderQuery( var query string var err error if _, ok := cacheKeys[queryName]; !ok || params.NoCache { + zap.L().Info("skipping cache for logs query", zap.String("queryName", queryName), zap.Int64("start", start), zap.Int64("end", end), zap.Int64("step", builderQuery.StepInterval), zap.Bool("noCache", params.NoCache), zap.String("cacheKey", cacheKeys[queryName])) query, err = prepareLogsQuery(ctx, q.UseLogsNewSchema, start, end, builderQuery, params, preferRPM) if err != nil { ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil} @@ -117,6 +119,7 @@ func (q *querier) runBuilderQuery( } misses := q.queryCache.FindMissingTimeRanges(start, end, builderQuery.StepInterval, cacheKeys[queryName]) + zap.L().Info("cache misses for logs query", zap.Any("misses", misses)) missedSeries := make([]querycache.CachedSeriesData, 0) for _, miss := range misses { query, err = prepareLogsQuery(ctx, q.UseLogsNewSchema, miss.Start, miss.End, builderQuery, params, preferRPM) @@ -205,6 +208,7 @@ func (q *querier) runBuilderQuery( // We are only caching the graph panel queries. A non-existant cache key means that the query is not cached. // If the query is not cached, we execute the query and return the result without caching it. if _, ok := cacheKeys[queryName]; !ok || params.NoCache { + zap.L().Info("skipping cache for metrics query", zap.String("queryName", queryName), zap.Int64("start", start), zap.Int64("end", end), zap.Int64("step", builderQuery.StepInterval), zap.Bool("noCache", params.NoCache), zap.String("cacheKey", cacheKeys[queryName])) query, err := metricsV3.PrepareMetricQuery(start, end, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, metricsV3.Options{PreferRPM: preferRPM}) if err != nil { ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil} @@ -217,6 +221,7 @@ func (q *querier) runBuilderQuery( cacheKey := cacheKeys[queryName] misses := q.queryCache.FindMissingTimeRanges(start, end, builderQuery.StepInterval, cacheKey) + zap.L().Info("cache misses for metrics query", zap.Any("misses", misses)) missedSeries := make([]querycache.CachedSeriesData, 0) for _, miss := range misses { query, err := metricsV3.PrepareMetricQuery( @@ -282,6 +287,7 @@ func (q *querier) runBuilderExpression( } if _, ok := cacheKeys[queryName]; !ok || params.NoCache { + zap.L().Info("skipping cache for expression query", zap.String("queryName", queryName), zap.Int64("start", params.Start), zap.Int64("end", params.End), zap.Int64("step", params.Step), zap.Bool("noCache", params.NoCache), zap.String("cacheKey", cacheKeys[queryName])) query := queries[queryName] series, err := q.execClickHouseQuery(ctx, query) ch <- channelResult{Err: err, Name: queryName, Query: query, Series: series} @@ -291,6 +297,7 @@ func (q *querier) runBuilderExpression( cacheKey := cacheKeys[queryName] step := postprocess.StepIntervalForFunction(params, queryName) misses := q.queryCache.FindMissingTimeRanges(params.Start, params.End, step, cacheKey) + zap.L().Info("cache misses for expression query", zap.Any("misses", misses)) missedSeries := make([]querycache.CachedSeriesData, 0) for _, miss := range misses { missQueries, _ := q.builder.PrepareQueries(&v3.QueryRangeParamsV3{ diff --git a/pkg/query-service/app/querier/querier.go b/pkg/query-service/app/querier/querier.go index 7b79ec9dae..fd7198b334 100644 --- a/pkg/query-service/app/querier/querier.go +++ b/pkg/query-service/app/querier/querier.go @@ -213,12 +213,14 @@ func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParam cacheKey, ok := cacheKeys[queryName] if !ok || params.NoCache { + zap.L().Info("skipping cache for metrics prom query", zap.String("queryName", queryName), zap.Int64("start", params.Start), zap.Int64("end", params.End), zap.Int64("step", params.Step), zap.Bool("noCache", params.NoCache), zap.String("cacheKey", cacheKeys[queryName])) query := metricsV3.BuildPromQuery(promQuery, params.Step, params.Start, params.End) series, err := q.execPromQuery(ctx, query) channelResults <- channelResult{Err: err, Name: queryName, Query: query.Query, Series: series} return } misses := q.queryCache.FindMissingTimeRanges(params.Start, params.End, params.Step, cacheKey) + zap.L().Info("cache misses for metrics prom query", zap.Any("misses", misses)) missedSeries := make([]querycache.CachedSeriesData, 0) for _, miss := range misses { query := metricsV3.BuildPromQuery(promQuery, params.Step, miss.Start, miss.End) diff --git a/pkg/query-service/app/querier/querier_test.go b/pkg/query-service/app/querier/querier_test.go index a4814d0c0a..c30841546e 100644 --- a/pkg/query-service/app/querier/querier_test.go +++ b/pkg/query-service/app/querier/querier_test.go @@ -2,7 +2,9 @@ package querier import ( "context" + "encoding/json" "fmt" + "math" "strings" "testing" "time" @@ -11,8 +13,33 @@ import ( tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" "go.signoz.io/signoz/pkg/query-service/cache/inmemory" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/querycache" ) +func minTimestamp(series []*v3.Series) int64 { + min := int64(math.MaxInt64) + for _, series := range series { + for _, point := range series.Points { + if point.Timestamp < min { + min = point.Timestamp + } + } + } + return min +} + +func maxTimestamp(series []*v3.Series) int64 { + max := int64(math.MinInt64) + for _, series := range series { + for _, point := range series.Points { + if point.Timestamp > max { + max = point.Timestamp + } + } + } + return max +} + func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) { // There are five scenarios: // 1. Cached time range is a subset of the requested time range @@ -26,7 +53,7 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) { requestedEnd int64 // in milliseconds requestedStep int64 // in seconds cachedSeries []*v3.Series - expectedMiss []missInterval + expectedMiss []querycache.MissInterval replaceCachedData bool }{ { @@ -51,14 +78,14 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) { }, }, }, - expectedMiss: []missInterval{ + expectedMiss: []querycache.MissInterval{ { - start: 1675115596722, - end: 1675115596722 + 60*60*1000 - 1, + Start: 1675115596722, + End: 1675115596722 + 60*60*1000, }, { - start: 1675115596722 + 120*60*1000 + 1, - end: 1675115596722 + 180*60*1000, + Start: 1675115596722 + 120*60*1000, + End: 1675115596722 + 180*60*1000, }, }, }, @@ -92,7 +119,7 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) { }, }, }, - expectedMiss: []missInterval{}, + expectedMiss: []querycache.MissInterval{}, }, { name: "cached time range is a left overlap of the requested time range", @@ -120,10 +147,10 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) { }, }, }, - expectedMiss: []missInterval{ + expectedMiss: []querycache.MissInterval{ { - start: 1675115596722 + 120*60*1000 + 1, - end: 1675115596722 + 180*60*1000, + Start: 1675115596722 + 120*60*1000, + End: 1675115596722 + 180*60*1000, }, }, }, @@ -153,10 +180,10 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) { }, }, }, - expectedMiss: []missInterval{ + expectedMiss: []querycache.MissInterval{ { - start: 1675115596722, - end: 1675115596722 + 60*60*1000 - 1, + Start: 1675115596722, + End: 1675115596722 + 60*60*1000, }, }, }, @@ -186,31 +213,48 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) { }, }, }, - expectedMiss: []missInterval{ + expectedMiss: []querycache.MissInterval{ { - start: 1675115596722, - end: 1675115596722 + 180*60*1000, + Start: 1675115596722, + End: 1675115596722 + 180*60*1000, }, }, replaceCachedData: true, }, } - for _, tc := range testCases { + c := inmemory.New(&inmemory.Options{TTL: 5 * time.Minute, CleanupInterval: 10 * time.Minute}) + + qc := querycache.NewQueryCache(querycache.WithCache(c)) + + for idx, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - misses, replaceCachedData := findMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, tc.cachedSeries, 0*time.Minute) + cacheKey := fmt.Sprintf("test-cache-key-%d", idx) + cachedData := &querycache.CachedSeriesData{ + Start: minTimestamp(tc.cachedSeries), + End: maxTimestamp(tc.cachedSeries), + Data: tc.cachedSeries, + } + jsonData, err := json.Marshal([]*querycache.CachedSeriesData{cachedData}) + if err != nil { + t.Errorf("error marshalling cached data: %v", err) + } + err = c.Store(cacheKey, jsonData, 5*time.Minute) + if err != nil { + t.Errorf("error storing cached data: %v", err) + } + + misses := qc.FindMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, cacheKey) if len(misses) != len(tc.expectedMiss) { t.Errorf("expected %d misses, got %d", len(tc.expectedMiss), len(misses)) } - if replaceCachedData != tc.replaceCachedData { - t.Errorf("expected replaceCachedData %t, got %t", tc.replaceCachedData, replaceCachedData) - } + for i, miss := range misses { - if miss.start != tc.expectedMiss[i].start { - t.Errorf("expected start %d, got %d", tc.expectedMiss[i].start, miss.start) + if miss.Start != tc.expectedMiss[i].Start { + t.Errorf("expected start %d, got %d", tc.expectedMiss[i].Start, miss.Start) } - if miss.end != tc.expectedMiss[i].end { - t.Errorf("expected end %d, got %d", tc.expectedMiss[i].end, miss.end) + if miss.End != tc.expectedMiss[i].End { + t.Errorf("expected end %d, got %d", tc.expectedMiss[i].End, miss.End) } } }) @@ -226,7 +270,7 @@ func TestFindMissingTimeRangesWithFluxInterval(t *testing.T) { requestedStep int64 cachedSeries []*v3.Series fluxInterval time.Duration - expectedMiss []missInterval + expectedMiss []querycache.MissInterval }{ { name: "cached time range is a subset of the requested time range", @@ -251,14 +295,14 @@ func TestFindMissingTimeRangesWithFluxInterval(t *testing.T) { }, }, fluxInterval: 5 * time.Minute, - expectedMiss: []missInterval{ + expectedMiss: []querycache.MissInterval{ { - start: 1675115596722, - end: 1675115596722 + 60*60*1000 - 1, + Start: 1675115596722, + End: 1675115596722 + 60*60*1000, }, { - start: 1675115596722 + 120*60*1000 + 1, - end: 1675115596722 + 180*60*1000, + Start: 1675115596722 + 120*60*1000, + End: 1675115596722 + 180*60*1000, }, }, }, @@ -293,7 +337,7 @@ func TestFindMissingTimeRangesWithFluxInterval(t *testing.T) { }, }, fluxInterval: 5 * time.Minute, - expectedMiss: []missInterval{}, + expectedMiss: []querycache.MissInterval{}, }, { name: "cache time range is a left overlap of the requested time range", @@ -322,10 +366,10 @@ func TestFindMissingTimeRangesWithFluxInterval(t *testing.T) { }, }, fluxInterval: 5 * time.Minute, - expectedMiss: []missInterval{ + expectedMiss: []querycache.MissInterval{ { - start: 1675115596722 + 120*60*1000 + 1, - end: 1675115596722 + 180*60*1000, + Start: 1675115596722 + 120*60*1000, + End: 1675115596722 + 180*60*1000, }, }, }, @@ -356,10 +400,10 @@ func TestFindMissingTimeRangesWithFluxInterval(t *testing.T) { }, }, fluxInterval: 5 * time.Minute, - expectedMiss: []missInterval{ + expectedMiss: []querycache.MissInterval{ { - start: 1675115596722, - end: 1675115596722 + 60*60*1000 - 1, + Start: 1675115596722, + End: 1675115596722 + 60*60*1000, }, }, }, @@ -390,27 +434,45 @@ func TestFindMissingTimeRangesWithFluxInterval(t *testing.T) { }, }, fluxInterval: 5 * time.Minute, - expectedMiss: []missInterval{ + expectedMiss: []querycache.MissInterval{ { - start: 1675115596722, - end: 1675115596722 + 180*60*1000, + Start: 1675115596722, + End: 1675115596722 + 180*60*1000, }, }, }, } - for _, tc := range testCases { + c := inmemory.New(&inmemory.Options{TTL: 5 * time.Minute, CleanupInterval: 10 * time.Minute}) + + qc := querycache.NewQueryCache(querycache.WithCache(c)) + + for idx, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - misses, _ := findMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, tc.cachedSeries, tc.fluxInterval) + cacheKey := fmt.Sprintf("test-cache-key-%d", idx) + cachedData := &querycache.CachedSeriesData{ + Start: minTimestamp(tc.cachedSeries), + End: maxTimestamp(tc.cachedSeries), + Data: tc.cachedSeries, + } + jsonData, err := json.Marshal([]*querycache.CachedSeriesData{cachedData}) + if err != nil { + t.Errorf("error marshalling cached data: %v", err) + } + err = c.Store(cacheKey, jsonData, 5*time.Minute) + if err != nil { + t.Errorf("error storing cached data: %v", err) + } + misses := qc.FindMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, cacheKey) if len(misses) != len(tc.expectedMiss) { t.Errorf("expected %d misses, got %d", len(tc.expectedMiss), len(misses)) } for i, miss := range misses { - if miss.start != tc.expectedMiss[i].start { - t.Errorf("expected start %d, got %d", tc.expectedMiss[i].start, miss.start) + if miss.Start != tc.expectedMiss[i].Start { + t.Errorf("expected start %d, got %d", tc.expectedMiss[i].Start, miss.Start) } - if miss.end != tc.expectedMiss[i].end { - t.Errorf("expected end %d, got %d", tc.expectedMiss[i].end, miss.end) + if miss.End != tc.expectedMiss[i].End { + t.Errorf("expected end %d, got %d", tc.expectedMiss[i].End, miss.End) } } }) @@ -1022,18 +1084,18 @@ func TestQueryRangeValueTypePromQL(t *testing.T) { expectedQueryAndTimeRanges := []struct { query string - ranges []missInterval + ranges []querycache.MissInterval }{ { query: "signoz_calls_total", - ranges: []missInterval{ - {start: 1675115596722, end: 1675115596722 + 120*60*1000}, + ranges: []querycache.MissInterval{ + {Start: 1675115596722, End: 1675115596722 + 120*60*1000}, }, }, { query: "signoz_latency_bucket", - ranges: []missInterval{ - {start: 1675115596722 + 60*60*1000, end: 1675115596722 + 180*60*1000}, + ranges: []querycache.MissInterval{ + {Start: 1675115596722 + 60*60*1000, End: 1675115596722 + 180*60*1000}, }, }, } @@ -1054,10 +1116,10 @@ func TestQueryRangeValueTypePromQL(t *testing.T) { if len(q.TimeRanges()[i]) != 2 { t.Errorf("expected time ranges to be %v, got %v", expectedQueryAndTimeRanges[i].ranges, q.TimeRanges()[i]) } - if q.TimeRanges()[i][0] != int(expectedQueryAndTimeRanges[i].ranges[0].start) { + if q.TimeRanges()[i][0] != int(expectedQueryAndTimeRanges[i].ranges[0].Start) { t.Errorf("expected time ranges to be %v, got %v", expectedQueryAndTimeRanges[i].ranges, q.TimeRanges()[i]) } - if q.TimeRanges()[i][1] != int(expectedQueryAndTimeRanges[i].ranges[0].end) { + if q.TimeRanges()[i][1] != int(expectedQueryAndTimeRanges[i].ranges[0].End) { t.Errorf("expected time ranges to be %v, got %v", expectedQueryAndTimeRanges[i].ranges, q.TimeRanges()[i]) } } diff --git a/pkg/query-service/app/querier/v2/helper.go b/pkg/query-service/app/querier/v2/helper.go index 1d9463b4b1..bb41bc8c36 100644 --- a/pkg/query-service/app/querier/v2/helper.go +++ b/pkg/query-service/app/querier/v2/helper.go @@ -15,6 +15,7 @@ import ( "go.signoz.io/signoz/pkg/query-service/constants" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" "go.signoz.io/signoz/pkg/query-service/querycache" + "go.uber.org/zap" ) func prepareLogsQuery(_ context.Context, @@ -107,6 +108,7 @@ func (q *querier) runBuilderQuery( var query string var err error if _, ok := cacheKeys[queryName]; !ok || params.NoCache { + zap.L().Info("skipping cache for logs query", zap.String("queryName", queryName), zap.Int64("start", params.Start), zap.Int64("end", params.End), zap.Int64("step", params.Step), zap.Bool("noCache", params.NoCache), zap.String("cacheKey", cacheKeys[queryName])) query, err = prepareLogsQuery(ctx, q.UseLogsNewSchema, start, end, builderQuery, params, preferRPM) if err != nil { ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil} @@ -117,6 +119,7 @@ func (q *querier) runBuilderQuery( return } misses := q.queryCache.FindMissingTimeRanges(start, end, builderQuery.StepInterval, cacheKeys[queryName]) + zap.L().Info("cache misses for logs query", zap.Any("misses", misses)) missedSeries := make([]querycache.CachedSeriesData, 0) for _, miss := range misses { query, err = prepareLogsQuery(ctx, q.UseLogsNewSchema, miss.Start, miss.End, builderQuery, params, preferRPM) @@ -205,6 +208,7 @@ func (q *querier) runBuilderQuery( // We are only caching the graph panel queries. A non-existant cache key means that the query is not cached. // If the query is not cached, we execute the query and return the result without caching it. if _, ok := cacheKeys[queryName]; !ok || params.NoCache { + zap.L().Info("skipping cache for metrics query", zap.String("queryName", queryName), zap.Int64("start", params.Start), zap.Int64("end", params.End), zap.Int64("step", params.Step), zap.Bool("noCache", params.NoCache), zap.String("cacheKey", cacheKeys[queryName])) query, err := metricsV4.PrepareMetricQuery(start, end, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, metricsV3.Options{PreferRPM: preferRPM}) if err != nil { ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil} @@ -216,6 +220,7 @@ func (q *querier) runBuilderQuery( } misses := q.queryCache.FindMissingTimeRanges(start, end, builderQuery.StepInterval, cacheKeys[queryName]) + zap.L().Info("cache misses for metrics query", zap.Any("misses", misses)) missedSeries := make([]querycache.CachedSeriesData, 0) for _, miss := range misses { query, err := metricsV4.PrepareMetricQuery( diff --git a/pkg/query-service/app/querier/v2/querier.go b/pkg/query-service/app/querier/v2/querier.go index c363aa9fac..f8316d6f6c 100644 --- a/pkg/query-service/app/querier/v2/querier.go +++ b/pkg/query-service/app/querier/v2/querier.go @@ -211,12 +211,14 @@ func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParam cacheKey, ok := cacheKeys[queryName] if !ok || params.NoCache { + zap.L().Info("skipping cache for metrics prom query", zap.String("queryName", queryName), zap.Int64("start", params.Start), zap.Int64("end", params.End), zap.Int64("step", params.Step), zap.Bool("noCache", params.NoCache), zap.String("cacheKey", cacheKeys[queryName])) query := metricsV4.BuildPromQuery(promQuery, params.Step, params.Start, params.End) series, err := q.execPromQuery(ctx, query) channelResults <- channelResult{Err: err, Name: queryName, Query: query.Query, Series: series} return } misses := q.queryCache.FindMissingTimeRanges(params.Start, params.End, params.Step, cacheKey) + zap.L().Info("cache misses for metrics prom query", zap.Any("misses", misses)) missedSeries := make([]querycache.CachedSeriesData, 0) for _, miss := range misses { query := metricsV4.BuildPromQuery(promQuery, params.Step, miss.Start, miss.End) diff --git a/pkg/query-service/app/querier/v2/querier_test.go b/pkg/query-service/app/querier/v2/querier_test.go index c65b6ff54a..6dfc921183 100644 --- a/pkg/query-service/app/querier/v2/querier_test.go +++ b/pkg/query-service/app/querier/v2/querier_test.go @@ -2,7 +2,9 @@ package v2 import ( "context" + "encoding/json" "fmt" + "math" "strings" "testing" "time" @@ -11,9 +13,34 @@ import ( tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" "go.signoz.io/signoz/pkg/query-service/cache/inmemory" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/querycache" ) -func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) { +func minTimestamp(series []*v3.Series) int64 { + min := int64(math.MaxInt64) + for _, series := range series { + for _, point := range series.Points { + if point.Timestamp < min { + min = point.Timestamp + } + } + } + return min +} + +func maxTimestamp(series []*v3.Series) int64 { + max := int64(math.MinInt64) + for _, series := range series { + for _, point := range series.Points { + if point.Timestamp > max { + max = point.Timestamp + } + } + } + return max +} + +func TestV2FindMissingTimeRangesZeroFreshNess(t *testing.T) { // There are five scenarios: // 1. Cached time range is a subset of the requested time range // 2. Cached time range is a superset of the requested time range @@ -26,7 +53,7 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) { requestedEnd int64 // in milliseconds requestedStep int64 // in seconds cachedSeries []*v3.Series - expectedMiss []missInterval + expectedMiss []querycache.MissInterval replaceCachedData bool }{ { @@ -51,14 +78,14 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) { }, }, }, - expectedMiss: []missInterval{ + expectedMiss: []querycache.MissInterval{ { - start: 1675115596722, - end: 1675115596722 + 60*60*1000 - 1, + Start: 1675115596722, + End: 1675115596722 + 60*60*1000, }, { - start: 1675115596722 + 120*60*1000 + 1, - end: 1675115596722 + 180*60*1000, + Start: 1675115596722 + 120*60*1000, + End: 1675115596722 + 180*60*1000, }, }, }, @@ -92,7 +119,7 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) { }, }, }, - expectedMiss: []missInterval{}, + expectedMiss: []querycache.MissInterval{}, }, { name: "cached time range is a left overlap of the requested time range", @@ -120,10 +147,10 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) { }, }, }, - expectedMiss: []missInterval{ + expectedMiss: []querycache.MissInterval{ { - start: 1675115596722 + 120*60*1000 + 1, - end: 1675115596722 + 180*60*1000, + Start: 1675115596722 + 120*60*1000, + End: 1675115596722 + 180*60*1000, }, }, }, @@ -153,10 +180,10 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) { }, }, }, - expectedMiss: []missInterval{ + expectedMiss: []querycache.MissInterval{ { - start: 1675115596722, - end: 1675115596722 + 60*60*1000 - 1, + Start: 1675115596722, + End: 1675115596722 + 60*60*1000, }, }, }, @@ -186,31 +213,48 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) { }, }, }, - expectedMiss: []missInterval{ + expectedMiss: []querycache.MissInterval{ { - start: 1675115596722, - end: 1675115596722 + 180*60*1000, + Start: 1675115596722, + End: 1675115596722 + 180*60*1000, }, }, replaceCachedData: true, }, } - for _, tc := range testCases { + c := inmemory.New(&inmemory.Options{TTL: 5 * time.Minute, CleanupInterval: 10 * time.Minute}) + + qc := querycache.NewQueryCache(querycache.WithCache(c)) + + for idx, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - misses, replaceCachedData := findMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, tc.cachedSeries, 0*time.Minute) + cacheKey := fmt.Sprintf("test-cache-key-%d", idx) + cachedData := &querycache.CachedSeriesData{ + Start: minTimestamp(tc.cachedSeries), + End: maxTimestamp(tc.cachedSeries), + Data: tc.cachedSeries, + } + jsonData, err := json.Marshal([]*querycache.CachedSeriesData{cachedData}) + if err != nil { + t.Errorf("error marshalling cached data: %v", err) + } + err = c.Store(cacheKey, jsonData, 5*time.Minute) + if err != nil { + t.Errorf("error storing cached data: %v", err) + } + + misses := qc.FindMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, cacheKey) if len(misses) != len(tc.expectedMiss) { t.Errorf("expected %d misses, got %d", len(tc.expectedMiss), len(misses)) } - if replaceCachedData != tc.replaceCachedData { - t.Errorf("expected replaceCachedData %t, got %t", tc.replaceCachedData, replaceCachedData) - } + for i, miss := range misses { - if miss.start != tc.expectedMiss[i].start { - t.Errorf("expected start %d, got %d", tc.expectedMiss[i].start, miss.start) + if miss.Start != tc.expectedMiss[i].Start { + t.Errorf("expected start %d, got %d", tc.expectedMiss[i].Start, miss.Start) } - if miss.end != tc.expectedMiss[i].end { - t.Errorf("expected end %d, got %d", tc.expectedMiss[i].end, miss.end) + if miss.End != tc.expectedMiss[i].End { + t.Errorf("expected end %d, got %d", tc.expectedMiss[i].End, miss.End) } } }) @@ -226,7 +270,7 @@ func TestV2FindMissingTimeRangesWithFluxInterval(t *testing.T) { requestedStep int64 cachedSeries []*v3.Series fluxInterval time.Duration - expectedMiss []missInterval + expectedMiss []querycache.MissInterval }{ { name: "cached time range is a subset of the requested time range", @@ -251,14 +295,14 @@ func TestV2FindMissingTimeRangesWithFluxInterval(t *testing.T) { }, }, fluxInterval: 5 * time.Minute, - expectedMiss: []missInterval{ + expectedMiss: []querycache.MissInterval{ { - start: 1675115596722, - end: 1675115596722 + 60*60*1000 - 1, + Start: 1675115596722, + End: 1675115596722 + 60*60*1000, }, { - start: 1675115596722 + 120*60*1000 + 1, - end: 1675115596722 + 180*60*1000, + Start: 1675115596722 + 120*60*1000, + End: 1675115596722 + 180*60*1000, }, }, }, @@ -293,7 +337,7 @@ func TestV2FindMissingTimeRangesWithFluxInterval(t *testing.T) { }, }, fluxInterval: 5 * time.Minute, - expectedMiss: []missInterval{}, + expectedMiss: []querycache.MissInterval{}, }, { name: "cache time range is a left overlap of the requested time range", @@ -322,10 +366,10 @@ func TestV2FindMissingTimeRangesWithFluxInterval(t *testing.T) { }, }, fluxInterval: 5 * time.Minute, - expectedMiss: []missInterval{ + expectedMiss: []querycache.MissInterval{ { - start: 1675115596722 + 120*60*1000 + 1, - end: 1675115596722 + 180*60*1000, + Start: 1675115596722 + 120*60*1000, + End: 1675115596722 + 180*60*1000, }, }, }, @@ -356,10 +400,10 @@ func TestV2FindMissingTimeRangesWithFluxInterval(t *testing.T) { }, }, fluxInterval: 5 * time.Minute, - expectedMiss: []missInterval{ + expectedMiss: []querycache.MissInterval{ { - start: 1675115596722, - end: 1675115596722 + 60*60*1000 - 1, + Start: 1675115596722, + End: 1675115596722 + 60*60*1000, }, }, }, @@ -390,27 +434,47 @@ func TestV2FindMissingTimeRangesWithFluxInterval(t *testing.T) { }, }, fluxInterval: 5 * time.Minute, - expectedMiss: []missInterval{ + expectedMiss: []querycache.MissInterval{ { - start: 1675115596722, - end: 1675115596722 + 180*60*1000, + Start: 1675115596722, + End: 1675115596722 + 180*60*1000, }, }, }, } - for _, tc := range testCases { + c := inmemory.New(&inmemory.Options{TTL: 5 * time.Minute, CleanupInterval: 10 * time.Minute}) + + qc := querycache.NewQueryCache(querycache.WithCache(c)) + + for idx, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - misses, _ := findMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, tc.cachedSeries, tc.fluxInterval) + cacheKey := fmt.Sprintf("test-cache-key-%d", idx) + cachedData := &querycache.CachedSeriesData{ + Start: minTimestamp(tc.cachedSeries), + End: maxTimestamp(tc.cachedSeries), + Data: tc.cachedSeries, + } + jsonData, err := json.Marshal([]*querycache.CachedSeriesData{cachedData}) + if err != nil { + t.Errorf("error marshalling cached data: %v", err) + return + } + err = c.Store(cacheKey, jsonData, 5*time.Minute) + if err != nil { + t.Errorf("error storing cached data: %v", err) + return + } + misses := qc.FindMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, cacheKey) if len(misses) != len(tc.expectedMiss) { t.Errorf("expected %d misses, got %d", len(tc.expectedMiss), len(misses)) } for i, miss := range misses { - if miss.start != tc.expectedMiss[i].start { - t.Errorf("expected start %d, got %d", tc.expectedMiss[i].start, miss.start) + if miss.Start != tc.expectedMiss[i].Start { + t.Errorf("expected start %d, got %d", tc.expectedMiss[i].Start, miss.Start) } - if miss.end != tc.expectedMiss[i].end { - t.Errorf("expected end %d, got %d", tc.expectedMiss[i].end, miss.end) + if miss.End != tc.expectedMiss[i].End { + t.Errorf("expected end %d, got %d", tc.expectedMiss[i].End, miss.End) } } }) @@ -1074,18 +1138,18 @@ func TestV2QueryRangeValueTypePromQL(t *testing.T) { expectedQueryAndTimeRanges := []struct { query string - ranges []missInterval + ranges []querycache.MissInterval }{ { query: "signoz_calls_total", - ranges: []missInterval{ - {start: 1675115596722, end: 1675115596722 + 120*60*1000}, + ranges: []querycache.MissInterval{ + {Start: 1675115596722, End: 1675115596722 + 120*60*1000}, }, }, { query: "signoz_latency_bucket", - ranges: []missInterval{ - {start: 1675115596722 + 60*60*1000, end: 1675115596722 + 180*60*1000}, + ranges: []querycache.MissInterval{ + {Start: 1675115596722 + 60*60*1000, End: 1675115596722 + 180*60*1000}, }, }, } @@ -1106,10 +1170,10 @@ func TestV2QueryRangeValueTypePromQL(t *testing.T) { if len(q.TimeRanges()[i]) != 2 { t.Errorf("expected time ranges to be %v, got %v", expectedQueryAndTimeRanges[i].ranges, q.TimeRanges()[i]) } - if q.TimeRanges()[i][0] != int(expectedQueryAndTimeRanges[i].ranges[0].start) { + if q.TimeRanges()[i][0] != int(expectedQueryAndTimeRanges[i].ranges[0].Start) { t.Errorf("expected time ranges to be %v, got %v", expectedQueryAndTimeRanges[i].ranges, q.TimeRanges()[i]) } - if q.TimeRanges()[i][1] != int(expectedQueryAndTimeRanges[i].ranges[0].end) { + if q.TimeRanges()[i][1] != int(expectedQueryAndTimeRanges[i].ranges[0].End) { t.Errorf("expected time ranges to be %v, got %v", expectedQueryAndTimeRanges[i].ranges, q.TimeRanges()[i]) } } diff --git a/pkg/query-service/querycache/query_range_cache.go b/pkg/query-service/querycache/query_range_cache.go index 5696abf69d..3b3e3be93c 100644 --- a/pkg/query-service/querycache/query_range_cache.go +++ b/pkg/query-service/querycache/query_range_cache.go @@ -54,19 +54,15 @@ func (q *queryCache) FindMissingTimeRanges(start, end, step int64, cacheKey stri return []MissInterval{{Start: start, End: end}} } - cachedData, retrieveStatus, _ := q.cache.Retrieve(cacheKey, true) - zap.L().Info("cache retrieve status for key", zap.String("cacheKey", cacheKey), zap.String("status", retrieveStatus.String())) - var cachedSeriesDataList []*CachedSeriesData - if err := json.Unmarshal(cachedData, &cachedSeriesDataList); err != nil { - // In case of error, we return the entire range as a miss - return []MissInterval{{Start: start, End: end}} - } + cachedSeriesDataList := q.getCachedSeriesData(cacheKey) // Sort the cached data by start time sort.Slice(cachedSeriesDataList, func(i, j int) bool { return cachedSeriesDataList[i].Start < cachedSeriesDataList[j].Start }) + zap.L().Info("Number of non-overlapping cached series data", zap.Int("count", len(cachedSeriesDataList))) + // Exclude the flux interval from the cached end time // Why do we use `time.Now()` here? @@ -119,6 +115,15 @@ func (q *queryCache) FindMissingTimeRanges(start, end, step int64, cacheKey stri return missingRanges } +func (q *queryCache) getCachedSeriesData(cacheKey string) []*CachedSeriesData { + cachedData, _, _ := q.cache.Retrieve(cacheKey, true) + var cachedSeriesDataList []*CachedSeriesData + if err := json.Unmarshal(cachedData, &cachedSeriesDataList); err != nil { + return nil + } + return cachedSeriesDataList +} + func (q *queryCache) mergeSeries(cachedSeries, missedSeries []*v3.Series) []*v3.Series { // Merge the missed series with the cached series by timestamp mergedSeries := make([]*v3.Series, 0) @@ -161,6 +166,10 @@ func (q *queryCache) storeMergedData(cacheKey string, mergedData []CachedSeriesD func (q *queryCache) MergeWithCachedSeriesData(cacheKey string, newData []CachedSeriesData) []CachedSeriesData { + if q.cache == nil { + return newData + } + cachedData, _, _ := q.cache.Retrieve(cacheKey, true) var existingData []CachedSeriesData if err := json.Unmarshal(cachedData, &existingData); err != nil {