diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 349cce4aef..fe61fce27f 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -39,6 +39,7 @@ import ( queryprogress "go.signoz.io/signoz/pkg/query-service/app/clickhouseReader/query_progress" "go.signoz.io/signoz/pkg/query-service/app/logs" + "go.signoz.io/signoz/pkg/query-service/app/resource" "go.signoz.io/signoz/pkg/query-service/app/services" "go.signoz.io/signoz/pkg/query-service/auth" "go.signoz.io/signoz/pkg/query-service/common" @@ -552,7 +553,66 @@ func (r *ClickHouseReader) GetTopLevelOperations(ctx context.Context, skipConfig return &operations, nil } -func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.GetServicesParams, skipConfig *model.SkipConfig) (*[]model.ServiceItem, *model.ApiError) { +func (r *ClickHouseReader) buildResourceSubQuery(tags []model.TagQueryParam, svc string, start, end time.Time) (string, error) { + // assuming all will be resource attributes. + // and resource attributes are string for traces + filterSet := v3.FilterSet{} + for _, tag := range tags { + // skip the collector id as we don't add it to traces + if tag.Key == "signoz.collector.id" { + continue + } + key := v3.AttributeKey{ + Key: tag.Key, + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + } + + it := v3.FilterItem{ + Key: key, + } + + // as of now only in and not in are supported + switch tag.Operator { + case model.NotInOperator: + it.Operator = v3.FilterOperatorNotIn + it.Value = tag.StringValues + case model.InOperator: + it.Operator = v3.FilterOperatorIn + it.Value = tag.StringValues + default: + return "", fmt.Errorf("operator %s not supported", tag.Operator) + } + + filterSet.Items = append(filterSet.Items, it) + } + filterSet.Items = append(filterSet.Items, v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "service.name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + Operator: v3.FilterOperatorEqual, + Value: svc, + }) + + resourceSubQuery, err := resource.BuildResourceSubQuery( + r.TraceDB, + r.traceResourceTableV3, + start.Unix()-1800, + end.Unix(), + &filterSet, + []v3.AttributeKey{}, + v3.AttributeKey{}, + false) + if err != nil { + zap.L().Error("Error in processing sql query", zap.Error(err)) + return "", err + } + return resourceSubQuery, nil +} + +func (r *ClickHouseReader) GetServicesV2(ctx context.Context, queryParams *model.GetServicesParams, skipConfig *model.SkipConfig) (*[]model.ServiceItem, *model.ApiError) { if r.indexTable == "" { return nil, &model.ApiError{Typ: model.ErrorExec, Err: ErrNoIndexTable} @@ -618,17 +678,133 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G clickhouse.Named("names", ops), ) - if r.useTraceNewSchema { - resourceBucketFilter := fmt.Sprintf(constants.TraceResourceBucketFilterWithServiceName, r.TraceDB, r.traceResourceTableV3) - query += resourceBucketFilter - errorQuery += resourceBucketFilter - args = append(args, - clickhouse.Named("start_bucket", strconv.FormatInt(queryParams.Start.Unix()-1800, 10)), - clickhouse.Named("end_bucket", strconv.FormatInt(queryParams.End.Unix(), 10)), - clickhouse.Named("labelFilter", "%service.name%"+strings.ToLower(utils.QuoteEscapedStringForContains(svc, true))+"%"), - ) + resourceSubQuery, err := r.buildResourceSubQuery(queryParams.Tags, svc, *queryParams.Start, *queryParams.End) + if err != nil { + zap.L().Error("Error in processing sql query", zap.Error(err)) + return + } + query += ` + AND ( + resource_fingerprint GLOBAL IN ` + + resourceSubQuery + + `) AND ts_bucket_start >= @start_bucket AND ts_bucket_start <= @end_bucket` + + args = append(args, + clickhouse.Named("start_bucket", strconv.FormatInt(queryParams.Start.Unix()-1800, 10)), + clickhouse.Named("end_bucket", strconv.FormatInt(queryParams.End.Unix(), 10)), + ) + + err = r.db.QueryRow( + ctx, + query, + args..., + ).ScanStruct(&serviceItem) + + if serviceItem.NumCalls == 0 { + return } + if err != nil { + zap.L().Error("Error in processing sql query", zap.Error(err)) + return + } + + errorQuery += ` + AND ( + resource_fingerprint GLOBAL IN ` + + resourceSubQuery + + `) AND ts_bucket_start >= @start_bucket AND ts_bucket_start <= @end_bucket` + + err = r.db.QueryRow(ctx, errorQuery, args...).Scan(&numErrors) + if err != nil { + zap.L().Error("Error in processing sql query", zap.Error(err)) + return + } + + serviceItem.ServiceName = svc + serviceItem.NumErrors = numErrors + mtx.Lock() + serviceItems = append(serviceItems, serviceItem) + mtx.Unlock() + }(svc, ops) + } + wg.Wait() + + for idx := range serviceItems { + serviceItems[idx].CallRate = float64(serviceItems[idx].NumCalls) / float64(queryParams.Period) + serviceItems[idx].ErrorRate = float64(serviceItems[idx].NumErrors) * 100 / float64(serviceItems[idx].NumCalls) + } + return &serviceItems, nil +} + +func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.GetServicesParams, skipConfig *model.SkipConfig) (*[]model.ServiceItem, *model.ApiError) { + if r.useTraceNewSchema { + return r.GetServicesV2(ctx, queryParams, skipConfig) + } + + if r.indexTable == "" { + return nil, &model.ApiError{Typ: model.ErrorExec, Err: ErrNoIndexTable} + } + + topLevelOps, apiErr := r.GetTopLevelOperations(ctx, skipConfig, *queryParams.Start, *queryParams.End, nil) + if apiErr != nil { + return nil, apiErr + } + + serviceItems := []model.ServiceItem{} + var wg sync.WaitGroup + // limit the number of concurrent queries to not overload the clickhouse server + sem := make(chan struct{}, 10) + var mtx sync.RWMutex + + for svc, ops := range *topLevelOps { + sem <- struct{}{} + wg.Add(1) + go func(svc string, ops []string) { + defer wg.Done() + defer func() { <-sem }() + var serviceItem model.ServiceItem + var numErrors uint64 + + // Even if the total number of operations within the time range is less and the all + // the top level operations are high, we want to warn to let user know the issue + // with the instrumentation + serviceItem.DataWarning = model.DataWarning{ + TopLevelOps: (*topLevelOps)[svc], + } + + // default max_query_size = 262144 + // Let's assume the average size of the item in `ops` is 50 bytes + // We can have 262144/50 = 5242 items in the `ops` array + // Although we have make it as big as 5k, We cap the number of items + // in the `ops` array to 1500 + + ops = ops[:int(math.Min(1500, float64(len(ops))))] + + query := fmt.Sprintf( + `SELECT + quantile(0.99)(durationNano) as p99, + avg(durationNano) as avgDuration, + count(*) as numCalls + FROM %s.%s + WHERE serviceName = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end`, + r.TraceDB, r.indexTable, + ) + errorQuery := fmt.Sprintf( + `SELECT + count(*) as numErrors + FROM %s.%s + WHERE serviceName = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end AND statusCode=2`, + r.TraceDB, r.indexTable, + ) + + args := []interface{}{} + args = append(args, + clickhouse.Named("start", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), + clickhouse.Named("end", strconv.FormatInt(queryParams.End.UnixNano(), 10)), + clickhouse.Named("serviceName", svc), + clickhouse.Named("names", ops), + ) // create TagQuery from TagQueryParams tags := createTagQueryFromTagQueryParams(queryParams.Tags) subQuery, argsSubQuery, errStatus := buildQueryWithTagParams(ctx, tags) @@ -882,12 +1058,14 @@ func excludeTags(_ context.Context, tags []string) []string { return newTags } -func (r *ClickHouseReader) GetTopOperations(ctx context.Context, queryParams *model.GetTopOperationsParams) (*[]model.TopOperationsItem, *model.ApiError) { +func (r *ClickHouseReader) GetTopOperationsV2(ctx context.Context, queryParams *model.GetTopOperationsParams) (*[]model.TopOperationsItem, *model.ApiError) { namedArgs := []interface{}{ clickhouse.Named("start", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), clickhouse.Named("end", strconv.FormatInt(queryParams.End.UnixNano(), 10)), clickhouse.Named("serviceName", queryParams.ServiceName), + clickhouse.Named("start_bucket", strconv.FormatInt(queryParams.Start.Unix()-1800, 10)), + clickhouse.Named("end_bucket", strconv.FormatInt(queryParams.End.Unix(), 10)), } var topOperationsItems []model.TopOperationsItem @@ -905,16 +1083,62 @@ func (r *ClickHouseReader) GetTopOperations(ctx context.Context, queryParams *mo r.TraceDB, r.traceTableName, ) + resourceSubQuery, err := r.buildResourceSubQuery(queryParams.Tags, queryParams.ServiceName, *queryParams.Start, *queryParams.End) + if err != nil { + zap.L().Error("Error in processing sql query", zap.Error(err)) + return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")} + } + query += ` + AND ( + resource_fingerprint GLOBAL IN ` + + resourceSubQuery + + `) AND ts_bucket_start >= @start_bucket AND ts_bucket_start <= @end_bucket` + + query += " GROUP BY name ORDER BY p99 DESC" + if queryParams.Limit > 0 { + query += " LIMIT @limit" + namedArgs = append(namedArgs, clickhouse.Named("limit", queryParams.Limit)) + } + err = r.db.Select(ctx, &topOperationsItems, query, namedArgs...) + + if err != nil { + zap.L().Error("Error in processing sql query", zap.Error(err)) + return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")} + } + + if topOperationsItems == nil { + topOperationsItems = []model.TopOperationsItem{} + } + + return &topOperationsItems, nil +} + +func (r *ClickHouseReader) GetTopOperations(ctx context.Context, queryParams *model.GetTopOperationsParams) (*[]model.TopOperationsItem, *model.ApiError) { + if r.useTraceNewSchema { - resourceBucketFilter := fmt.Sprintf(constants.TraceResourceBucketFilterWithServiceName, r.TraceDB, r.traceResourceTableV3) - query += resourceBucketFilter - namedArgs = append(namedArgs, - clickhouse.Named("start_bucket", strconv.FormatInt(queryParams.Start.Unix()-1800, 10)), - clickhouse.Named("end_bucket", strconv.FormatInt(queryParams.End.Unix(), 10)), - clickhouse.Named("labelFilter", "%service.name%"+strings.ToLower(utils.QuoteEscapedStringForContains(queryParams.ServiceName, true))+"%"), - ) + return r.GetTopOperationsV2(ctx, queryParams) } + namedArgs := []interface{}{ + clickhouse.Named("start", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), + clickhouse.Named("end", strconv.FormatInt(queryParams.End.UnixNano(), 10)), + clickhouse.Named("serviceName", queryParams.ServiceName), + } + + var topOperationsItems []model.TopOperationsItem + + query := fmt.Sprintf(` + SELECT + quantile(0.5)(durationNano) as p50, + quantile(0.95)(durationNano) as p95, + quantile(0.99)(durationNano) as p99, + COUNT(*) as numCalls, + countIf(statusCode=2) as errorCount, + name + FROM %s.%s + WHERE serviceName = @serviceName AND timestamp>= @start AND timestamp<= @end`, + r.TraceDB, r.indexTable, + ) args := []interface{}{} args = append(args, namedArgs...) // create TagQuery from TagQueryParams @@ -943,7 +1167,6 @@ func (r *ClickHouseReader) GetTopOperations(ctx context.Context, queryParams *mo return &topOperationsItems, nil } - func (r *ClickHouseReader) GetUsage(ctx context.Context, queryParams *model.GetUsageParams) (*[]model.UsageItem, error) { var usageItems []model.UsageItem diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index b66f60dfa9..7072f13173 100644 --- a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -718,17 +718,3 @@ func init() { } const TRACE_V4_MAX_PAGINATION_LIMIT = 10000 - -const TraceResourceBucketFilterWithServiceName = ` - AND ( - resource_fingerprint GLOBAL IN - ( - SELECT fingerprint FROM %s.%s - WHERE - seen_at_ts_bucket_start >= @start_bucket AND seen_at_ts_bucket_start <= @end_bucket AND - simpleJSONExtractString(labels, 'service.name') = @serviceName AND - labels like @labelFilter - ) - ) - AND ts_bucket_start >= @start_bucket AND ts_bucket_start <= @end_bucket - ` diff --git a/pkg/query-service/utils/format.go b/pkg/query-service/utils/format.go index 963abac219..3533f803a5 100644 --- a/pkg/query-service/utils/format.go +++ b/pkg/query-service/utils/format.go @@ -41,6 +41,8 @@ func ValidateAndCastValue(v interface{}, dataType v3.AttributeKeyDataType) (inte } } return x, nil + case []string: + return x, nil default: return nil, fmt.Errorf("invalid data type, expected string, got %v", reflect.TypeOf(v)) }