diff --git a/pkg/query-service/app/querier/querier.go b/pkg/query-service/app/querier/querier.go
index 64e4a33ed2..d52c41b4e6 100644
--- a/pkg/query-service/app/querier/querier.go
+++ b/pkg/query-service/app/querier/querier.go
@@ -275,7 +275,25 @@ func mergeSerieses(cachedSeries, missedSeries []*v3.Series) []*v3.Series {
 			seriesesByLabels[labelsToString(series.Labels)] = series
 			continue
 		}
-		seriesesByLabels[labelsToString(series.Labels)].Points = append(seriesesByLabels[labelsToString(series.Labels)].Points, series.Points...)
+
+		series.SortPoints()
+		ls := len(series.Points)
+		// existing points are already sorted
+		cached := seriesesByLabels[labelsToString(series.Labels)]
+		lc := len(cached.Points)
+
+		// An empty side has no time range to compare, and indexing it would
+		// panic, so fall back to a plain append. Otherwise merge only when the
+		// cached and missed time ranges overlap, including the case where the
+		// missed range lies strictly inside the cached one.
+		mergeable := ls == 0 || lc == 0 ||
+			(cached.Points[0].Timestamp <= series.Points[ls-1].Timestamp &&
+				series.Points[0].Timestamp <= cached.Points[lc-1].Timestamp)
+		if mergeable {
+			cached.Points = append(cached.Points, series.Points...)
+		} else {
+			seriesesByLabels[labelsToString(series.Labels)] = series
+		}
 	}
 	// Sort the points in each series by timestamp
 	for idx := range seriesesByLabels {
diff --git a/pkg/query-service/app/querier/querier_test.go b/pkg/query-service/app/querier/querier_test.go
index 962ca3832a..4a77074a5f 100644
--- a/pkg/query-service/app/querier/querier_test.go
+++ b/pkg/query-service/app/querier/querier_test.go
@@ -12,6 +12,278 @@ import (
 	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
 )
 
+// TestMergeSerieses verifies that mergeSerieses combines cached and missed
+// points when their time ranges overlap and otherwise keeps only the missed
+// series.
+func TestMergeSerieses(t *testing.T) {
+
+	testCases := []struct {
+		name         string
+		cachedSeries []*v3.Series
+		missedSeries []*v3.Series
+		resultSeries []*v3.Series
+	}{
+		{
+			name: "merge two series",
+			cachedSeries: []*v3.Series{
+				{
+					Labels: map[string]string{
+						"__name__": "http_server_requests_seconds_count",
+					},
+					Points: []v3.Point{
+						{Timestamp: 1675115596722, Value: 1},
+						{Timestamp: 1675115596723, Value: 2},
+						{Timestamp: 1675115596724, Value: 3},
+					},
+				},
+			},
+			missedSeries: []*v3.Series{
+				{
+					Labels: map[string]string{
+						"__name__": "http_server_requests_seconds_count",
+					},
+					Points: []v3.Point{
+						{Timestamp: 1675115596724, Value: 3},
+						{Timestamp: 1675115596725, Value: 4},
+					},
+				},
+			},
+			resultSeries: []*v3.Series{
+				{
+					Labels: map[string]string{
+						"__name__": "http_server_requests_seconds_count",
+					},
+					Points: []v3.Point{
+						{Timestamp: 1675115596722, Value: 1},
+						{Timestamp: 1675115596723, Value: 2},
+						{Timestamp: 1675115596724, Value: 3},
+						{Timestamp: 1675115596725, Value: 4},
+					},
+				},
+			},
+		},
+		{
+			name: "dont merge if start of missed is after end of cached",
+			cachedSeries: []*v3.Series{
+				{
+					Labels: map[string]string{
+						"__name__": "http_server_requests_seconds_count",
+					},
+					Points: []v3.Point{
+						{Timestamp: 1675115596722, Value: 1},
+						{Timestamp: 1675115596723, Value: 2},
+						{Timestamp: 1675115596724, Value: 3},
+					},
+				},
+			},
+			missedSeries: []*v3.Series{
+				{
+					Labels: map[string]string{
+						"__name__": "http_server_requests_seconds_count",
+					},
+					Points: []v3.Point{
+						{Timestamp: 1675115596726, Value: 5},
+						{Timestamp: 1675115596727, Value: 6},
+					},
+				},
+			},
+			resultSeries: []*v3.Series{
+				{
+					Labels: map[string]string{
+						"__name__": "http_server_requests_seconds_count",
+					},
+					Points: []v3.Point{
+						{Timestamp: 1675115596726, Value: 5},
+						{Timestamp: 1675115596727, Value: 6},
+					},
+				},
+			},
+		},
+		{
+			name: "dont merge if end of missed is before start of cached",
+			cachedSeries: []*v3.Series{
+				{
+					Labels: map[string]string{
+						"__name__": "http_server_requests_seconds_count",
+					},
+					Points: []v3.Point{
+						{Timestamp: 1675115596722, Value: 8},
+						{Timestamp: 1675115596723, Value: 9},
+						{Timestamp: 1675115596724, Value: 10},
+					},
+				},
+			},
+			missedSeries: []*v3.Series{
+				{
+					Labels: map[string]string{
+						"__name__": "http_server_requests_seconds_count",
+					},
+					Points: []v3.Point{
+						{Timestamp: 1675115596720, Value: 5},
+						{Timestamp: 1675115596721, Value: 6},
+					},
+				},
+			},
+			resultSeries: []*v3.Series{
+				{
+					Labels: map[string]string{
+						"__name__": "http_server_requests_seconds_count",
+					},
+					Points: []v3.Point{
+						{Timestamp: 1675115596720, Value: 5},
+						{Timestamp: 1675115596721, Value: 6},
+					},
+				},
+			},
+		},
+		{
+			name: "cache is a subset of missed series",
+			cachedSeries: []*v3.Series{
+				{
+					Labels: map[string]string{
+						"__name__": "http_server_requests_seconds_count",
+					},
+					Points: []v3.Point{
+						{Timestamp: 1675115596723, Value: 3},
+						{Timestamp: 1675115596724, Value: 4},
+						{Timestamp: 1675115596725, Value: 5},
+					},
+				},
+			},
+			missedSeries: []*v3.Series{
+				{
+					Labels: map[string]string{
+						"__name__": "http_server_requests_seconds_count",
+					},
+					Points: []v3.Point{
+						{Timestamp: 1675115596721, Value: 1},
+						{Timestamp: 1675115596722, Value: 2},
+						{Timestamp: 1675115596723, Value: 3},
+						{Timestamp: 1675115596725, Value: 5},
+						{Timestamp: 1675115596726, Value: 6},
+					},
+				},
+			},
+			resultSeries: []*v3.Series{
+				{
+					Labels: map[string]string{
+						"__name__": "http_server_requests_seconds_count",
+					},
+					Points: []v3.Point{
+						{Timestamp: 1675115596721, Value: 1},
+						{Timestamp: 1675115596722, Value: 2},
+						{Timestamp: 1675115596723, Value: 3},
+						{Timestamp: 1675115596724, Value: 4},
+						{Timestamp: 1675115596725, Value: 5},
+						{Timestamp: 1675115596726, Value: 6},
+					},
+				},
+			},
+		},
+		{
+			name: "missed series lies inside the cached range",
+			cachedSeries: []*v3.Series{
+				{
+					Labels: map[string]string{
+						"__name__": "http_server_requests_seconds_count",
+					},
+					Points: []v3.Point{
+						{Timestamp: 1675115596721, Value: 1},
+						{Timestamp: 1675115596722, Value: 2},
+						{Timestamp: 1675115596723, Value: 3},
+						{Timestamp: 1675115596724, Value: 4},
+						{Timestamp: 1675115596725, Value: 5},
+					},
+				},
+			},
+			missedSeries: []*v3.Series{
+				{
+					Labels: map[string]string{
+						"__name__": "http_server_requests_seconds_count",
+					},
+					Points: []v3.Point{
+						{Timestamp: 1675115596722, Value: 2},
+						{Timestamp: 1675115596723, Value: 3},
+					},
+				},
+			},
+			resultSeries: []*v3.Series{
+				{
+					Labels: map[string]string{
+						"__name__": "http_server_requests_seconds_count",
+					},
+					Points: []v3.Point{
+						{Timestamp: 1675115596721, Value: 1},
+						{Timestamp: 1675115596722, Value: 2},
+						{Timestamp: 1675115596723, Value: 3},
+						{Timestamp: 1675115596724, Value: 4},
+						{Timestamp: 1675115596725, Value: 5},
+					},
+				},
+			},
+		},
+		{
+			name: "missed series without points keeps cached points",
+			cachedSeries: []*v3.Series{
+				{
+					Labels: map[string]string{
+						"__name__": "http_server_requests_seconds_count",
+					},
+					Points: []v3.Point{
+						{Timestamp: 1675115596722, Value: 1},
+						{Timestamp: 1675115596723, Value: 2},
+						{Timestamp: 1675115596724, Value: 3},
+					},
+				},
+			},
+			missedSeries: []*v3.Series{
+				{
+					Labels: map[string]string{
+						"__name__": "http_server_requests_seconds_count",
+					},
+				},
+			},
+			resultSeries: []*v3.Series{
+				{
+					Labels: map[string]string{
+						"__name__": "http_server_requests_seconds_count",
+					},
+					Points: []v3.Point{
+						{Timestamp: 1675115596722, Value: 1},
+						{Timestamp: 1675115596723, Value: 2},
+						{Timestamp: 1675115596724, Value: 3},
+					},
+				},
+			},
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			res := mergeSerieses(tc.cachedSeries, tc.missedSeries)
+			// Fail fast on count mismatches so the index-based comparisons
+			// below cannot run out of range.
+			if len(res) != len(tc.resultSeries) {
+				t.Fatalf("expected %d series, got %d", len(tc.resultSeries), len(res))
+			}
+			for sIdx, series := range tc.resultSeries {
+				if len(res[sIdx].Points) != len(series.Points) {
+					t.Fatalf("expected %d points, got %d", len(series.Points), len(res[sIdx].Points))
+				}
+				for pIdx, point := range series.Points {
+					got := res[sIdx].Points[pIdx]
+					if got.Timestamp != point.Timestamp {
+						t.Errorf("expected timestamp %d, got %d", point.Timestamp, got.Timestamp)
+					}
+					if got.Value != point.Value {
+						t.Errorf("expected value %v, got %v", point.Value, got.Value)
+					}
+				}
+			}
+		})
+	}
+}
+
 func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) {
 	// There are five scenarios:
 	// 1. Cached time range is a subset of the requested time range
diff --git a/pkg/query-service/app/querier/v2/querier.go b/pkg/query-service/app/querier/v2/querier.go
index 5e0c18afb5..ea72783953 100644
--- a/pkg/query-service/app/querier/v2/querier.go
+++ b/pkg/query-service/app/querier/v2/querier.go
@@ -286,7 +286,25 @@ func mergeSerieses(cachedSeries, missedSeries []*v3.Series) []*v3.Series {
 			seriesesByLabels[labelsToString(series.Labels)] = series
 			continue
 		}
-		seriesesByLabels[labelsToString(series.Labels)].Points = append(seriesesByLabels[labelsToString(series.Labels)].Points, series.Points...)
+
+		series.SortPoints()
+		ls := len(series.Points)
+		// existing points are already sorted
+		cached := seriesesByLabels[labelsToString(series.Labels)]
+		lc := len(cached.Points)
+
+		// An empty side has no time range to compare, and indexing it would
+		// panic, so fall back to a plain append. Otherwise merge only when the
+		// cached and missed time ranges overlap, including the case where the
+		// missed range lies strictly inside the cached one.
+		mergeable := ls == 0 || lc == 0 ||
+			(cached.Points[0].Timestamp <= series.Points[ls-1].Timestamp &&
+				series.Points[0].Timestamp <= cached.Points[lc-1].Timestamp)
+		if mergeable {
+			cached.Points = append(cached.Points, series.Points...)
+		} else {
+			seriesesByLabels[labelsToString(series.Labels)] = series
+		}
 	}
 
 	// Sort the points in each series by timestamp
