Skip to content

Commit

Permalink
chore: move traces builder query attributes enrichment before query p…
Browse files Browse the repository at this point in the history
…rep (#5917)
  • Loading branch information
srikanthccv authored Sep 13, 2024
1 parent 3fdfb51 commit a5f3a18
Show file tree
Hide file tree
Showing 13 changed files with 160 additions and 119 deletions.
20 changes: 11 additions & 9 deletions pkg/query-service/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2521,7 +2521,7 @@ func (aH *APIHandler) getNetworkData(
var result []*v3.Result
var errQueriesByName map[string]error

result, errQueriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams, nil)
result, errQueriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams)
if err != nil {
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
RespondError(w, apiErrObj, errQueriesByName)
Expand Down Expand Up @@ -2556,7 +2556,7 @@ func (aH *APIHandler) getNetworkData(
return
}

resultFetchLatency, errQueriesByNameFetchLatency, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams, nil)
resultFetchLatency, errQueriesByNameFetchLatency, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams)
if err != nil {
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
RespondError(w, apiErrObj, errQueriesByNameFetchLatency)
Expand Down Expand Up @@ -2617,7 +2617,7 @@ func (aH *APIHandler) getProducerData(
var result []*v3.Result
var errQuriesByName map[string]error

result, errQuriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams, nil)
result, errQuriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams)
if err != nil {
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
RespondError(w, apiErrObj, errQuriesByName)
Expand Down Expand Up @@ -2658,7 +2658,7 @@ func (aH *APIHandler) getConsumerData(
var result []*v3.Result
var errQuriesByName map[string]error

result, errQuriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams, nil)
result, errQuriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams)
if err != nil {
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
RespondError(w, apiErrObj, errQuriesByName)
Expand Down Expand Up @@ -3020,7 +3020,7 @@ func (aH *APIHandler) calculateLogsConnectionStatus(
},
}
queryRes, _, err := aH.querier.QueryRange(
ctx, qrParams, map[string]v3.AttributeKey{},
ctx, qrParams,
)
if err != nil {
return nil, model.InternalError(fmt.Errorf(
Expand Down Expand Up @@ -3670,13 +3670,14 @@ func (aH *APIHandler) queryRangeV3(ctx context.Context, queryRangeParams *v3.Que
RespondError(w, apiErrObj, errQuriesByName)
return
}
tracesV3.Enrich(queryRangeParams, spanKeys)
}

// WARN: Only works for AND operator in traces query
if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeBuilder {
// check if traceID is used as filter (with equal/similar operator) in traces query if yes add timestamp filter to queryRange params
isUsed, traceIDs := tracesV3.TraceIdFilterUsedWithEqual(queryRangeParams)
if isUsed == true && len(traceIDs) > 0 {
if isUsed && len(traceIDs) > 0 {
zap.L().Debug("traceID used as filter in traces query")
// query signoz_spans table with traceID to get min and max timestamp
min, max, err := aH.reader.GetMinAndMaxTimestampForTraceID(ctx, traceIDs)
Expand Down Expand Up @@ -3706,7 +3707,7 @@ func (aH *APIHandler) queryRangeV3(ctx context.Context, queryRangeParams *v3.Que
}
}

result, errQuriesByName, err = aH.querier.QueryRange(ctx, queryRangeParams, spanKeys)
result, errQuriesByName, err = aH.querier.QueryRange(ctx, queryRangeParams)

if err != nil {
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
Expand Down Expand Up @@ -4044,13 +4045,14 @@ func (aH *APIHandler) queryRangeV4(ctx context.Context, queryRangeParams *v3.Que
RespondError(w, apiErrObj, errQuriesByName)
return
}
tracesV3.Enrich(queryRangeParams, spanKeys)
}

// WARN: Only works for AND operator in traces query
if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeBuilder {
// check if traceID is used as filter (with equal/similar operator) in traces query if yes add timestamp filter to queryRange params
isUsed, traceIDs := tracesV3.TraceIdFilterUsedWithEqual(queryRangeParams)
if isUsed == true && len(traceIDs) > 0 {
if isUsed && len(traceIDs) > 0 {
zap.L().Debug("traceID used as filter in traces query")
// query signoz_spans table with traceID to get min and max timestamp
min, max, err := aH.reader.GetMinAndMaxTimestampForTraceID(ctx, traceIDs)
Expand All @@ -4062,7 +4064,7 @@ func (aH *APIHandler) queryRangeV4(ctx context.Context, queryRangeParams *v3.Que
}
}

result, errQuriesByName, err = aH.querierV2.QueryRange(ctx, queryRangeParams, spanKeys)
result, errQuriesByName, err = aH.querierV2.QueryRange(ctx, queryRangeParams)

if err != nil {
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
Expand Down
9 changes: 2 additions & 7 deletions pkg/query-service/app/querier/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ func (q *querier) runBuilderQuery(
ctx context.Context,
builderQuery *v3.BuilderQuery,
params *v3.QueryRangeParamsV3,
keys map[string]v3.AttributeKey,
cacheKeys map[string]string,
ch chan channelResult,
wg *sync.WaitGroup,
Expand Down Expand Up @@ -196,7 +195,6 @@ func (q *querier) runBuilderQuery(
end,
params.CompositeQuery.PanelType,
builderQuery,
keys,
tracesV3.Options{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: preferRPM},
)
if err != nil {
Expand All @@ -208,7 +206,6 @@ func (q *querier) runBuilderQuery(
end,
params.CompositeQuery.PanelType,
builderQuery,
keys,
tracesV3.Options{GraphLimitQtype: constants.SecondQueryGraphLimit, PreferRPM: preferRPM},
)
if err != nil {
Expand All @@ -222,7 +219,6 @@ func (q *querier) runBuilderQuery(
end,
params.CompositeQuery.PanelType,
builderQuery,
keys,
tracesV3.Options{PreferRPM: preferRPM},
)
if err != nil {
Expand Down Expand Up @@ -333,7 +329,6 @@ func (q *querier) runBuilderExpression(
ctx context.Context,
builderQuery *v3.BuilderQuery,
params *v3.QueryRangeParamsV3,
keys map[string]v3.AttributeKey,
cacheKeys map[string]string,
ch chan channelResult,
wg *sync.WaitGroup,
Expand All @@ -342,7 +337,7 @@ func (q *querier) runBuilderExpression(

queryName := builderQuery.QueryName

queries, err := q.builder.PrepareQueries(params, keys)
queries, err := q.builder.PrepareQueries(params)
if err != nil {
ch <- channelResult{Err: err, Name: queryName, Query: "", Series: nil}
return
Expand Down Expand Up @@ -377,7 +372,7 @@ func (q *querier) runBuilderExpression(
NoCache: params.NoCache,
CompositeQuery: params.CompositeQuery,
Variables: params.Variables,
}, keys)
})
query := missQueries[queryName]
series, err := q.execClickHouseQuery(ctx, query)
if err != nil {
Expand Down
22 changes: 11 additions & 11 deletions pkg/query-service/app/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func mergeSerieses(cachedSeries, missedSeries []*v3.Series) []*v3.Series {
return mergedSeries
}

func (q *querier) runBuilderQueries(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Result, map[string]error, error) {
func (q *querier) runBuilderQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, map[string]error, error) {

cacheKeys := q.keyGenerator.GenerateKeys(params)

Expand All @@ -310,9 +310,9 @@ func (q *querier) runBuilderQueries(ctx context.Context, params *v3.QueryRangePa
}
wg.Add(1)
if queryName == builderQuery.Expression {
go q.runBuilderQuery(ctx, builderQuery, params, keys, cacheKeys, ch, &wg)
go q.runBuilderQuery(ctx, builderQuery, params, cacheKeys, ch, &wg)
} else {
go q.runBuilderExpression(ctx, builderQuery, params, keys, cacheKeys, ch, &wg)
go q.runBuilderExpression(ctx, builderQuery, params, cacheKeys, ch, &wg)
}
}

Expand Down Expand Up @@ -470,7 +470,7 @@ func (q *querier) runClickHouseQueries(ctx context.Context, params *v3.QueryRang
return results, errQueriesByName, err
}

func (q *querier) runLogsListQuery(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey, tsRanges []utils.LogsListTsRange) ([]*v3.Result, map[string]error, error) {
func (q *querier) runLogsListQuery(ctx context.Context, params *v3.QueryRangeParamsV3, tsRanges []utils.LogsListTsRange) ([]*v3.Result, map[string]error, error) {
res := make([]*v3.Result, 0)
qName := ""
pageSize := uint64(0)
Expand All @@ -487,7 +487,7 @@ func (q *querier) runLogsListQuery(ctx context.Context, params *v3.QueryRangePar
params.End = v.End

params.CompositeQuery.BuilderQueries[qName].PageSize = pageSize - uint64(len(data))
queries, err := q.builder.PrepareQueries(params, keys)
queries, err := q.builder.PrepareQueries(params)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -529,21 +529,21 @@ func (q *querier) runLogsListQuery(ctx context.Context, params *v3.QueryRangePar
return res, nil, nil
}

func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Result, map[string]error, error) {
func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, map[string]error, error) {
// List query has support for only one query.
if q.UseLogsNewSchema && params.CompositeQuery != nil && len(params.CompositeQuery.BuilderQueries) == 1 {
for _, v := range params.CompositeQuery.BuilderQueries {
// only allow of logs queries with timestamp ordering desc
if v.DataSource == v3.DataSourceLogs && len(v.OrderBy) == 1 && v.OrderBy[0].ColumnName == "timestamp" && v.OrderBy[0].Order == "desc" {
startEndArr := utils.GetLogsListTsRanges(params.Start, params.End)
if len(startEndArr) > 0 {
return q.runLogsListQuery(ctx, params, keys, startEndArr)
return q.runLogsListQuery(ctx, params, startEndArr)
}
}
}
}

queries, err := q.builder.PrepareQueries(params, keys)
queries, err := q.builder.PrepareQueries(params)

if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -590,17 +590,17 @@ func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRan
return res, nil, nil
}

func (q *querier) QueryRange(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Result, map[string]error, error) {
func (q *querier) QueryRange(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, map[string]error, error) {
var results []*v3.Result
var err error
var errQueriesByName map[string]error
if params.CompositeQuery != nil {
switch params.CompositeQuery.QueryType {
case v3.QueryTypeBuilder:
if params.CompositeQuery.PanelType == v3.PanelTypeList || params.CompositeQuery.PanelType == v3.PanelTypeTrace {
results, errQueriesByName, err = q.runBuilderListQueries(ctx, params, keys)
results, errQueriesByName, err = q.runBuilderListQueries(ctx, params)
} else {
results, errQueriesByName, err = q.runBuilderQueries(ctx, params, keys)
results, errQueriesByName, err = q.runBuilderQueries(ctx, params)
}
// in builder query, the only errors we expose are the ones that exceed the resource limits
// everything else is internal error as they are not actionable by the user
Expand Down
19 changes: 13 additions & 6 deletions pkg/query-service/app/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
"go.signoz.io/signoz/pkg/query-service/cache/inmemory"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)
Expand Down Expand Up @@ -584,7 +585,8 @@ func TestQueryRange(t *testing.T) {
}

for i, param := range params {
_, errByName, err := q.QueryRange(context.Background(), param, nil)
tracesV3.Enrich(param, map[string]v3.AttributeKey{})
_, errByName, err := q.QueryRange(context.Background(), param)
if err != nil {
t.Errorf("expected no error, got %s", err)
}
Expand Down Expand Up @@ -693,7 +695,8 @@ func TestQueryRangeValueType(t *testing.T) {
}

for i, param := range params {
_, errByName, err := q.QueryRange(context.Background(), param, nil)
tracesV3.Enrich(param, map[string]v3.AttributeKey{})
_, errByName, err := q.QueryRange(context.Background(), param)
if err != nil {
t.Errorf("expected no error, got %s", err)
}
Expand Down Expand Up @@ -746,7 +749,8 @@ func TestQueryRangeTimeShift(t *testing.T) {
expectedTimeRangeInQueryString := fmt.Sprintf("timestamp >= %d AND timestamp <= %d", (1675115596722-86400*1000)*1000000, ((1675115596722+120*60*1000)-86400*1000)*1000000)

for i, param := range params {
_, errByName, err := q.QueryRange(context.Background(), param, nil)
tracesV3.Enrich(param, map[string]v3.AttributeKey{})
_, errByName, err := q.QueryRange(context.Background(), param)
if err != nil {
t.Errorf("expected no error, got %s", err)
}
Expand Down Expand Up @@ -844,7 +848,8 @@ func TestQueryRangeTimeShiftWithCache(t *testing.T) {
}

for i, param := range params {
_, errByName, err := q.QueryRange(context.Background(), param, nil)
tracesV3.Enrich(param, map[string]v3.AttributeKey{})
_, errByName, err := q.QueryRange(context.Background(), param)
if err != nil {
t.Errorf("expected no error, got %s", err)
}
Expand Down Expand Up @@ -944,7 +949,8 @@ func TestQueryRangeTimeShiftWithLimitAndCache(t *testing.T) {
}

for i, param := range params {
_, errByName, err := q.QueryRange(context.Background(), param, nil)
tracesV3.Enrich(param, map[string]v3.AttributeKey{})
_, errByName, err := q.QueryRange(context.Background(), param)
if err != nil {
t.Errorf("expected no error, got %s", err)
}
Expand Down Expand Up @@ -1033,7 +1039,8 @@ func TestQueryRangeValueTypePromQL(t *testing.T) {
}

for i, param := range params {
_, errByName, err := q.QueryRange(context.Background(), param, nil)
tracesV3.Enrich(param, map[string]v3.AttributeKey{})
_, errByName, err := q.QueryRange(context.Background(), param)
if err != nil {
t.Errorf("expected no error, got %s", err)
}
Expand Down
4 changes: 0 additions & 4 deletions pkg/query-service/app/querier/v2/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ func (q *querier) runBuilderQuery(
ctx context.Context,
builderQuery *v3.BuilderQuery,
params *v3.QueryRangeParamsV3,
keys map[string]v3.AttributeKey,
cacheKeys map[string]string,
ch chan channelResult,
wg *sync.WaitGroup,
Expand Down Expand Up @@ -195,7 +194,6 @@ func (q *querier) runBuilderQuery(
end,
params.CompositeQuery.PanelType,
builderQuery,
keys,
tracesV3.Options{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: preferRPM},
)
if err != nil {
Expand All @@ -207,7 +205,6 @@ func (q *querier) runBuilderQuery(
end,
params.CompositeQuery.PanelType,
builderQuery,
keys,
tracesV3.Options{GraphLimitQtype: constants.SecondQueryGraphLimit, PreferRPM: preferRPM},
)
if err != nil {
Expand All @@ -221,7 +218,6 @@ func (q *querier) runBuilderQuery(
end,
params.CompositeQuery.PanelType,
builderQuery,
keys,
tracesV3.Options{PreferRPM: preferRPM},
)
if err != nil {
Expand Down
Loading

0 comments on commit a5f3a18

Please sign in to comment.