fix: add v2 for getServices and GetTopOperations (#6516)
* fix: add v2 for getServices and GetTopOperations

* fix: add comments

* fix: update logic for filters

---------

Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>
nityanandagohain and srikanthccv authored Nov 22, 2024
1 parent 20f748f commit afbba1e
Showing 3 changed files with 244 additions and 33 deletions.
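
In short: when the reader runs against the new trace schema (useTraceNewSchema), GetServices and GetTopOperations now delegate to the new GetServicesV2 / GetTopOperationsV2 implementations, which build a resource-fingerprint sub-query from the request's resource-attribute tags via resource.BuildResourceSubQuery instead of relying on the removed labels LIKE constant. A minimal call-site sketch follows; it is illustrative only, and the import paths, the clickhouseReader package name, and the field values are assumptions based on this diff and the repository layout rather than verified against the full module.

package example

import (
	"context"
	"time"

	"go.signoz.io/signoz/pkg/query-service/app/clickhouseReader"
	"go.signoz.io/signoz/pkg/query-service/model"
)

// listProdServices sketches how a caller reaches the new code path. With
// useTraceNewSchema enabled on the reader, GetServices delegates to
// GetServicesV2, which turns the resource-attribute tags below into a
// resource_fingerprint sub-query (only In / NotIn operators are supported).
func listProdServices(ctx context.Context, reader *clickhouseReader.ClickHouseReader) (*[]model.ServiceItem, *model.ApiError) {
	start := time.Now().Add(-time.Hour)
	end := time.Now()
	params := &model.GetServicesParams{
		Start:  &start,
		End:    &end,
		Period: 3600, // window length, used to derive CallRate
		Tags: []model.TagQueryParam{
			{
				Key:          "deployment.environment", // example resource attribute
				Operator:     model.InOperator,
				StringValues: []string{"prod"},
			},
		},
	}
	// skipConfig is nil here for brevity.
	return reader.GetServices(ctx, params, nil)
}
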
261 changes: 242 additions & 19 deletions pkg/query-service/app/clickhouseReader/reader.go
@@ -39,6 +39,7 @@ import (

queryprogress "go.signoz.io/signoz/pkg/query-service/app/clickhouseReader/query_progress"
"go.signoz.io/signoz/pkg/query-service/app/logs"
"go.signoz.io/signoz/pkg/query-service/app/resource"
"go.signoz.io/signoz/pkg/query-service/app/services"
"go.signoz.io/signoz/pkg/query-service/auth"
"go.signoz.io/signoz/pkg/query-service/common"
@@ -552,7 +553,66 @@ func (r *ClickHouseReader) GetTopLevelOperations(ctx context.Context, skipConfig
return &operations, nil
}

func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.GetServicesParams, skipConfig *model.SkipConfig) (*[]model.ServiceItem, *model.ApiError) {
func (r *ClickHouseReader) buildResourceSubQuery(tags []model.TagQueryParam, svc string, start, end time.Time) (string, error) {
// assuming all tags are resource attributes,
// and resource attributes are strings for traces
filterSet := v3.FilterSet{}
for _, tag := range tags {
// skip the collector id as we don't add it to traces
if tag.Key == "signoz.collector.id" {
continue
}
key := v3.AttributeKey{
Key: tag.Key,
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource,
}

it := v3.FilterItem{
Key: key,
}

// as of now only the In and NotIn operators are supported
switch tag.Operator {
case model.NotInOperator:
it.Operator = v3.FilterOperatorNotIn
it.Value = tag.StringValues
case model.InOperator:
it.Operator = v3.FilterOperatorIn
it.Value = tag.StringValues
default:
return "", fmt.Errorf("operator %s not supported", tag.Operator)
}

filterSet.Items = append(filterSet.Items, it)
}
filterSet.Items = append(filterSet.Items, v3.FilterItem{
Key: v3.AttributeKey{
Key: "service.name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource,
},
Operator: v3.FilterOperatorEqual,
Value: svc,
})

resourceSubQuery, err := resource.BuildResourceSubQuery(
r.TraceDB,
r.traceResourceTableV3,
start.Unix()-1800,
end.Unix(),
&filterSet,
[]v3.AttributeKey{},
v3.AttributeKey{},
false)
if err != nil {
zap.L().Error("Error in processing sql query", zap.Error(err))
return "", err
}
return resourceSubQuery, nil
}

func (r *ClickHouseReader) GetServicesV2(ctx context.Context, queryParams *model.GetServicesParams, skipConfig *model.SkipConfig) (*[]model.ServiceItem, *model.ApiError) {

if r.indexTable == "" {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: ErrNoIndexTable}
@@ -618,17 +678,133 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G
clickhouse.Named("names", ops),
)

if r.useTraceNewSchema {
resourceBucketFilter := fmt.Sprintf(constants.TraceResourceBucketFilterWithServiceName, r.TraceDB, r.traceResourceTableV3)
query += resourceBucketFilter
errorQuery += resourceBucketFilter
args = append(args,
clickhouse.Named("start_bucket", strconv.FormatInt(queryParams.Start.Unix()-1800, 10)),
clickhouse.Named("end_bucket", strconv.FormatInt(queryParams.End.Unix(), 10)),
clickhouse.Named("labelFilter", "%service.name%"+strings.ToLower(utils.QuoteEscapedStringForContains(svc, true))+"%"),
)
resourceSubQuery, err := r.buildResourceSubQuery(queryParams.Tags, svc, *queryParams.Start, *queryParams.End)
if err != nil {
zap.L().Error("Error in processing sql query", zap.Error(err))
return
}
query += `
AND (
resource_fingerprint GLOBAL IN ` +
resourceSubQuery +
`) AND ts_bucket_start >= @start_bucket AND ts_bucket_start <= @end_bucket`

args = append(args,
clickhouse.Named("start_bucket", strconv.FormatInt(queryParams.Start.Unix()-1800, 10)),
clickhouse.Named("end_bucket", strconv.FormatInt(queryParams.End.Unix(), 10)),
)

err = r.db.QueryRow(
ctx,
query,
args...,
).ScanStruct(&serviceItem)

if err != nil {
zap.L().Error("Error in processing sql query", zap.Error(err))
return
}

if serviceItem.NumCalls == 0 {
return
}

errorQuery += `
AND (
resource_fingerprint GLOBAL IN ` +
resourceSubQuery +
`) AND ts_bucket_start >= @start_bucket AND ts_bucket_start <= @end_bucket`

err = r.db.QueryRow(ctx, errorQuery, args...).Scan(&numErrors)
if err != nil {
zap.L().Error("Error in processing sql query", zap.Error(err))
return
}

serviceItem.ServiceName = svc
serviceItem.NumErrors = numErrors
mtx.Lock()
serviceItems = append(serviceItems, serviceItem)
mtx.Unlock()
}(svc, ops)
}
wg.Wait()

for idx := range serviceItems {
serviceItems[idx].CallRate = float64(serviceItems[idx].NumCalls) / float64(queryParams.Period)
serviceItems[idx].ErrorRate = float64(serviceItems[idx].NumErrors) * 100 / float64(serviceItems[idx].NumCalls)
}
return &serviceItems, nil
}

func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.GetServicesParams, skipConfig *model.SkipConfig) (*[]model.ServiceItem, *model.ApiError) {
if r.useTraceNewSchema {
return r.GetServicesV2(ctx, queryParams, skipConfig)
}

if r.indexTable == "" {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: ErrNoIndexTable}
}

topLevelOps, apiErr := r.GetTopLevelOperations(ctx, skipConfig, *queryParams.Start, *queryParams.End, nil)
if apiErr != nil {
return nil, apiErr
}

serviceItems := []model.ServiceItem{}
var wg sync.WaitGroup
// limit the number of concurrent queries to avoid overloading the ClickHouse server
sem := make(chan struct{}, 10)
var mtx sync.RWMutex

for svc, ops := range *topLevelOps {
sem <- struct{}{}
wg.Add(1)
go func(svc string, ops []string) {
defer wg.Done()
defer func() { <-sem }()
var serviceItem model.ServiceItem
var numErrors uint64

// Even if the total number of operations within the time range is small while the
// number of top-level operations is high, we still want to warn the user about a
// possible issue with the instrumentation
serviceItem.DataWarning = model.DataWarning{
TopLevelOps: (*topLevelOps)[svc],
}

// default max_query_size = 262144
// Assuming the average size of an item in `ops` is 50 bytes,
// we could fit 262144/50 = 5242 items in the `ops` array.
// Although we could make it as large as ~5k, we cap the number of items
// in the `ops` array at 1500

ops = ops[:int(math.Min(1500, float64(len(ops))))]

query := fmt.Sprintf(
`SELECT
quantile(0.99)(durationNano) as p99,
avg(durationNano) as avgDuration,
count(*) as numCalls
FROM %s.%s
WHERE serviceName = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end`,
r.TraceDB, r.indexTable,
)
errorQuery := fmt.Sprintf(
`SELECT
count(*) as numErrors
FROM %s.%s
WHERE serviceName = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end AND statusCode=2`,
r.TraceDB, r.indexTable,
)

args := []interface{}{}
args = append(args,
clickhouse.Named("start", strconv.FormatInt(queryParams.Start.UnixNano(), 10)),
clickhouse.Named("end", strconv.FormatInt(queryParams.End.UnixNano(), 10)),
clickhouse.Named("serviceName", svc),
clickhouse.Named("names", ops),
)
// create TagQuery from TagQueryParams
tags := createTagQueryFromTagQueryParams(queryParams.Tags)
subQuery, argsSubQuery, errStatus := buildQueryWithTagParams(ctx, tags)
@@ -882,12 +1058,14 @@ func excludeTags(_ context.Context, tags []string) []string {
return newTags
}

func (r *ClickHouseReader) GetTopOperations(ctx context.Context, queryParams *model.GetTopOperationsParams) (*[]model.TopOperationsItem, *model.ApiError) {
func (r *ClickHouseReader) GetTopOperationsV2(ctx context.Context, queryParams *model.GetTopOperationsParams) (*[]model.TopOperationsItem, *model.ApiError) {

namedArgs := []interface{}{
clickhouse.Named("start", strconv.FormatInt(queryParams.Start.UnixNano(), 10)),
clickhouse.Named("end", strconv.FormatInt(queryParams.End.UnixNano(), 10)),
clickhouse.Named("serviceName", queryParams.ServiceName),
clickhouse.Named("start_bucket", strconv.FormatInt(queryParams.Start.Unix()-1800, 10)),
clickhouse.Named("end_bucket", strconv.FormatInt(queryParams.End.Unix(), 10)),
}

var topOperationsItems []model.TopOperationsItem
@@ -905,16 +1083,62 @@ func (r *ClickHouseReader) GetTopOperations(ctx context.Context, queryParams *mo
r.TraceDB, r.traceTableName,
)

resourceSubQuery, err := r.buildResourceSubQuery(queryParams.Tags, queryParams.ServiceName, *queryParams.Start, *queryParams.End)
if err != nil {
zap.L().Error("Error in processing sql query", zap.Error(err))
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
}
query += `
AND (
resource_fingerprint GLOBAL IN ` +
resourceSubQuery +
`) AND ts_bucket_start >= @start_bucket AND ts_bucket_start <= @end_bucket`

query += " GROUP BY name ORDER BY p99 DESC"
if queryParams.Limit > 0 {
query += " LIMIT @limit"
namedArgs = append(namedArgs, clickhouse.Named("limit", queryParams.Limit))
}
err = r.db.Select(ctx, &topOperationsItems, query, namedArgs...)

if err != nil {
zap.L().Error("Error in processing sql query", zap.Error(err))
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
}

if topOperationsItems == nil {
topOperationsItems = []model.TopOperationsItem{}
}

return &topOperationsItems, nil
}

func (r *ClickHouseReader) GetTopOperations(ctx context.Context, queryParams *model.GetTopOperationsParams) (*[]model.TopOperationsItem, *model.ApiError) {

if r.useTraceNewSchema {
resourceBucketFilter := fmt.Sprintf(constants.TraceResourceBucketFilterWithServiceName, r.TraceDB, r.traceResourceTableV3)
query += resourceBucketFilter
namedArgs = append(namedArgs,
clickhouse.Named("start_bucket", strconv.FormatInt(queryParams.Start.Unix()-1800, 10)),
clickhouse.Named("end_bucket", strconv.FormatInt(queryParams.End.Unix(), 10)),
clickhouse.Named("labelFilter", "%service.name%"+strings.ToLower(utils.QuoteEscapedStringForContains(queryParams.ServiceName, true))+"%"),
)
return r.GetTopOperationsV2(ctx, queryParams)
}

namedArgs := []interface{}{
clickhouse.Named("start", strconv.FormatInt(queryParams.Start.UnixNano(), 10)),
clickhouse.Named("end", strconv.FormatInt(queryParams.End.UnixNano(), 10)),
clickhouse.Named("serviceName", queryParams.ServiceName),
}

var topOperationsItems []model.TopOperationsItem

query := fmt.Sprintf(`
SELECT
quantile(0.5)(durationNano) as p50,
quantile(0.95)(durationNano) as p95,
quantile(0.99)(durationNano) as p99,
COUNT(*) as numCalls,
countIf(statusCode=2) as errorCount,
name
FROM %s.%s
WHERE serviceName = @serviceName AND timestamp>= @start AND timestamp<= @end`,
r.TraceDB, r.indexTable,
)
args := []interface{}{}
args = append(args, namedArgs...)
// create TagQuery from TagQueryParams
@@ -943,7 +1167,6 @@ func (r *ClickHouseReader) GetTopOperations(ctx context.Context, queryParams *mo

return &topOperationsItems, nil
}

func (r *ClickHouseReader) GetUsage(ctx context.Context, queryParams *model.GetUsageParams) (*[]model.UsageItem, error) {

var usageItems []model.UsageItem
14 changes: 0 additions & 14 deletions pkg/query-service/constants/constants.go
@@ -718,17 +718,3 @@ func init() {
}

const TRACE_V4_MAX_PAGINATION_LIMIT = 10000

const TraceResourceBucketFilterWithServiceName = `
AND (
resource_fingerprint GLOBAL IN
(
SELECT fingerprint FROM %s.%s
WHERE
seen_at_ts_bucket_start >= @start_bucket AND seen_at_ts_bucket_start <= @end_bucket AND
simpleJSONExtractString(labels, 'service.name') = @serviceName AND
labels like @labelFilter
)
)
AND ts_bucket_start >= @start_bucket AND ts_bucket_start <= @end_bucket
`
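
For comparison with the constant removed above: the v2 code paths in reader.go now assemble the equivalent restriction at query time from the request's tags, so the hard-coded labels LIKE match is no longer needed. A rough sketch of the shape of the appended clause, with the sub-query produced by resource.BuildResourceSubQuery left abstract (its exact SQL is not part of this diff):

// Sketch only: the WHERE-clause fragment appended by GetServicesV2 and
// GetTopOperationsV2. <resource sub-query> stands for the SELECT returned by
// resource.BuildResourceSubQuery for the request's resource filters plus a
// service.name equality filter; @start_bucket is widened by 1800s (30 min)
// relative to the query start time.
const appendedClauseShape = `
	AND (
		resource_fingerprint GLOBAL IN <resource sub-query>
	) AND ts_bucket_start >= @start_bucket AND ts_bucket_start <= @end_bucket`
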
2 changes: 2 additions & 0 deletions pkg/query-service/utils/format.go
@@ -41,6 +41,8 @@ func ValidateAndCastValue(v interface{}, dataType v3.AttributeKeyDataType) (inte
}
}
return x, nil
case []string:
return x, nil
default:
return nil, fmt.Errorf("invalid data type, expected string, got %v", reflect.TypeOf(v))
}
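
The new []string case lets slice values (as used by In / NotIn filters) pass validation for a string-typed attribute instead of falling through to the "invalid data type" error. A small sketch, with import paths assumed from the repository layout:

package example

import (
	"fmt"

	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
	"go.signoz.io/signoz/pkg/query-service/utils"
)

// After this change a []string value, e.g. the StringValues of an In filter,
// is returned as-is by ValidateAndCastValue for a string-typed attribute.
func castInFilterValues() {
	casted, err := utils.ValidateAndCastValue([]string{"svc-a", "svc-b"}, v3.AttributeKeyDataTypeString)
	fmt.Println(casted, err) // expected: [svc-a svc-b] <nil>
}
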