diff --git a/pkg/query-service/app/querier/v2/querier_test.go b/pkg/query-service/app/querier/v2/querier_test.go
index b8309c68ff..f798d12539 100644
--- a/pkg/query-service/app/querier/v2/querier_test.go
+++ b/pkg/query-service/app/querier/v2/querier_test.go
@@ -12,6 +12,278 @@ import (
 	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
 )
 
+// TestMergeSerieses verifies that mergeSerieses combines cached and missed
+// points when their time ranges overlap and otherwise keeps only the missed
+// series.
+func TestMergeSerieses(t *testing.T) {
+
+	testCases := []struct {
+		name         string
+		cachedSeries []*v3.Series
+		missedSeries []*v3.Series
+		resultSeries []*v3.Series
+	}{
+		{
+			name: "merge two series",
+			cachedSeries: []*v3.Series{
+				{
+					Labels: map[string]string{
+						"__name__": "http_server_requests_seconds_count",
+					},
+					Points: []v3.Point{
+						{Timestamp: 1675115596722, Value: 1},
+						{Timestamp: 1675115596723, Value: 2},
+						{Timestamp: 1675115596724, Value: 3},
+					},
+				},
+			},
+			missedSeries: []*v3.Series{
+				{
+					Labels: map[string]string{
+						"__name__": "http_server_requests_seconds_count",
+					},
+					Points: []v3.Point{
+						{Timestamp: 1675115596724, Value: 3},
+						{Timestamp: 1675115596725, Value: 4},
+					},
+				},
+			},
+			resultSeries: []*v3.Series{
+				{
+					Labels: map[string]string{
+						"__name__": "http_server_requests_seconds_count",
+					},
+					Points: []v3.Point{
+						{Timestamp: 1675115596722, Value: 1},
+						{Timestamp: 1675115596723, Value: 2},
+						{Timestamp: 1675115596724, Value: 3},
+						{Timestamp: 1675115596725, Value: 4},
+					},
+				},
+			},
+		},
+		{
+			name: "dont merge if start of missed is after end of cached",
+			cachedSeries: []*v3.Series{
+				{
+					Labels: map[string]string{
+						"__name__": "http_server_requests_seconds_count",
+					},
+					Points: []v3.Point{
+						{Timestamp: 1675115596722, Value: 1},
+						{Timestamp: 1675115596723, Value: 2},
+						{Timestamp: 1675115596724, Value: 3},
+					},
+				},
+			},
+			missedSeries: []*v3.Series{
+				{
+					Labels: map[string]string{
+						"__name__": "http_server_requests_seconds_count",
+					},
+					Points: []v3.Point{
+						{Timestamp: 1675115596726, Value: 5},
+						{Timestamp: 1675115596727, Value: 6},
+					},
+				},
+			},
+			resultSeries: []*v3.Series{
+				{
+					Labels: map[string]string{
+						"__name__": "http_server_requests_seconds_count",
+					},
+					Points: []v3.Point{
+						{Timestamp: 1675115596726, Value: 5},
+						{Timestamp: 1675115596727, Value: 6},
+					},
+				},
+			},
+		},
+		{
+			name: "dont merge if end of missed is before start of cached",
+			cachedSeries: []*v3.Series{
+				{
+					Labels: map[string]string{
+						"__name__": "http_server_requests_seconds_count",
+					},
+					Points: []v3.Point{
+						{Timestamp: 1675115596722, Value: 8},
+						{Timestamp: 1675115596723, Value: 9},
+						{Timestamp: 1675115596724, Value: 10},
+					},
+				},
+			},
+			missedSeries: []*v3.Series{
+				{
+					Labels: map[string]string{
+						"__name__": "http_server_requests_seconds_count",
+					},
+					Points: []v3.Point{
+						{Timestamp: 1675115596720, Value: 5},
+						{Timestamp: 1675115596721, Value: 6},
+					},
+				},
+			},
+			resultSeries: []*v3.Series{
+				{
+					Labels: map[string]string{
+						"__name__": "http_server_requests_seconds_count",
+					},
+					Points: []v3.Point{
+						{Timestamp: 1675115596720, Value: 5},
+						{Timestamp: 1675115596721, Value: 6},
+					},
+				},
+			},
+		},
+		{
+			name: "cache is a subset of missed series",
+			cachedSeries: []*v3.Series{
+				{
+					Labels: map[string]string{
+						"__name__": "http_server_requests_seconds_count",
+					},
+					Points: []v3.Point{
+						{Timestamp: 1675115596723, Value: 3},
+						{Timestamp: 1675115596724, Value: 4},
+						{Timestamp: 1675115596725, Value: 5},
+					},
+				},
+			},
+			missedSeries: []*v3.Series{
+				{
+					Labels: map[string]string{
+						"__name__": "http_server_requests_seconds_count",
+					},
+					Points: []v3.Point{
+						{Timestamp: 1675115596721, Value: 1},
+						{Timestamp: 1675115596722, Value: 2},
+						{Timestamp: 1675115596723, Value: 3},
+						{Timestamp: 1675115596725, Value: 5},
+						{Timestamp: 1675115596726, Value: 6},
+					},
+				},
+			},
+			resultSeries: []*v3.Series{
+				{
+					Labels: map[string]string{
+						"__name__": "http_server_requests_seconds_count",
+					},
+					Points: []v3.Point{
+						{Timestamp: 1675115596721, Value: 1},
+						{Timestamp: 1675115596722, Value: 2},
+						{Timestamp: 1675115596723, Value: 3},
+						{Timestamp: 1675115596724, Value: 4},
+						{Timestamp: 1675115596725, Value: 5},
+						{Timestamp: 1675115596726, Value: 6},
+					},
+				},
+			},
+		},
+		{
+			name: "missed series lies inside the cached range",
+			cachedSeries: []*v3.Series{
+				{
+					Labels: map[string]string{
+						"__name__": "http_server_requests_seconds_count",
+					},
+					Points: []v3.Point{
+						{Timestamp: 1675115596721, Value: 1},
+						{Timestamp: 1675115596722, Value: 2},
+						{Timestamp: 1675115596723, Value: 3},
+						{Timestamp: 1675115596724, Value: 4},
+						{Timestamp: 1675115596725, Value: 5},
+					},
+				},
+			},
+			missedSeries: []*v3.Series{
+				{
+					Labels: map[string]string{
+						"__name__": "http_server_requests_seconds_count",
+					},
+					Points: []v3.Point{
+						{Timestamp: 1675115596722, Value: 2},
+						{Timestamp: 1675115596723, Value: 3},
+					},
+				},
+			},
+			resultSeries: []*v3.Series{
+				{
+					Labels: map[string]string{
+						"__name__": "http_server_requests_seconds_count",
+					},
+					Points: []v3.Point{
+						{Timestamp: 1675115596721, Value: 1},
+						{Timestamp: 1675115596722, Value: 2},
+						{Timestamp: 1675115596723, Value: 3},
+						{Timestamp: 1675115596724, Value: 4},
+						{Timestamp: 1675115596725, Value: 5},
+					},
+				},
+			},
+		},
+		{
+			name: "missed series without points keeps cached points",
+			cachedSeries: []*v3.Series{
+				{
+					Labels: map[string]string{
+						"__name__": "http_server_requests_seconds_count",
+					},
+					Points: []v3.Point{
+						{Timestamp: 1675115596722, Value: 1},
+						{Timestamp: 1675115596723, Value: 2},
+						{Timestamp: 1675115596724, Value: 3},
+					},
+				},
+			},
+			missedSeries: []*v3.Series{
+				{
+					Labels: map[string]string{
+						"__name__": "http_server_requests_seconds_count",
+					},
+				},
+			},
+			resultSeries: []*v3.Series{
+				{
+					Labels: map[string]string{
+						"__name__": "http_server_requests_seconds_count",
+					},
+					Points: []v3.Point{
+						{Timestamp: 1675115596722, Value: 1},
+						{Timestamp: 1675115596723, Value: 2},
+						{Timestamp: 1675115596724, Value: 3},
+					},
+				},
+			},
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			res := mergeSerieses(tc.cachedSeries, tc.missedSeries)
+			// Fail fast on count mismatches so the index-based comparisons
+			// below cannot run out of range.
+			if len(res) != len(tc.resultSeries) {
+				t.Fatalf("expected %d series, got %d", len(tc.resultSeries), len(res))
+			}
+			for sIdx, series := range tc.resultSeries {
+				if len(res[sIdx].Points) != len(series.Points) {
+					t.Fatalf("expected %d points, got %d", len(series.Points), len(res[sIdx].Points))
+				}
+				for pIdx, point := range series.Points {
+					got := res[sIdx].Points[pIdx]
+					if got.Timestamp != point.Timestamp {
+						t.Errorf("expected timestamp %d, got %d", point.Timestamp, got.Timestamp)
+					}
+					if got.Value != point.Value {
+						t.Errorf("expected value %v, got %v", point.Value, got.Value)
+					}
+				}
+			}
+		})
+	}
+}
+
 func TestV2FindMissingTimeRangesZeroFreshNess(t *testing.T) {
 	// There are five scenarios:
 	// 1. Cached time range is a subset of the requested time range