Skip to content

Commit

Permalink
Merge branch 'develop' into enable-new-search-bar-for-logs
Browse files Browse the repository at this point in the history
  • Loading branch information
vikrantgupta25 authored Sep 20, 2024
2 parents da3719c + 033b64a commit 4cf2a7e
Show file tree
Hide file tree
Showing 10 changed files with 964 additions and 744 deletions.
188 changes: 47 additions & 141 deletions pkg/query-service/app/querier/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,19 @@ package querier

import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
"time"

logsV3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3"
logsV4 "go.signoz.io/signoz/pkg/query-service/app/logs/v4"
metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
"go.signoz.io/signoz/pkg/query-service/cache/status"
"go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/query-service/constants"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/postprocess"
"go.signoz.io/signoz/pkg/query-service/querycache"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -107,7 +106,8 @@ func (q *querier) runBuilderQuery(
if builderQuery.DataSource == v3.DataSourceLogs {
var query string
var err error
if _, ok := cacheKeys[queryName]; !ok {
if _, ok := cacheKeys[queryName]; !ok || params.NoCache {
zap.L().Info("skipping cache for logs query", zap.String("queryName", queryName), zap.Int64("start", start), zap.Int64("end", end), zap.Int64("step", builderQuery.StepInterval), zap.Bool("noCache", params.NoCache), zap.String("cacheKey", cacheKeys[queryName]))
query, err = prepareLogsQuery(ctx, q.UseLogsNewSchema, start, end, builderQuery, params, preferRPM)
if err != nil {
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
Expand All @@ -118,21 +118,11 @@ func (q *querier) runBuilderQuery(
return
}

cacheKey := cacheKeys[queryName]
var cachedData []byte
if !params.NoCache && q.cache != nil {
var retrieveStatus status.RetrieveStatus
data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true)
zap.L().Info("cache retrieve status", zap.String("status", retrieveStatus.String()))
if err == nil {
cachedData = data
}
}
misses, replaceCachedData := q.findMissingTimeRanges(start, end, builderQuery.StepInterval, cachedData)
missedSeries := make([]*v3.Series, 0)
cachedSeries := make([]*v3.Series, 0)
misses := q.queryCache.FindMissingTimeRanges(start, end, builderQuery.StepInterval, cacheKeys[queryName])
zap.L().Info("cache misses for logs query", zap.Any("misses", misses))
missedSeries := make([]querycache.CachedSeriesData, 0)
for _, miss := range misses {
query, err = prepareLogsQuery(ctx, q.UseLogsNewSchema, miss.start, miss.end, builderQuery, params, preferRPM)
query, err = prepareLogsQuery(ctx, q.UseLogsNewSchema, miss.Start, miss.End, builderQuery, params, preferRPM)
if err != nil {
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
return
Expand All @@ -147,48 +137,23 @@ func (q *querier) runBuilderQuery(
}
return
}
missedSeries = append(missedSeries, series...)
}
if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil {
zap.L().Error("error unmarshalling cached data", zap.Error(err))
}
mergedSeries := mergeSerieses(cachedSeries, missedSeries)
if replaceCachedData {
mergedSeries = missedSeries
}

var mergedSeriesData []byte
var marshallingErr error
missedSeriesLen := len(missedSeries)
if missedSeriesLen > 0 && !params.NoCache && q.cache != nil {
// caching the data
mergedSeriesData, marshallingErr = json.Marshal(mergedSeries)
if marshallingErr != nil {
zap.L().Error("error marshalling merged series", zap.Error(marshallingErr))
}
missedSeries = append(missedSeries, querycache.CachedSeriesData{
Start: miss.Start,
End: miss.End,
Data: series,
})
}
mergedSeries := q.queryCache.MergeWithCachedSeriesData(cacheKeys[queryName], missedSeries)

// response doesn't need everything
filterCachedPoints(mergedSeries, start, end)
resultSeries := common.GetSeriesFromCachedData(mergedSeries, start, end)

ch <- channelResult{
Err: nil,
Name: queryName,
Series: mergedSeries,
}

// Cache the seriesList for future queries
if missedSeriesLen > 0 && !params.NoCache && q.cache != nil && marshallingErr == nil {
// caching the data
err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour)
if err != nil {
zap.L().Error("error storing merged series", zap.Error(err))
return
}
Series: resultSeries,
}

return

}

if builderQuery.DataSource == v3.DataSourceTraces {
Expand Down Expand Up @@ -242,7 +207,8 @@ func (q *querier) runBuilderQuery(
// What is happening here?
// We are only caching the graph panel queries. A non-existant cache key means that the query is not cached.
// If the query is not cached, we execute the query and return the result without caching it.
if _, ok := cacheKeys[queryName]; !ok {
if _, ok := cacheKeys[queryName]; !ok || params.NoCache {
zap.L().Info("skipping cache for metrics query", zap.String("queryName", queryName), zap.Int64("start", start), zap.Int64("end", end), zap.Int64("step", builderQuery.StepInterval), zap.Bool("noCache", params.NoCache), zap.String("cacheKey", cacheKeys[queryName]))
query, err := metricsV3.PrepareMetricQuery(start, end, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, metricsV3.Options{PreferRPM: preferRPM})
if err != nil {
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
Expand All @@ -254,22 +220,13 @@ func (q *querier) runBuilderQuery(
}

cacheKey := cacheKeys[queryName]
var cachedData []byte
if !params.NoCache && q.cache != nil {
var retrieveStatus status.RetrieveStatus
data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true)
zap.L().Info("cache retrieve status", zap.String("status", retrieveStatus.String()))
if err == nil {
cachedData = data
}
}
misses, replaceCachedData := q.findMissingTimeRanges(start, end, builderQuery.StepInterval, cachedData)
missedSeries := make([]*v3.Series, 0)
cachedSeries := make([]*v3.Series, 0)
misses := q.queryCache.FindMissingTimeRanges(start, end, builderQuery.StepInterval, cacheKey)
zap.L().Info("cache misses for metrics query", zap.Any("misses", misses))
missedSeries := make([]querycache.CachedSeriesData, 0)
for _, miss := range misses {
query, err := metricsV3.PrepareMetricQuery(
miss.start,
miss.end,
miss.Start,
miss.End,
params.CompositeQuery.QueryType,
params.CompositeQuery.PanelType,
builderQuery,
Expand All @@ -294,41 +251,20 @@ func (q *querier) runBuilderQuery(
}
return
}
missedSeries = append(missedSeries, series...)
}
if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil {
zap.L().Error("error unmarshalling cached data", zap.Error(err))
}
mergedSeries := mergeSerieses(cachedSeries, missedSeries)
if replaceCachedData {
mergedSeries = missedSeries
}
var mergedSeriesData []byte
var marshallingErr error
missedSeriesLen := len(missedSeries)
if missedSeriesLen > 0 && !params.NoCache && q.cache != nil {
// caching the data
mergedSeriesData, marshallingErr = json.Marshal(mergedSeries)
if marshallingErr != nil {
zap.L().Error("error marshalling merged series", zap.Error(marshallingErr))
}
missedSeries = append(missedSeries, querycache.CachedSeriesData{
Start: miss.Start,
End: miss.End,
Data: series,
})
}
mergedSeries := q.queryCache.MergeWithCachedSeriesData(cacheKey, missedSeries)

resultSeries := common.GetSeriesFromCachedData(mergedSeries, start, end)

// response doesn't need everything
filterCachedPoints(mergedSeries, start, end)
ch <- channelResult{
Err: nil,
Name: queryName,
Series: mergedSeries,
}

// Cache the seriesList for future queries
if missedSeriesLen > 0 && !params.NoCache && q.cache != nil && marshallingErr == nil {
err := q.cache.Store(cacheKey, mergedSeriesData, time.Hour)
if err != nil {
zap.L().Error("error storing merged series", zap.Error(err))
return
}
Series: resultSeries,
}
}

Expand All @@ -350,31 +286,23 @@ func (q *querier) runBuilderExpression(
return
}

if _, ok := cacheKeys[queryName]; !ok {
if _, ok := cacheKeys[queryName]; !ok || params.NoCache {
zap.L().Info("skipping cache for expression query", zap.String("queryName", queryName), zap.Int64("start", params.Start), zap.Int64("end", params.End), zap.Int64("step", params.Step), zap.Bool("noCache", params.NoCache), zap.String("cacheKey", cacheKeys[queryName]))
query := queries[queryName]
series, err := q.execClickHouseQuery(ctx, query)
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: series}
return
}

cacheKey := cacheKeys[queryName]
var cachedData []byte
if !params.NoCache && q.cache != nil {
var retrieveStatus status.RetrieveStatus
data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true)
zap.L().Info("cache retrieve status", zap.String("status", retrieveStatus.String()))
if err == nil {
cachedData = data
}
}
step := postprocess.StepIntervalForFunction(params, queryName)
misses, replaceCachedData := q.findMissingTimeRanges(params.Start, params.End, step, cachedData)
missedSeries := make([]*v3.Series, 0)
cachedSeries := make([]*v3.Series, 0)
misses := q.queryCache.FindMissingTimeRanges(params.Start, params.End, step, cacheKey)
zap.L().Info("cache misses for expression query", zap.Any("misses", misses))
missedSeries := make([]querycache.CachedSeriesData, 0)
for _, miss := range misses {
missQueries, _ := q.builder.PrepareQueries(&v3.QueryRangeParamsV3{
Start: miss.start,
End: miss.end,
Start: miss.Start,
End: miss.End,
Step: params.Step,
NoCache: params.NoCache,
CompositeQuery: params.CompositeQuery,
Expand All @@ -386,41 +314,19 @@ func (q *querier) runBuilderExpression(
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
return
}
missedSeries = append(missedSeries, series...)
}
if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil {
zap.L().Error("error unmarshalling cached data", zap.Error(err))
}
mergedSeries := mergeSerieses(cachedSeries, missedSeries)
if replaceCachedData {
mergedSeries = missedSeries
missedSeries = append(missedSeries, querycache.CachedSeriesData{
Start: miss.Start,
End: miss.End,
Data: series,
})
}
mergedSeries := q.queryCache.MergeWithCachedSeriesData(cacheKey, missedSeries)

var mergedSeriesData []byte
missedSeriesLen := len(missedSeries)
var marshallingErr error
if missedSeriesLen > 0 && !params.NoCache && q.cache != nil {
// caching the data
mergedSeriesData, marshallingErr = json.Marshal(mergedSeries)
if marshallingErr != nil {
zap.L().Error("error marshalling merged series", zap.Error(marshallingErr))
}
}
resultSeries := common.GetSeriesFromCachedData(mergedSeries, params.Start, params.End)

// response doesn't need everything
filterCachedPoints(mergedSeries, params.Start, params.End)
ch <- channelResult{
Err: nil,
Name: queryName,
Series: mergedSeries,
}

// Cache the seriesList for future queries
if len(missedSeries) > 0 && !params.NoCache && q.cache != nil && marshallingErr == nil {
err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour)
if err != nil {
zap.L().Error("error storing merged series", zap.Error(err))
return
}
Series: resultSeries,
}
}
Loading

0 comments on commit 4cf2a7e

Please sign in to comment.