diff --git a/pkg/query-service/app/logs/v3/json_filter.go b/pkg/query-service/app/logs/v3/json_filter.go index 887baaab4c..d883b61797 100644 --- a/pkg/query-service/app/logs/v3/json_filter.go +++ b/pkg/query-service/app/logs/v3/json_filter.go @@ -20,7 +20,7 @@ const ( NGRAM_SIZE = 4 ) -var dataTypeMapping = map[string]string{ +var DataTypeMapping = map[string]string{ "string": STRING, "int64": INT64, "float64": FLOAT64, @@ -31,7 +31,7 @@ var dataTypeMapping = map[string]string{ "array(bool)": ARRAY_BOOL, } -var arrayValueTypeMapping = map[string]string{ +var ArrayValueTypeMapping = map[string]string{ "array(string)": "string", "array(int64)": "int64", "array(float64)": "float64", @@ -59,7 +59,7 @@ var jsonLogOperators = map[v3.FilterOperator]string{ v3.FilterOperatorNotHas: "NOT has(%s, %s)", } -func getPath(keyArr []string) string { +func GetPath(keyArr []string) string { path := []string{} for i := 0; i < len(keyArr); i++ { if strings.HasSuffix(keyArr[i], "[*]") { @@ -71,7 +71,7 @@ func getPath(keyArr []string) string { return strings.Join(path, ".") } -func getJSONFilterKey(key v3.AttributeKey, op v3.FilterOperator, isArray bool) (string, error) { +func GetJSONFilterKey(key v3.AttributeKey, op v3.FilterOperator, isArray bool) (string, error) { keyArr := strings.Split(key.Key, ".") // i.e it should be at least body.name, and not something like body if len(keyArr) < 2 { @@ -89,11 +89,11 @@ func getJSONFilterKey(key v3.AttributeKey, op v3.FilterOperator, isArray bool) ( var dataType string var ok bool - if dataType, ok = dataTypeMapping[string(key.DataType)]; !ok { + if dataType, ok = DataTypeMapping[string(key.DataType)]; !ok { return "", fmt.Errorf("unsupported dataType for JSON: %s", key.DataType) } - path := getPath(keyArr[1:]) + path := GetPath(keyArr[1:]) if isArray { return fmt.Sprintf("JSONExtract(JSON_QUERY(%s, '$.%s'), '%s')", keyArr[0], path, dataType), nil @@ -109,7 +109,7 @@ func getJSONFilterKey(key v3.AttributeKey, op v3.FilterOperator, isArray bool) ( } // takes the path and the values and generates where clauses for better usage of index -func getPathIndexFilter(path string) string { +func GetPathIndexFilter(path string) string { filters := []string{} keyArr := strings.Split(path, ".") if len(keyArr) < 2 { @@ -136,7 +136,7 @@ func GetJSONFilter(item v3.FilterItem) (string, error) { dataType := item.Key.DataType isArray := false // check if its an array and handle it - if val, ok := arrayValueTypeMapping[string(item.Key.DataType)]; ok { + if val, ok := ArrayValueTypeMapping[string(item.Key.DataType)]; ok { if item.Operator != v3.FilterOperatorHas && item.Operator != v3.FilterOperatorNotHas { return "", fmt.Errorf("only has operator is supported for array") } @@ -144,7 +144,7 @@ func GetJSONFilter(item v3.FilterItem) (string, error) { dataType = v3.AttributeKeyDataType(val) } - key, err := getJSONFilterKey(item.Key, item.Operator, isArray) + key, err := GetJSONFilterKey(item.Key, item.Operator, isArray) if err != nil { return "", err } @@ -164,7 +164,7 @@ func GetJSONFilter(item v3.FilterItem) (string, error) { if logsOp, ok := jsonLogOperators[op]; ok { switch op { case v3.FilterOperatorExists, v3.FilterOperatorNotExists: - filter = fmt.Sprintf(logsOp, key, getPath(strings.Split(item.Key.Key, ".")[1:])) + filter = fmt.Sprintf(logsOp, key, GetPath(strings.Split(item.Key.Key, ".")[1:])) case v3.FilterOperatorRegex, v3.FilterOperatorNotRegex, v3.FilterOperatorHas, v3.FilterOperatorNotHas: fmtVal := utils.ClickHouseFormattedValue(value) filter = fmt.Sprintf(logsOp, key, fmtVal) @@ -181,7 +181,7 @@ func GetJSONFilter(item v3.FilterItem) (string, error) { filters := []string{} - pathFilter := getPathIndexFilter(item.Key.Key) + pathFilter := GetPathIndexFilter(item.Key.Key) if pathFilter != "" { filters = append(filters, pathFilter) } @@ -196,7 +196,7 @@ func GetJSONFilter(item v3.FilterItem) (string, error) { // add exists check for non array items as default values of int/float/bool will corrupt the results if !isArray && !(item.Operator == v3.FilterOperatorExists || item.Operator == v3.FilterOperatorNotExists) { - existsFilter := fmt.Sprintf("JSON_EXISTS(body, '$.%s')", getPath(strings.Split(item.Key.Key, ".")[1:])) + existsFilter := fmt.Sprintf("JSON_EXISTS(body, '$.%s')", GetPath(strings.Split(item.Key.Key, ".")[1:])) filter = fmt.Sprintf("%s AND %s", existsFilter, filter) } diff --git a/pkg/query-service/app/logs/v3/json_filter_test.go b/pkg/query-service/app/logs/v3/json_filter_test.go index 0a71cd67b2..060ba63707 100644 --- a/pkg/query-service/app/logs/v3/json_filter_test.go +++ b/pkg/query-service/app/logs/v3/json_filter_test.go @@ -140,7 +140,7 @@ var testGetJSONFilterKeyData = []struct { func TestGetJSONFilterKey(t *testing.T) { for _, tt := range testGetJSONFilterKeyData { Convey("testgetKey", t, func() { - columnName, err := getJSONFilterKey(tt.Key, tt.Operator, tt.IsArray) + columnName, err := GetJSONFilterKey(tt.Key, tt.Operator, tt.IsArray) if tt.Error { So(err, ShouldNotBeNil) } else { diff --git a/pkg/query-service/app/logs/v3/query_builder.go b/pkg/query-service/app/logs/v3/query_builder.go index 2aa56002ff..bd64b4d0e6 100644 --- a/pkg/query-service/app/logs/v3/query_builder.go +++ b/pkg/query-service/app/logs/v3/query_builder.go @@ -9,7 +9,7 @@ import ( "go.signoz.io/signoz/pkg/query-service/utils" ) -var aggregateOperatorToPercentile = map[v3.AggregateOperator]float64{ +var AggregateOperatorToPercentile = map[v3.AggregateOperator]float64{ v3.AggregateOperatorP05: 0.05, v3.AggregateOperatorP10: 0.10, v3.AggregateOperatorP20: 0.20, @@ -21,7 +21,7 @@ var aggregateOperatorToPercentile = map[v3.AggregateOperator]float64{ v3.AggregateOperatorP99: 0.99, } -var aggregateOperatorToSQLFunc = map[v3.AggregateOperator]string{ +var AggregateOperatorToSQLFunc = map[v3.AggregateOperator]string{ v3.AggregateOperatorAvg: "avg", v3.AggregateOperatorMax: "max", v3.AggregateOperatorMin: "min", @@ -53,7 +53,7 @@ var logOperators = map[v3.FilterOperator]string{ const BODY = "body" -func getClickhouseLogsColumnType(columnType v3.AttributeKeyType) string { +func GetClickhouseLogsColumnType(columnType v3.AttributeKeyType) string { if columnType == v3.AttributeKeyTypeTag { return "attributes" } @@ -83,7 +83,7 @@ func getClickhouseColumnName(key v3.AttributeKey) string { //if the key is present in the topLevelColumn then it will be only searched in those columns, //regardless if it is indexed/present again in resource or column attribute if !key.IsColumn { - columnType := getClickhouseLogsColumnType(key.Type) + columnType := GetClickhouseLogsColumnType(key.Type) columnDataType := getClickhouseLogsColumnDataType(key.DataType) clickhouseColumn = fmt.Sprintf("%s_%s_value[indexOf(%s_%s_key, '%s')]", columnType, columnDataType, columnType, columnDataType, key.Key) return clickhouseColumn @@ -114,7 +114,7 @@ func getSelectLabels(aggregatorOperator v3.AggregateOperator, groupBy []v3.Attri return selectLabels } -func getSelectKeys(aggregatorOperator v3.AggregateOperator, groupBy []v3.AttributeKey) string { +func GetSelectKeys(aggregatorOperator v3.AggregateOperator, groupBy []v3.AttributeKey) string { var selectLabels []string if aggregatorOperator == v3.AggregateOperatorNoOp { return "" @@ -154,7 +154,7 @@ func GetExistsNexistsFilter(op v3.FilterOperator, item v3.FilterItem) string { } return fmt.Sprintf("%s_exists`=%v", strings.TrimSuffix(getClickhouseColumnName(item.Key), "`"), val) } - columnType := getClickhouseLogsColumnType(item.Key.Type) + columnType := GetClickhouseLogsColumnType(item.Key.Type) columnDataType := getClickhouseLogsColumnDataType(item.Key.DataType) return fmt.Sprintf(logOperators[op], columnType, columnDataType, item.Key.Key) } @@ -224,7 +224,7 @@ func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, groupBy []v3.AttributeKey, // add group by conditions to filter out log lines which doesn't have the key for _, attr := range groupBy { if !attr.IsColumn { - columnType := getClickhouseLogsColumnType(attr.Type) + columnType := GetClickhouseLogsColumnType(attr.Type) columnDataType := getClickhouseLogsColumnDataType(attr.DataType) conditions = append(conditions, fmt.Sprintf("has(%s_%s_key, '%s')", columnType, columnDataType, attr.Key)) } else if attr.Type != v3.AttributeKeyTypeUnspecified { @@ -258,7 +258,7 @@ func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.Build selectLabels := getSelectLabels(mq.AggregateOperator, mq.GroupBy) - having := having(mq.Having) + having := Having(mq.Having) if having != "" { having = " having " + having } @@ -288,10 +288,10 @@ func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.Build // we dont need value for first query // going with this route as for a cleaner approach on implementation if graphLimitQtype == constants.FirstQueryGraphLimit { - queryTmpl = "SELECT " + getSelectKeys(mq.AggregateOperator, mq.GroupBy) + " from (" + queryTmpl + ")" + queryTmpl = "SELECT " + GetSelectKeys(mq.AggregateOperator, mq.GroupBy) + " from (" + queryTmpl + ")" } - groupBy := groupByAttributeKeyTags(panelType, graphLimitQtype, mq.GroupBy...) + groupBy := GroupByAttributeKeyTags(panelType, graphLimitQtype, mq.GroupBy...) if panelType != v3.PanelTypeList && groupBy != "" { groupBy = " group by " + groupBy } @@ -301,7 +301,7 @@ func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.Build } if graphLimitQtype == constants.SecondQueryGraphLimit { - filterSubQuery = filterSubQuery + " AND " + fmt.Sprintf("(%s) GLOBAL IN (", getSelectKeys(mq.AggregateOperator, mq.GroupBy)) + "#LIMIT_PLACEHOLDER)" + filterSubQuery = filterSubQuery + " AND " + fmt.Sprintf("(%s) GLOBAL IN (", GetSelectKeys(mq.AggregateOperator, mq.GroupBy)) + "#LIMIT_PLACEHOLDER)" } aggregationKey := "" @@ -329,7 +329,7 @@ func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.Build rate = rate / 60.0 } - op := fmt.Sprintf("%s(%s)/%f", aggregateOperatorToSQLFunc[mq.AggregateOperator], aggregationKey, rate) + op := fmt.Sprintf("%s(%s)/%f", AggregateOperatorToSQLFunc[mq.AggregateOperator], aggregationKey, rate) query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy) return query, nil case @@ -342,11 +342,11 @@ func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.Build v3.AggregateOperatorP90, v3.AggregateOperatorP95, v3.AggregateOperatorP99: - op := fmt.Sprintf("quantile(%v)(%s)", aggregateOperatorToPercentile[mq.AggregateOperator], aggregationKey) + op := fmt.Sprintf("quantile(%v)(%s)", AggregateOperatorToPercentile[mq.AggregateOperator], aggregationKey) query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy) return query, nil case v3.AggregateOperatorAvg, v3.AggregateOperatorSum, v3.AggregateOperatorMin, v3.AggregateOperatorMax: - op := fmt.Sprintf("%s(%s)", aggregateOperatorToSQLFunc[mq.AggregateOperator], aggregationKey) + op := fmt.Sprintf("%s(%s)", AggregateOperatorToSQLFunc[mq.AggregateOperator], aggregationKey) query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy) return query, nil case v3.AggregateOperatorCount: @@ -394,7 +394,7 @@ func groupBy(panelType v3.PanelType, graphLimitQtype string, tags ...string) str return strings.Join(tags, ",") } -func groupByAttributeKeyTags(panelType v3.PanelType, graphLimitQtype string, tags ...v3.AttributeKey) string { +func GroupByAttributeKeyTags(panelType v3.PanelType, graphLimitQtype string, tags ...v3.AttributeKey) string { groupTags := []string{} for _, tag := range tags { groupTags = append(groupTags, "`"+tag.Key+"`") @@ -446,7 +446,7 @@ func orderByAttributeKeyTags(panelType v3.PanelType, items []v3.OrderBy, tags [] return str } -func having(items []v3.Having) string { +func Having(items []v3.Having) string { // aggregate something and filter on that aggregate var having []string for _, item := range items { @@ -455,7 +455,7 @@ func having(items []v3.Having) string { return strings.Join(having, " AND ") } -func reduceQuery(query string, reduceTo v3.ReduceToOperator, aggregateOperator v3.AggregateOperator) (string, error) { +func ReduceQuery(query string, reduceTo v3.ReduceToOperator, aggregateOperator v3.AggregateOperator) (string, error) { // the timestamp picked is not relevant here since the final value used is show the single // chart with just the query value. switch reduceTo { @@ -475,14 +475,14 @@ func reduceQuery(query string, reduceTo v3.ReduceToOperator, aggregateOperator v return query, nil } -func addLimitToQuery(query string, limit uint64) string { +func AddLimitToQuery(query string, limit uint64) string { if limit == 0 { return query } return fmt.Sprintf("%s LIMIT %d", query, limit) } -func addOffsetToQuery(query string, offset uint64) string { +func AddOffsetToQuery(query string, offset uint64) string { return fmt.Sprintf("%s OFFSET %d", query, offset) } @@ -492,7 +492,7 @@ type Options struct { PreferRPM bool } -func isOrderByTs(orderBy []v3.OrderBy) bool { +func IsOrderByTs(orderBy []v3.OrderBy) bool { if len(orderBy) == 1 && (orderBy[0].Key == constants.TIMESTAMP || orderBy[0].ColumnName == constants.TIMESTAMP) { return true } @@ -523,7 +523,7 @@ func PrepareLogsQuery(start, end int64, queryType v3.QueryType, panelType v3.Pan if err != nil { return "", err } - query = addLimitToQuery(query, mq.Limit) + query = AddLimitToQuery(query, mq.Limit) return query, nil } else if options.GraphLimitQtype == constants.SecondQueryGraphLimit { @@ -539,7 +539,7 @@ func PrepareLogsQuery(start, end int64, queryType v3.QueryType, panelType v3.Pan return "", err } if panelType == v3.PanelTypeValue { - query, err = reduceQuery(query, mq.ReduceTo, mq.AggregateOperator) + query, err = ReduceQuery(query, mq.ReduceTo, mq.AggregateOperator) } if panelType == v3.PanelTypeList { @@ -550,21 +550,21 @@ func PrepareLogsQuery(start, end int64, queryType v3.QueryType, panelType v3.Pan if mq.PageSize > 0 { if mq.Limit > 0 && mq.Offset+mq.PageSize > mq.Limit { - query = addLimitToQuery(query, mq.Limit-mq.Offset) + query = AddLimitToQuery(query, mq.Limit-mq.Offset) } else { - query = addLimitToQuery(query, mq.PageSize) + query = AddLimitToQuery(query, mq.PageSize) } // add offset to the query only if it is not orderd by timestamp. - if !isOrderByTs(mq.OrderBy) { - query = addOffsetToQuery(query, mq.Offset) + if !IsOrderByTs(mq.OrderBy) { + query = AddOffsetToQuery(query, mq.Offset) } } else { - query = addLimitToQuery(query, mq.Limit) + query = AddLimitToQuery(query, mq.Limit) } } else if panelType == v3.PanelTypeTable { - query = addLimitToQuery(query, mq.Limit) + query = AddLimitToQuery(query, mq.Limit) } return query, err diff --git a/pkg/query-service/app/logs/v4/json_filter.go b/pkg/query-service/app/logs/v4/json_filter.go new file mode 100644 index 0000000000..cde88e748a --- /dev/null +++ b/pkg/query-service/app/logs/v4/json_filter.go @@ -0,0 +1,105 @@ +package v4 + +import ( + "fmt" + "strings" + + logsV3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/utils" +) + +var jsonLogOperators = map[v3.FilterOperator]string{ + v3.FilterOperatorEqual: "=", + v3.FilterOperatorNotEqual: "!=", + v3.FilterOperatorLessThan: "<", + v3.FilterOperatorLessThanOrEq: "<=", + v3.FilterOperatorGreaterThan: ">", + v3.FilterOperatorGreaterThanOrEq: ">=", + v3.FilterOperatorLike: "LIKE", + v3.FilterOperatorNotLike: "NOT LIKE", + v3.FilterOperatorContains: "LIKE", + v3.FilterOperatorNotContains: "NOT LIKE", + v3.FilterOperatorRegex: "match(%s, %s)", + v3.FilterOperatorNotRegex: "NOT match(%s, %s)", + v3.FilterOperatorIn: "IN", + v3.FilterOperatorNotIn: "NOT IN", + v3.FilterOperatorExists: "JSON_EXISTS(%s, '$.%s')", + v3.FilterOperatorNotExists: "NOT JSON_EXISTS(%s, '$.%s')", + v3.FilterOperatorHas: "has(%s, %s)", + v3.FilterOperatorNotHas: "NOT has(%s, %s)", +} + +func GetJSONFilter(item v3.FilterItem) (string, error) { + + dataType := item.Key.DataType + isArray := false + // check if its an array and handle it + if val, ok := logsV3.ArrayValueTypeMapping[string(item.Key.DataType)]; ok { + if item.Operator != v3.FilterOperatorHas && item.Operator != v3.FilterOperatorNotHas { + return "", fmt.Errorf("only has operator is supported for array") + } + isArray = true + dataType = v3.AttributeKeyDataType(val) + } + + key, err := logsV3.GetJSONFilterKey(item.Key, item.Operator, isArray) + if err != nil { + return "", err + } + + // non array + op := v3.FilterOperator(strings.ToLower(strings.TrimSpace(string(item.Operator)))) + + var value interface{} + if op != v3.FilterOperatorExists && op != v3.FilterOperatorNotExists { + value, err = utils.ValidateAndCastValue(item.Value, dataType) + if err != nil { + return "", fmt.Errorf("failed to validate and cast value for %s: %v", item.Key.Key, err) + } + } + + var filter string + if logsOp, ok := jsonLogOperators[op]; ok { + switch op { + case v3.FilterOperatorExists, v3.FilterOperatorNotExists: + filter = fmt.Sprintf(logsOp, key, logsV3.GetPath(strings.Split(item.Key.Key, ".")[1:])) + case v3.FilterOperatorRegex, v3.FilterOperatorNotRegex, v3.FilterOperatorHas, v3.FilterOperatorNotHas: + fmtVal := utils.ClickHouseFormattedValue(value) + filter = fmt.Sprintf(logsOp, key, fmtVal) + case v3.FilterOperatorContains, v3.FilterOperatorNotContains: + val := utils.QuoteEscapedString(fmt.Sprintf("%v", item.Value)) + filter = fmt.Sprintf("%s %s '%%%s%%'", key, logsOp, val) + default: + fmtVal := utils.ClickHouseFormattedValue(value) + filter = fmt.Sprintf("%s %s %s", key, logsOp, fmtVal) + } + } else { + return "", fmt.Errorf("unsupported operator: %s", op) + } + + filters := []string{} + + pathFilter := logsV3.GetPathIndexFilter(item.Key.Key) + if pathFilter != "" { + filters = append(filters, pathFilter) + } + if op == v3.FilterOperatorContains || + op == v3.FilterOperatorEqual || + op == v3.FilterOperatorHas { + val, ok := item.Value.(string) + if ok && len(val) >= logsV3.NGRAM_SIZE { + filters = append(filters, fmt.Sprintf("lower(body) like lower('%%%s%%')", utils.QuoteEscapedString(strings.ToLower(val)))) + } + } + + // add exists check for non array items as default values of int/float/bool will corrupt the results + if !isArray && !(item.Operator == v3.FilterOperatorExists || item.Operator == v3.FilterOperatorNotExists) { + existsFilter := fmt.Sprintf("JSON_EXISTS(body, '$.%s')", logsV3.GetPath(strings.Split(item.Key.Key, ".")[1:])) + filter = fmt.Sprintf("%s AND %s", existsFilter, filter) + } + + filters = append(filters, filter) + + return strings.Join(filters, " AND "), nil +} diff --git a/pkg/query-service/app/logs/v4/json_filter_test.go b/pkg/query-service/app/logs/v4/json_filter_test.go new file mode 100644 index 0000000000..c8b2e44847 --- /dev/null +++ b/pkg/query-service/app/logs/v4/json_filter_test.go @@ -0,0 +1,200 @@ +package v4 + +import ( + "testing" + + . "github.com/smartystreets/goconvey/convey" + logsV3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" +) + +var testGetJSONFilterData = []struct { + Name string + FilterItem v3.FilterItem + Filter string + Error bool +}{ + { + Name: "Array membership string", + FilterItem: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "body.requestor_list[*]", + DataType: "array(string)", + IsJSON: true, + }, + Operator: "has", + Value: "index_service", + }, + Filter: "lower(body) like lower('%requestor_list%') AND lower(body) like lower('%index_service%') AND has(JSONExtract(JSON_QUERY(body, '$.\"requestor_list\"[*]'), 'Array(String)'), 'index_service')", + }, + { + Name: "Array membership int64", + FilterItem: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "body.int_numbers[*]", + DataType: "array(int64)", + IsJSON: true, + }, + Operator: "has", + Value: 2, + }, + Filter: "lower(body) like lower('%int_numbers%') AND has(JSONExtract(JSON_QUERY(body, '$.\"int_numbers\"[*]'), '" + logsV3.ARRAY_INT64 + "'), 2)", + }, + { + Name: "Array membership float64", + FilterItem: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "body.nested_num[*].float_nums[*]", + DataType: "array(float64)", + IsJSON: true, + }, + Operator: "nhas", + Value: 2.2, + }, + Filter: "lower(body) like lower('%nested_num%float_nums%') AND NOT has(JSONExtract(JSON_QUERY(body, '$.\"nested_num\"[*].\"float_nums\"[*]'), '" + logsV3.ARRAY_FLOAT64 + "'), 2.200000)", + }, + { + Name: "Array membership bool", + FilterItem: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "body.bool[*]", + DataType: "array(bool)", + IsJSON: true, + }, + Operator: "has", + Value: true, + }, + Filter: "lower(body) like lower('%bool%') AND has(JSONExtract(JSON_QUERY(body, '$.\"bool\"[*]'), '" + logsV3.ARRAY_BOOL + "'), true)", + }, + { + Name: "eq operator", + FilterItem: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "body.message", + DataType: "string", + IsJSON: true, + }, + Operator: "=", + Value: "hello", + }, + Filter: "lower(body) like lower('%message%') AND lower(body) like lower('%hello%') AND JSON_EXISTS(body, '$.\"message\"') AND JSON_VALUE(body, '$.\"message\"') = 'hello'", + }, + { + Name: "eq operator number", + FilterItem: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "body.status", + DataType: "int64", + IsJSON: true, + }, + Operator: "=", + Value: 1, + }, + Filter: "lower(body) like lower('%status%') AND JSON_EXISTS(body, '$.\"status\"') AND JSONExtract(JSON_VALUE(body, '$.\"status\"'), '" + logsV3.INT64 + "') = 1", + }, + { + Name: "neq operator number", + FilterItem: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "body.status", + DataType: "float64", + IsJSON: true, + }, + Operator: "=", + Value: 1.1, + }, + Filter: "lower(body) like lower('%status%') AND JSON_EXISTS(body, '$.\"status\"') AND JSONExtract(JSON_VALUE(body, '$.\"status\"'), '" + logsV3.FLOAT64 + "') = 1.100000", + }, + { + Name: "eq operator bool", + FilterItem: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "body.boolkey", + DataType: "bool", + IsJSON: true, + }, + Operator: "=", + Value: true, + }, + Filter: "lower(body) like lower('%boolkey%') AND JSON_EXISTS(body, '$.\"boolkey\"') AND JSONExtract(JSON_VALUE(body, '$.\"boolkey\"'), '" + logsV3.BOOL + "') = true", + }, + { + Name: "greater than operator", + FilterItem: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "body.status", + DataType: "int64", + IsJSON: true, + }, + Operator: ">", + Value: 1, + }, + Filter: "lower(body) like lower('%status%') AND JSON_EXISTS(body, '$.\"status\"') AND JSONExtract(JSON_VALUE(body, '$.\"status\"'), '" + logsV3.INT64 + "') > 1", + }, + { + Name: "regex operator", + FilterItem: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "body.message", + DataType: "string", + IsJSON: true, + }, + Operator: "regex", + Value: "a*", + }, + Filter: "lower(body) like lower('%message%') AND JSON_EXISTS(body, '$.\"message\"') AND match(JSON_VALUE(body, '$.\"message\"'), 'a*')", + }, + { + Name: "contains operator", + FilterItem: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "body.message", + DataType: "string", + IsJSON: true, + }, + Operator: "contains", + Value: "a", + }, + Filter: "lower(body) like lower('%message%') AND JSON_EXISTS(body, '$.\"message\"') AND JSON_VALUE(body, '$.\"message\"') LIKE '%a%'", + }, + { + Name: "contains operator with quotes", + FilterItem: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "body.message", + DataType: "string", + IsJSON: true, + }, + Operator: "contains", + Value: "hello 'world'", + }, + Filter: "lower(body) like lower('%message%') AND lower(body) like lower('%hello \\'world\\'%') AND JSON_EXISTS(body, '$.\"message\"') AND JSON_VALUE(body, '$.\"message\"') LIKE '%hello \\'world\\'%'", + }, + { + Name: "exists", + FilterItem: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "body.message", + DataType: "string", + IsJSON: true, + }, + Operator: "exists", + Value: "", + }, + Filter: "lower(body) like lower('%message%') AND JSON_EXISTS(body, '$.\"message\"')", + }, +} + +func TestGetJSONFilter(t *testing.T) { + for _, tt := range testGetJSONFilterData { + Convey("testGetJSONFilter", t, func() { + filter, err := GetJSONFilter(tt.FilterItem) + if tt.Error { + So(err, ShouldNotBeNil) + } else { + So(err, ShouldBeNil) + So(filter, ShouldEqual, tt.Filter) + } + }) + } +} diff --git a/pkg/query-service/app/logs/v4/query_builder.go b/pkg/query-service/app/logs/v4/query_builder.go index 08024756bd..b96c5b9113 100644 --- a/pkg/query-service/app/logs/v4/query_builder.go +++ b/pkg/query-service/app/logs/v4/query_builder.go @@ -1,7 +1,13 @@ package v4 import ( + "fmt" + "strings" + + logsV3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3" + "go.signoz.io/signoz/pkg/query-service/constants" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/utils" ) var logOperators = map[v3.FilterOperator]string{ @@ -29,3 +35,504 @@ const ( DISTRIBUTED_LOGS_V2_RESOURCE = "distributed_logs_v2_resource" NANOSECOND = 1000000000 ) + +func getClickhouseLogsColumnDataType(columnDataType v3.AttributeKeyDataType) string { + if columnDataType == v3.AttributeKeyDataTypeFloat64 || columnDataType == v3.AttributeKeyDataTypeInt64 { + return "number" + } + if columnDataType == v3.AttributeKeyDataTypeBool { + return "bool" + } + return "string" +} + +func getClickhouseKey(key v3.AttributeKey) string { + // check if it is a top level static field + if _, ok := constants.StaticFieldsLogsV3[key.Key]; ok && key.Type == v3.AttributeKeyTypeUnspecified { + return key.Key + } + + //if the key is present in the topLevelColumn then it will be only searched in those columns, + //regardless if it is indexed/present again in resource or column attribute + if !key.IsColumn { + columnType := logsV3.GetClickhouseLogsColumnType(key.Type) + columnDataType := getClickhouseLogsColumnDataType(key.DataType) + return fmt.Sprintf("%s_%s['%s']", columnType, columnDataType, key.Key) + } + + // materialized column created from query + // https://github.com/SigNoz/signoz/pull/4775 + return "`" + utils.GetClickhouseColumnNameV2(string(key.Type), string(key.DataType), key.Key) + "`" +} + +func getSelectLabels(aggregatorOperator v3.AggregateOperator, groupBy []v3.AttributeKey) string { + var selectLabels string + if aggregatorOperator == v3.AggregateOperatorNoOp { + selectLabels = "" + } else { + for _, tag := range groupBy { + columnName := getClickhouseKey(tag) + selectLabels += fmt.Sprintf(" %s as `%s`,", columnName, tag.Key) + } + } + return selectLabels +} + +func getExistsNexistsFilter(op v3.FilterOperator, item v3.FilterItem) string { + if _, ok := constants.StaticFieldsLogsV3[item.Key.Key]; ok && item.Key.Type == v3.AttributeKeyTypeUnspecified { + // no exists filter for static fields as they exists everywhere + // TODO(nitya): Think what we can do here + return "" + } else if item.Key.IsColumn { + // get filter for materialized columns + val := true + if op == v3.FilterOperatorNotExists { + val = false + } + return fmt.Sprintf("%s_exists`=%v", strings.TrimSuffix(getClickhouseKey(item.Key), "`"), val) + } + // filter for non materialized attributes + columnType := logsV3.GetClickhouseLogsColumnType(item.Key.Type) + columnDataType := getClickhouseLogsColumnDataType(item.Key.DataType) + return fmt.Sprintf(logOperators[op], columnType, columnDataType, item.Key.Key) +} + +func buildAttributeFilter(item v3.FilterItem) (string, error) { + // check if the user is searching for value in all attributes + key := item.Key.Key + op := v3.FilterOperator(strings.ToLower(string(item.Operator))) + + var value interface{} + var err error + if op != v3.FilterOperatorExists && op != v3.FilterOperatorNotExists { + value, err = utils.ValidateAndCastValue(item.Value, item.Key.DataType) + if err != nil { + return "", fmt.Errorf("failed to validate and cast value for %s: %v", item.Key.Key, err) + } + } + + // TODO(nitya): as of now __attrs is only supports attributes_string. Discuss more on this + // also for eq and contains as now it does a exact match + if key == "__attrs" { + if (op != v3.FilterOperatorEqual && op != v3.FilterOperatorContains) || item.Key.DataType != v3.AttributeKeyDataTypeString { + return "", fmt.Errorf("only = operator and string data type is supported for __attrs") + } + val := utils.ClickHouseFormattedValue(item.Value) + return fmt.Sprintf("has(mapValues(attributes_string), %s)", val), nil + } + + keyName := getClickhouseKey(item.Key) + fmtVal := utils.ClickHouseFormattedValue(value) + + if logsOp, ok := logOperators[op]; ok { + switch op { + case v3.FilterOperatorExists, v3.FilterOperatorNotExists: + return getExistsNexistsFilter(op, item), nil + case v3.FilterOperatorRegex, v3.FilterOperatorNotRegex: + + return fmt.Sprintf(logsOp, keyName, fmtVal), nil + case v3.FilterOperatorContains, v3.FilterOperatorNotContains: + val := utils.QuoteEscapedStringForContains(fmt.Sprintf("%s", item.Value)) + // for body the contains is case insensitive + if keyName == BODY { + return fmt.Sprintf("lower(%s) %s lower('%%%s%%')", keyName, logsOp, val), nil + } else { + return fmt.Sprintf("%s %s '%%%s%%'", keyName, logsOp, val), nil + } + default: + // for use lower for like and ilike + if op == v3.FilterOperatorLike || op == v3.FilterOperatorNotLike { + if keyName == BODY { + keyName = fmt.Sprintf("lower(%s)", keyName) + fmtVal = fmt.Sprintf("lower(%s)", fmtVal) + } + } + return fmt.Sprintf("%s %s %s", keyName, logsOp, fmtVal), nil + } + } else { + return "", fmt.Errorf("unsupported operator: %s", op) + } +} + +func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, groupBy []v3.AttributeKey, aggregateAttribute v3.AttributeKey) (string, error) { + var conditions []string + + if fs == nil || len(fs.Items) == 0 { + return "", nil + } + + for _, item := range fs.Items { + // skip if it's a resource attribute + if item.Key.Type == v3.AttributeKeyTypeResource { + continue + } + + // if the filter is json filter + if item.Key.IsJSON { + filter, err := GetJSONFilter(item) + if err != nil { + return "", err + } + conditions = append(conditions, filter) + continue + } + + // generate the filter + filter, err := buildAttributeFilter(item) + if err != nil { + return "", err + } + conditions = append(conditions, filter) + + // add extra condition for map contains + // by default clickhouse is not able to utilize indexes for keys with all operators. + // mapContains forces the use of index. + op := v3.FilterOperator(strings.ToLower(string(item.Operator))) + if item.Key.IsColumn == false && op != v3.FilterOperatorExists && op != v3.FilterOperatorNotExists { + conditions = append(conditions, getExistsNexistsFilter(v3.FilterOperatorExists, item)) + } + } + + // add group by conditions to filter out log lines which doesn't have the key + for _, attr := range groupBy { + // skip if it's a resource attribute + if attr.Type == v3.AttributeKeyTypeResource { + continue + } + + if !attr.IsColumn { + columnType := logsV3.GetClickhouseLogsColumnType(attr.Type) + columnDataType := getClickhouseLogsColumnDataType(attr.DataType) + conditions = append(conditions, fmt.Sprintf("mapContains(%s_%s, '%s')", columnType, columnDataType, attr.Key)) + } else if attr.Type != v3.AttributeKeyTypeUnspecified { + // for materialzied columns and not the top level static fields + name := utils.GetClickhouseColumnNameV2(string(attr.Type), string(attr.DataType), attr.Key) + conditions = append(conditions, fmt.Sprintf("`%s_exists`=true", name)) + } + } + + // add conditions for aggregate attribute + if aggregateAttribute.Key != "" && aggregateAttribute.Type != v3.AttributeKeyTypeResource { + existsFilter := getExistsNexistsFilter(v3.FilterOperatorExists, v3.FilterItem{Key: aggregateAttribute}) + conditions = append(conditions, existsFilter) + } + + queryString := strings.Join(conditions, " AND ") + return queryString, nil +} + +// orderBy returns a string of comma separated tags for order by clause +// if there are remaining items which are not present in tags they are also added +// if the order is not specified, it defaults to ASC +func orderBy(panelType v3.PanelType, items []v3.OrderBy, tagLookup map[string]struct{}) []string { + var orderBy []string + + for _, item := range items { + if item.ColumnName == constants.SigNozOrderByValue { + orderBy = append(orderBy, fmt.Sprintf("value %s", item.Order)) + } else if _, ok := tagLookup[item.ColumnName]; ok { + orderBy = append(orderBy, fmt.Sprintf("`%s` %s", item.ColumnName, item.Order)) + } else if panelType == v3.PanelTypeList { + attr := v3.AttributeKey{Key: item.ColumnName, DataType: item.DataType, Type: item.Type, IsColumn: item.IsColumn} + name := getClickhouseKey(attr) + if item.IsColumn { + name = "`" + name + "`" + } + orderBy = append(orderBy, fmt.Sprintf("%s %s", name, item.Order)) + } + } + return orderBy +} + +func orderByAttributeKeyTags(panelType v3.PanelType, items []v3.OrderBy, tags []v3.AttributeKey) string { + + tagLookup := map[string]struct{}{} + for _, v := range tags { + tagLookup[v.Key] = struct{}{} + } + + orderByArray := orderBy(panelType, items, tagLookup) + + if len(orderByArray) == 0 { + if panelType == v3.PanelTypeList { + orderByArray = append(orderByArray, constants.TIMESTAMP+" DESC") + } else { + orderByArray = append(orderByArray, "value DESC") + } + } + + str := strings.Join(orderByArray, ",") + return str +} + +func generateAggregateClause(aggOp v3.AggregateOperator, + aggKey string, + step int64, + preferRPM bool, + timeFilter string, + whereClause string, + groupBy string, + having string, + orderBy string, +) (string, error) { + queryTmpl := " %s as value from signoz_logs." + DISTRIBUTED_LOGS_V2 + + " where " + timeFilter + "%s" + + "%s%s" + + "%s" + switch aggOp { + case v3.AggregateOperatorRate: + rate := float64(step) + if preferRPM { + rate = rate / 60.0 + } + + op := fmt.Sprintf("count(%s)/%f", aggKey, rate) + query := fmt.Sprintf(queryTmpl, op, whereClause, groupBy, having, orderBy) + return query, nil + case + v3.AggregateOperatorRateSum, + v3.AggregateOperatorRateMax, + v3.AggregateOperatorRateAvg, + v3.AggregateOperatorRateMin: + rate := float64(step) + if preferRPM { + rate = rate / 60.0 + } + + op := fmt.Sprintf("%s(%s)/%f", logsV3.AggregateOperatorToSQLFunc[aggOp], aggKey, rate) + query := fmt.Sprintf(queryTmpl, op, whereClause, groupBy, having, orderBy) + return query, nil + case + v3.AggregateOperatorP05, + v3.AggregateOperatorP10, + v3.AggregateOperatorP20, + v3.AggregateOperatorP25, + v3.AggregateOperatorP50, + v3.AggregateOperatorP75, + v3.AggregateOperatorP90, + v3.AggregateOperatorP95, + v3.AggregateOperatorP99: + op := fmt.Sprintf("quantile(%v)(%s)", logsV3.AggregateOperatorToPercentile[aggOp], aggKey) + query := fmt.Sprintf(queryTmpl, op, whereClause, groupBy, having, orderBy) + return query, nil + case v3.AggregateOperatorAvg, v3.AggregateOperatorSum, v3.AggregateOperatorMin, v3.AggregateOperatorMax: + op := fmt.Sprintf("%s(%s)", logsV3.AggregateOperatorToSQLFunc[aggOp], aggKey) + query := fmt.Sprintf(queryTmpl, op, whereClause, groupBy, having, orderBy) + return query, nil + case v3.AggregateOperatorCount: + op := "toFloat64(count(*))" + query := fmt.Sprintf(queryTmpl, op, whereClause, groupBy, having, orderBy) + return query, nil + case v3.AggregateOperatorCountDistinct: + op := fmt.Sprintf("toFloat64(count(distinct(%s)))", aggKey) + query := fmt.Sprintf(queryTmpl, op, whereClause, groupBy, having, orderBy) + return query, nil + default: + return "", fmt.Errorf("unsupported aggregate operator") + } +} + +func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.BuilderQuery, graphLimitQtype string, preferRPM bool) (string, error) { + // timerange will be sent in epoch millisecond + logsStart := utils.GetEpochNanoSecs(start) + logsEnd := utils.GetEpochNanoSecs(end) + + // -1800 this is added so that the bucket start considers all the fingerprints. + bucketStart := logsStart/NANOSECOND - 1800 + bucketEnd := logsEnd / NANOSECOND + + // timestamp filter , bucket_start filter is added for primary key + timeFilter := fmt.Sprintf("(timestamp >= %d AND timestamp <= %d) AND (ts_bucket_start >= %d AND ts_bucket_start <= %d)", logsStart, logsEnd, bucketStart, bucketEnd) + + // build the where clause for main table + filterSubQuery, err := buildLogsTimeSeriesFilterQuery(mq.Filters, mq.GroupBy, mq.AggregateAttribute) + if err != nil { + return "", err + } + if filterSubQuery != "" { + filterSubQuery = " AND " + filterSubQuery + } + + // build the where clause for resource table + resourceSubQuery, err := buildResourceSubQuery(bucketStart, bucketEnd, mq.Filters, mq.GroupBy, mq.AggregateAttribute, false) + if err != nil { + return "", err + } + // join both the filter clauses + if resourceSubQuery != "" { + filterSubQuery = filterSubQuery + " AND (resource_fingerprint GLOBAL IN " + resourceSubQuery + ")" + } + + // get the select labels + selectLabels := getSelectLabels(mq.AggregateOperator, mq.GroupBy) + + // get the order by clause + orderBy := orderByAttributeKeyTags(panelType, mq.OrderBy, mq.GroupBy) + if panelType != v3.PanelTypeList && orderBy != "" { + orderBy = " order by " + orderBy + } + + // if noop create the query and return + if mq.AggregateOperator == v3.AggregateOperatorNoOp { + // with noop any filter or different order by other than ts will use new table + sqlSelect := constants.LogsSQLSelectV2 + queryTmpl := sqlSelect + "from signoz_logs.%s where %s%s order by %s" + query := fmt.Sprintf(queryTmpl, DISTRIBUTED_LOGS_V2, timeFilter, filterSubQuery, orderBy) + return query, nil + // ---- NOOP ends here ---- + } + + // ---- FOR aggregation queries ---- + + // get the having conditions + having := logsV3.Having(mq.Having) + if having != "" { + having = " having " + having + } + + // get the group by clause + groupBy := logsV3.GroupByAttributeKeyTags(panelType, graphLimitQtype, mq.GroupBy...) + if panelType != v3.PanelTypeList && groupBy != "" { + groupBy = " group by " + groupBy + } + + // get the aggregation key + aggregationKey := "" + if mq.AggregateAttribute.Key != "" { + aggregationKey = getClickhouseKey(mq.AggregateAttribute) + } + + // for limit queries, there are two queries formed + // in the second query we need to add the placeholder so that first query can be placed + if graphLimitQtype == constants.SecondQueryGraphLimit { + filterSubQuery = filterSubQuery + " AND " + fmt.Sprintf("(%s) GLOBAL IN (", logsV3.GetSelectKeys(mq.AggregateOperator, mq.GroupBy)) + "#LIMIT_PLACEHOLDER)" + } + + aggClause, err := generateAggregateClause(mq.AggregateOperator, aggregationKey, step, preferRPM, timeFilter, filterSubQuery, groupBy, having, orderBy) + if err != nil { + return "", err + } + + var queryTmplPrefix string + if graphLimitQtype == constants.FirstQueryGraphLimit { + queryTmplPrefix = "SELECT" + } else if panelType == v3.PanelTypeTable { + queryTmplPrefix = + "SELECT" + // step or aggregate interval is whole time period in case of table panel + step = (utils.GetEpochNanoSecs(end) - utils.GetEpochNanoSecs(start)) / NANOSECOND + } else if panelType == v3.PanelTypeGraph || panelType == v3.PanelTypeValue { + // Select the aggregate value for interval + queryTmplPrefix = + fmt.Sprintf("SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL %d SECOND) AS ts,", step) + } + + query := queryTmplPrefix + selectLabels + aggClause + + // for limit query this is the first query, + // we don't the the aggregation value here as we are just concerned with the names of group by + // for applying the limit + if graphLimitQtype == constants.FirstQueryGraphLimit { + query = "SELECT " + logsV3.GetSelectKeys(mq.AggregateOperator, mq.GroupBy) + " from (" + query + ")" + } + return query, nil +} + +func buildLogsLiveTailQuery(mq *v3.BuilderQuery) (string, error) { + filterSubQuery, err := buildLogsTimeSeriesFilterQuery(mq.Filters, mq.GroupBy, v3.AttributeKey{}) + if err != nil { + return "", err + } + + // no values for bucket start and end + resourceSubQuery, err := buildResourceSubQuery(0, 0, mq.Filters, mq.GroupBy, mq.AggregateAttribute, true) + if err != nil { + return "", err + } + // join both the filter clauses + if resourceSubQuery != "" { + filterSubQuery = filterSubQuery + " AND (resource_fingerprint GLOBAL IN " + resourceSubQuery + } + + // the reader will add the timestamp and id filters + switch mq.AggregateOperator { + case v3.AggregateOperatorNoOp: + query := constants.LogsSQLSelectV2 + "from signoz_logs." + DISTRIBUTED_LOGS_V2 + " where " + if len(filterSubQuery) > 0 { + query = query + filterSubQuery + " AND " + } + + return query, nil + default: + return "", fmt.Errorf("unsupported aggregate operator in live tail") + } +} + +// PrepareLogsQuery prepares the query for logs +func PrepareLogsQuery(start, end int64, queryType v3.QueryType, panelType v3.PanelType, mq *v3.BuilderQuery, options v3.LogQBOptions) (string, error) { + + // adjust the start and end time to the step interval + // NOTE: Disabling this as it's creating confusion between charts and actual data + // if panelType != v3.PanelTypeList { + // start = start - (start % (mq.StepInterval * 1000)) + // end = end - (end % (mq.StepInterval * 1000)) + // } + + if options.IsLivetailQuery { + query, err := buildLogsLiveTailQuery(mq) + if err != nil { + return "", err + } + return query, nil + } else if options.GraphLimitQtype == constants.FirstQueryGraphLimit { + // give me just the group_by names (no values) + query, err := buildLogsQuery(panelType, start, end, mq.StepInterval, mq, options.GraphLimitQtype, options.PreferRPM) + if err != nil { + return "", err + } + query = logsV3.AddLimitToQuery(query, mq.Limit) + + return query, nil + } else if options.GraphLimitQtype == constants.SecondQueryGraphLimit { + query, err := buildLogsQuery(panelType, start, end, mq.StepInterval, mq, options.GraphLimitQtype, options.PreferRPM) + if err != nil { + return "", err + } + return query, nil + } + + query, err := buildLogsQuery(panelType, start, end, mq.StepInterval, mq, options.GraphLimitQtype, options.PreferRPM) + if err != nil { + return "", err + } + if panelType == v3.PanelTypeValue { + query, err = logsV3.ReduceQuery(query, mq.ReduceTo, mq.AggregateOperator) + } + + if panelType == v3.PanelTypeList { + // check if limit exceeded + if mq.Limit > 0 && mq.Offset >= mq.Limit { + return "", fmt.Errorf("max limit exceeded") + } + + if mq.PageSize > 0 { + if mq.Limit > 0 && mq.Offset+mq.PageSize > mq.Limit { + query = logsV3.AddLimitToQuery(query, mq.Limit-mq.Offset) + } else { + query = logsV3.AddLimitToQuery(query, mq.PageSize) + } + + // add offset to the query only if it is not orderd by timestamp. + if !logsV3.IsOrderByTs(mq.OrderBy) { + query = logsV3.AddOffsetToQuery(query, mq.Offset) + } + + } else { + query = logsV3.AddLimitToQuery(query, mq.Limit) + } + } else if panelType == v3.PanelTypeTable { + query = logsV3.AddLimitToQuery(query, mq.Limit) + } + + return query, err +} diff --git a/pkg/query-service/app/logs/v4/query_builder_test.go b/pkg/query-service/app/logs/v4/query_builder_test.go new file mode 100644 index 0000000000..7bc831437c --- /dev/null +++ b/pkg/query-service/app/logs/v4/query_builder_test.go @@ -0,0 +1,1099 @@ +package v4 + +import ( + "testing" + + "go.signoz.io/signoz/pkg/query-service/constants" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" +) + +func Test_getClickhouseKey(t *testing.T) { + type args struct { + key v3.AttributeKey + } + tests := []struct { + name string + args args + want string + }{ + { + name: "attribute", + args: args{ + key: v3.AttributeKey{Key: "user_name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, + }, + want: "attributes_string['user_name']", + }, + { + name: "resource", + args: args{ + key: v3.AttributeKey{Key: "servicename", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeResource}, + }, + want: "resources_string['servicename']", + }, + { + name: "selected field", + args: args{ + key: v3.AttributeKey{Key: "bytes", DataType: v3.AttributeKeyDataTypeFloat64, Type: v3.AttributeKeyTypeTag, IsColumn: true}, + }, + want: "`attribute_number_bytes`", + }, + { + name: "selected field resource", + args: args{ + key: v3.AttributeKey{Key: "servicename", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeResource, IsColumn: true}, + }, + want: "`resource_string_servicename`", + }, + { + name: "top level key", + args: args{ + key: v3.AttributeKey{Key: "trace_id", DataType: v3.AttributeKeyDataTypeString}, + }, + want: "trace_id", + }, + { + name: "name with -", + args: args{ + key: v3.AttributeKey{Key: "service-name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, + }, + want: "`attribute_string_service-name`", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := getClickhouseKey(tt.args.key); got != tt.want { + t.Errorf("getClickhouseKey() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_getSelectLabels(t *testing.T) { + type args struct { + aggregatorOperator v3.AggregateOperator + groupBy []v3.AttributeKey + } + tests := []struct { + name string + args args + want string + }{ + { + name: "count", + args: args{ + aggregatorOperator: v3.AggregateOperatorCount, + groupBy: []v3.AttributeKey{{Key: "user_name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}}, + }, + want: " attributes_string['user_name'] as `user_name`,", + }, + { + name: "multiple group by", + args: args{ + aggregatorOperator: v3.AggregateOperatorCount, + groupBy: []v3.AttributeKey{ + {Key: "user_name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, + {Key: "service_name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeResource, IsColumn: true}, + }, + }, + want: " attributes_string['user_name'] as `user_name`, `resource_string_service_name` as `service_name`,", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := getSelectLabels(tt.args.aggregatorOperator, tt.args.groupBy); got != tt.want { + t.Errorf("getSelectLabels() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_getExistsNexistsFilter(t *testing.T) { + type args struct { + op v3.FilterOperator + item v3.FilterItem + } + tests := []struct { + name string + args args + want string + }{ + { + name: "exists", + args: args{ + op: v3.FilterOperatorExists, + item: v3.FilterItem{Key: v3.AttributeKey{Key: "user_name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}}, + }, + want: "mapContains(attributes_string, 'user_name')", + }, + { + name: "not exists", + args: args{ + op: v3.FilterOperatorNotExists, + item: v3.FilterItem{Key: v3.AttributeKey{Key: "user_name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}}, + }, + want: "not mapContains(attributes_string, 'user_name')", + }, + { + name: "exists mat column", + args: args{ + op: v3.FilterOperatorExists, + item: v3.FilterItem{Key: v3.AttributeKey{Key: "bytes", DataType: v3.AttributeKeyDataTypeFloat64, Type: v3.AttributeKeyTypeTag, IsColumn: true}}, + }, + want: "`attribute_number_bytes_exists`=true", + }, + { + name: "exists top level column", + args: args{ + op: v3.FilterOperatorExists, + item: v3.FilterItem{Key: v3.AttributeKey{Key: "trace_id", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeUnspecified}}, + }, + want: "", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := getExistsNexistsFilter(tt.args.op, tt.args.item); got != tt.want { + t.Errorf("getExistsNexistsFilter() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_buildAttributeFilter(t *testing.T) { + type args struct { + item v3.FilterItem + } + tests := []struct { + name string + args args + want string + wantErr bool + }{ + { + name: "build attribute filter", + args: args{ + item: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "service.name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + Operator: v3.FilterOperatorEqual, + Value: "test", + }, + }, + want: "resources_string['service.name'] = 'test'", + wantErr: false, + }, + { + name: "test for value search across all attributes", + args: args{ + item: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "__attrs", + DataType: v3.AttributeKeyDataTypeString, + }, + Operator: v3.FilterOperatorContains, + Value: "test", + }, + }, + want: "has(mapValues(attributes_string), 'test')", + }, + { + name: "build attribute filter exists", + args: args{ + item: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "service.name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + Operator: v3.FilterOperatorExists, + }, + }, + want: "mapContains(resources_string, 'service.name')", + wantErr: false, + }, + { + name: "build attribute filter regex", + args: args{ + item: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "service.name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + Operator: v3.FilterOperatorRegex, + Value: "^test", + }, + }, + want: "match(resources_string['service.name'], '^test')", + }, + { + name: "build attribute filter contains", + args: args{ + item: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "service.name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + Operator: v3.FilterOperatorContains, + Value: "test", + }, + }, + want: "resources_string['service.name'] LIKE '%test%'", + }, + { + name: "build attribute filter contains- body", + args: args{ + item: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "body", + DataType: v3.AttributeKeyDataTypeString, + IsColumn: true, + }, + Operator: v3.FilterOperatorContains, + Value: "test", + }, + }, + want: "lower(body) LIKE lower('%test%')", + }, + { + name: "build attribute filter like", + args: args{ + item: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "service.name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + Operator: v3.FilterOperatorLike, + Value: "test", + }, + }, + want: "resources_string['service.name'] LIKE 'test'", + }, + { + name: "build attribute filter like-body", + args: args{ + item: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "body", + DataType: v3.AttributeKeyDataTypeString, + IsColumn: true, + }, + Operator: v3.FilterOperatorLike, + Value: "test", + }, + }, + want: "lower(body) LIKE lower('test')", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := buildAttributeFilter(tt.args.item) + if (err != nil) != tt.wantErr { + t.Errorf("buildAttributeFilter() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("buildAttributeFilter() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_buildLogsTimeSeriesFilterQuery(t *testing.T) { + type args struct { + fs *v3.FilterSet + groupBy []v3.AttributeKey + aggregateAttribute v3.AttributeKey + } + tests := []struct { + name string + args args + want string + wantErr bool + }{ + { + name: "build logs time series filter query", + args: args{ + fs: &v3.FilterSet{ + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "service.name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + Operator: v3.FilterOperatorEqual, + Value: "test", + }, + { + Key: v3.AttributeKey{ + Key: "method", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + Operator: v3.FilterOperatorEqual, + Value: "GET", + }, + }, + }, + }, + want: "attributes_string['service.name'] = 'test' AND mapContains(attributes_string, 'service.name') " + + "AND attributes_string['method'] = 'GET' AND mapContains(attributes_string, 'method')", + }, + { + name: "build logs time series filter query with group by and aggregate attribute", + args: args{ + fs: &v3.FilterSet{ + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "service.name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + Operator: v3.FilterOperatorEqual, + Value: "test", + }, + }, + }, + groupBy: []v3.AttributeKey{ + { + Key: "user_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + }, + aggregateAttribute: v3.AttributeKey{ + Key: "test", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + }, + want: "attributes_string['service.name'] = 'test' AND mapContains(attributes_string, 'service.name') " + + "AND mapContains(attributes_string, 'user_name') AND mapContains(attributes_string, 'test')", + }, + { + name: "build logs time series filter query with multiple group by and aggregate attribute", + args: args{ + fs: &v3.FilterSet{ + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "service.name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + Operator: v3.FilterOperatorEqual, + Value: "test", + }, + }, + }, + groupBy: []v3.AttributeKey{ + { + Key: "user_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + { + Key: "host", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + { + Key: "method", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + IsColumn: true, + }, + { + Key: "trace_id", + DataType: v3.AttributeKeyDataTypeString, + IsColumn: true, + }, + }, + aggregateAttribute: v3.AttributeKey{ + Key: "test", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + }, + want: "attributes_string['service.name'] = 'test' AND mapContains(attributes_string, 'service.name') " + + "AND mapContains(attributes_string, 'user_name') AND `attribute_string_method_exists`=true AND mapContains(attributes_string, 'test')", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := buildLogsTimeSeriesFilterQuery(tt.args.fs, tt.args.groupBy, tt.args.aggregateAttribute) + if (err != nil) != tt.wantErr { + t.Errorf("buildLogsTimeSeriesFilterQuery() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("buildLogsTimeSeriesFilterQuery() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_orderByAttributeKeyTags(t *testing.T) { + type args struct { + panelType v3.PanelType + items []v3.OrderBy + tags []v3.AttributeKey + } + tests := []struct { + name string + args args + want string + }{ + { + name: "Test 1", + args: args{ + panelType: v3.PanelTypeGraph, + items: []v3.OrderBy{ + { + ColumnName: "name", + Order: "asc", + }, + { + ColumnName: constants.SigNozOrderByValue, + Order: "desc", + }, + }, + tags: []v3.AttributeKey{ + {Key: "name"}, + }, + }, + want: "`name` asc,value desc", + }, + { + name: "Test Graph item not present in tag", + args: args{ + panelType: v3.PanelTypeGraph, + items: []v3.OrderBy{ + { + ColumnName: "name", + Order: "asc", + }, + { + ColumnName: "bytes", + Order: "asc", + }, + { + ColumnName: "method", + Order: "asc", + }, + }, + tags: []v3.AttributeKey{ + {Key: "name"}, + {Key: "bytes"}, + }, + }, + want: "`name` asc,`bytes` asc", + }, + { + name: "Test panel list", + args: args{ + panelType: v3.PanelTypeList, + items: []v3.OrderBy{ + { + ColumnName: "name", + Order: "asc", + }, + { + ColumnName: constants.SigNozOrderByValue, + Order: "asc", + }, + { + ColumnName: "bytes", + Order: "asc", + }, + }, + tags: []v3.AttributeKey{ + {Key: "name"}, + {Key: "bytes"}, + }, + }, + want: "`name` asc,value asc,`bytes` asc", + }, + { + name: "test 4", + args: args{ + panelType: v3.PanelTypeList, + items: []v3.OrderBy{ + { + ColumnName: "name", + Order: "asc", + }, + { + ColumnName: constants.SigNozOrderByValue, + Order: "asc", + }, + { + ColumnName: "response_time", + Order: "desc", + Key: "response_time", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeString, + }, + }, + tags: []v3.AttributeKey{ + {Key: "name"}, + {Key: "value"}, + }, + }, + want: "`name` asc,value asc,attributes_string['response_time'] desc", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := orderByAttributeKeyTags(tt.args.panelType, tt.args.items, tt.args.tags); got != tt.want { + t.Errorf("orderByAttributeKeyTags() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_generateAggregateClause(t *testing.T) { + type args struct { + op v3.AggregateOperator + aggKey string + step int64 + preferRPM bool + timeFilter string + whereClause string + groupBy string + having string + orderBy string + } + tests := []struct { + name string + args args + want string + wantErr bool + }{ + { + name: "test rate", + args: args{ + op: v3.AggregateOperatorRate, + aggKey: "test", + step: 60, + preferRPM: false, + timeFilter: "(timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458)", + whereClause: " AND attributes_string['service.name'] = 'test'", + groupBy: " group by `user_name`", + having: "", + orderBy: " order by `user_name` desc", + }, + want: " count(test)/60.000000 as value from signoz_logs.distributed_logs_v2 where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) AND " + + "(ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) AND attributes_string['service.name'] = 'test' " + + "group by `user_name` order by `user_name` desc", + }, + { + name: "test P10 with all args", + args: args{ + op: v3.AggregateOperatorRate, + aggKey: "test", + step: 60, + preferRPM: false, + timeFilter: "(timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458)", + whereClause: " AND attributes_string['service.name'] = 'test'", + groupBy: " group by `user_name`", + having: " having value > 10", + orderBy: " order by `user_name` desc", + }, + want: " count(test)/60.000000 as value from signoz_logs.distributed_logs_v2 where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) AND " + + "(ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) AND attributes_string['service.name'] = 'test' group by `user_name` having value > 10 order by " + + "`user_name` desc", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := generateAggregateClause(tt.args.op, tt.args.aggKey, tt.args.step, tt.args.preferRPM, tt.args.timeFilter, tt.args.whereClause, tt.args.groupBy, tt.args.having, tt.args.orderBy) + if (err != nil) != tt.wantErr { + t.Errorf("generateAggreagteClause() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("generateAggreagteClause() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_buildLogsQuery(t *testing.T) { + type args struct { + panelType v3.PanelType + start int64 + end int64 + step int64 + mq *v3.BuilderQuery + graphLimitQtype string + preferRPM bool + } + tests := []struct { + name string + args args + want string + wantErr bool + }{ + { + name: "build logs query", + args: args{ + panelType: v3.PanelTypeTable, + start: 1680066360726210000, + end: 1680066458000000000, + step: 1000, + mq: &v3.BuilderQuery{ + AggregateOperator: v3.AggregateOperatorCount, + Filters: &v3.FilterSet{ + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "service.name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + Operator: v3.FilterOperatorEqual, + Value: "test", + }, + }, + }, + GroupBy: []v3.AttributeKey{ + { + Key: "user_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + }, + OrderBy: []v3.OrderBy{ + { + ColumnName: "user_name", + Order: "desc", + }, + }, + }, + }, + want: "SELECT attributes_string['user_name'] as `user_name`, toFloat64(count(*)) as value from signoz_logs.distributed_logs_v2 " + + "where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) " + + "AND attributes_string['service.name'] = 'test' AND mapContains(attributes_string, 'service.name') AND mapContains(attributes_string, 'user_name') " + + "group by `user_name` order by `user_name` desc", + }, + { + name: "build logs query noop", + args: args{ + panelType: v3.PanelTypeList, + start: 1680066360726210000, + end: 1680066458000000000, + step: 1000, + mq: &v3.BuilderQuery{ + AggregateOperator: v3.AggregateOperatorNoOp, + Filters: &v3.FilterSet{ + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "service.name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + Operator: v3.FilterOperatorEqual, + Value: "test", + }, + }, + }, + OrderBy: []v3.OrderBy{ + { + ColumnName: "timestamp", + Order: "desc", + }, + }, + }, + }, + want: "SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, body, attributes_string, attributes_number, attributes_bool, resources_string " + + "from signoz_logs.distributed_logs_v2 where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) " + + "AND attributes_string['service.name'] = 'test' AND mapContains(attributes_string, 'service.name') order by timestamp desc", + }, + { + name: "build logs query with all args", + args: args{ + panelType: v3.PanelTypeGraph, + start: 1680066360726210000, + end: 1680066458000000000, + step: 60, + mq: &v3.BuilderQuery{ + AggregateOperator: v3.AggregateOperatorAvg, + AggregateAttribute: v3.AttributeKey{ + Key: "duration", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Filters: &v3.FilterSet{ + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "service.name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + Operator: v3.FilterOperatorEqual, + Value: "test", + }, + { + Key: v3.AttributeKey{ + Key: "duration", + DataType: v3.AttributeKeyDataTypeFloat64, + Type: v3.AttributeKeyTypeTag, + }, + Operator: v3.FilterOperatorGreaterThan, + Value: 1000, + }, + }, + }, + GroupBy: []v3.AttributeKey{ + { + Key: "host", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + }, + OrderBy: []v3.OrderBy{ + { + ColumnName: "host", + Order: "desc", + }, + }, + }, + }, + want: "SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, resources_string['host'] as `host`, avg(attributes_number['duration']) as value " + + "from signoz_logs.distributed_logs_v2 where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) " + + "AND attributes_number['duration'] > 1000.000000 AND mapContains(attributes_number, 'duration') AND mapContains(attributes_number, 'duration') AND " + + "(resource_fingerprint GLOBAL IN (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (seen_at_ts_bucket_start >= 1680064560) AND (seen_at_ts_bucket_start <= 1680066458) " + + "AND simpleJSONExtractString(labels, 'service.name') = 'test' AND labels like '%service.name%test%' AND ( (simpleJSONHas(labels, 'host') AND labels like '%host%') ))) " + + "group by `host`,ts order by `host` desc", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := buildLogsQuery(tt.args.panelType, tt.args.start, tt.args.end, tt.args.step, tt.args.mq, tt.args.graphLimitQtype, tt.args.preferRPM) + if (err != nil) != tt.wantErr { + t.Errorf("buildLogsQuery() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("buildLogsQuery() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestPrepareLogsQuery(t *testing.T) { + type args struct { + start int64 + end int64 + queryType v3.QueryType + panelType v3.PanelType + mq *v3.BuilderQuery + options v3.LogQBOptions + } + tests := []struct { + name string + args args + want string + wantErr bool + }{ + { + name: "TABLE: Test count with JSON Filter Array, groupBy, orderBy", + args: args{ + start: 1680066360726210000, + end: 1680066458000000000, + panelType: v3.PanelTypeTable, + mq: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + AggregateOperator: v3.AggregateOperatorCount, + Expression: "A", + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "body.requestor_list[*]", + DataType: "array(string)", + IsJSON: true, + }, + Operator: "has", + Value: "index_service", + }, + }, + }, + GroupBy: []v3.AttributeKey{ + {Key: "name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, + {Key: "host", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeResource}, + }, + OrderBy: []v3.OrderBy{ + {ColumnName: "name", Order: "DESC"}, + }, + }, + }, + want: "SELECT attributes_string['name'] as `name`, resources_string['host'] as `host`, toFloat64(count(*)) as value from signoz_logs.distributed_logs_v2 where " + + "(timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) AND lower(body) like lower('%requestor_list%') " + + "AND lower(body) like lower('%index_service%') AND has(JSONExtract(JSON_QUERY(body, '$.\"requestor_list\"[*]'), 'Array(String)'), 'index_service') AND mapContains(attributes_string, 'name') AND " + + "(resource_fingerprint GLOBAL IN (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (seen_at_ts_bucket_start >= 1680064560) AND (seen_at_ts_bucket_start <= 1680066458) AND " + + "( (simpleJSONHas(labels, 'host') AND labels like '%host%') ))) group by `name`,`host` order by `name` DESC", + }, + { + name: "Test TS with limit- first", + args: args{ + start: 1680066360726, + end: 1680066458000, + queryType: v3.QueryTypeBuilder, + panelType: v3.PanelTypeGraph, + mq: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + AggregateAttribute: v3.AttributeKey{Key: "name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, + AggregateOperator: v3.AggregateOperatorCountDistinct, + Expression: "A", + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "method", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: "GET", Operator: "="}, + {Key: v3.AttributeKey{Key: "service.name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeResource}, Value: "app", Operator: "="}, + }, + }, + Limit: 10, + GroupBy: []v3.AttributeKey{{Key: "user", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}}, + }, + options: v3.LogQBOptions{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: true}, + }, + want: "SELECT `user` from (SELECT attributes_string['user'] as `user`, toFloat64(count(distinct(attributes_string['name']))) as value from signoz_logs.distributed_logs_v2 " + + "where (timestamp >= 1680066360726000000 AND timestamp <= 1680066458000000000) AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) AND attributes_string['method'] = 'GET' " + + "AND mapContains(attributes_string, 'method') AND mapContains(attributes_string, 'user') AND mapContains(attributes_string, 'name') AND (resource_fingerprint GLOBAL IN " + + "(SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (seen_at_ts_bucket_start >= 1680064560) AND (seen_at_ts_bucket_start <= 1680066458) AND simpleJSONExtractString(labels, 'service.name') = 'app' " + + "AND labels like '%service.name%app%')) group by `user` order by value DESC) LIMIT 10", + }, + { + name: "Test TS with limit- second", + args: args{ + start: 1680066360726, + end: 1680066458000, + queryType: v3.QueryTypeBuilder, + panelType: v3.PanelTypeGraph, + mq: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + AggregateAttribute: v3.AttributeKey{Key: "name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, + AggregateOperator: v3.AggregateOperatorCountDistinct, + Expression: "A", + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "method", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: "GET", Operator: "="}, + {Key: v3.AttributeKey{Key: "service.name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeResource}, Value: "app", Operator: "="}, + }, + }, + GroupBy: []v3.AttributeKey{{Key: "user", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}}, + Limit: 2, + }, + options: v3.LogQBOptions{GraphLimitQtype: constants.SecondQueryGraphLimit}, + }, + want: "SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, attributes_string['user'] as `user`, toFloat64(count(distinct(attributes_string['name']))) as value " + + "from signoz_logs.distributed_logs_v2 where (timestamp >= 1680066360726000000 AND timestamp <= 1680066458000000000) AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) AND " + + "attributes_string['method'] = 'GET' AND mapContains(attributes_string, 'method') AND mapContains(attributes_string, 'user') AND mapContains(attributes_string, 'name') AND " + + "(resource_fingerprint GLOBAL IN (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (seen_at_ts_bucket_start >= 1680064560) AND (seen_at_ts_bucket_start <= 1680066458) AND " + + "simpleJSONExtractString(labels, 'service.name') = 'app' AND labels like '%service.name%app%')) AND (`user`) GLOBAL IN (#LIMIT_PLACEHOLDER) group by `user`,ts order by value DESC", + }, + { + name: "Live Tail Query", + args: args{ + start: 1680066360726, + end: 1680066458000, + queryType: v3.QueryTypeBuilder, + panelType: v3.PanelTypeList, + mq: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + AggregateOperator: v3.AggregateOperatorNoOp, + Expression: "A", + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "method", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: "GET", Operator: "="}, + }, + }, + }, + options: v3.LogQBOptions{IsLivetailQuery: true}, + }, + want: "SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, body, attributes_string, attributes_number, attributes_bool, resources_string " + + "from signoz_logs.distributed_logs_v2 where attributes_string['method'] = 'GET' AND mapContains(attributes_string, 'method') AND ", + }, + { + name: "Live Tail Query with resource attribute", + args: args{ + start: 1680066360726, + end: 1680066458000, + queryType: v3.QueryTypeBuilder, + panelType: v3.PanelTypeList, + mq: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + AggregateOperator: v3.AggregateOperatorNoOp, + Expression: "A", + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "method", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: "GET", Operator: "="}, + {Key: v3.AttributeKey{Key: "service.name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeResource}, Value: "app", Operator: "contains"}, + }, + }, + }, + options: v3.LogQBOptions{IsLivetailQuery: true}, + }, + want: "SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, body, attributes_string, attributes_number, attributes_bool, resources_string from " + + "signoz_logs.distributed_logs_v2 where attributes_string['method'] = 'GET' AND mapContains(attributes_string, 'method') AND " + + "(resource_fingerprint GLOBAL IN (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE simpleJSONExtractString(labels, 'service.name') LIKE '%app%' AND labels like '%service.name%app%' AND ", + }, + { + name: "Live Tail Query W/O filter", + args: args{ + start: 1680066360726, + end: 1680066458000, + queryType: v3.QueryTypeBuilder, + panelType: v3.PanelTypeList, + mq: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + AggregateOperator: v3.AggregateOperatorNoOp, + Expression: "A", + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}, + }, + options: v3.LogQBOptions{IsLivetailQuery: true}, + }, + want: "SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, body, attributes_string, attributes_number, attributes_bool, resources_string " + + "from signoz_logs.distributed_logs_v2 where ", + }, + { + name: "Table query with limit", + args: args{ + start: 1680066360726, + end: 1680066458000, + queryType: v3.QueryTypeBuilder, + panelType: v3.PanelTypeTable, + mq: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + AggregateOperator: v3.AggregateOperatorCount, + Expression: "A", + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}, + Limit: 10, + }, + }, + want: "SELECT toFloat64(count(*)) as value from signoz_logs.distributed_logs_v2 where (timestamp >= 1680066360726000000 AND timestamp <= 1680066458000000000) AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) order by value DESC LIMIT 10", + }, + { + name: "Test limit less than pageSize - order by ts", + args: args{ + start: 1680066360726, + end: 1680066458000, + queryType: v3.QueryTypeBuilder, + panelType: v3.PanelTypeList, + mq: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + AggregateOperator: v3.AggregateOperatorNoOp, + Expression: "A", + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}, + OrderBy: []v3.OrderBy{{ColumnName: constants.TIMESTAMP, Order: "desc", Key: constants.TIMESTAMP, DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeUnspecified, IsColumn: true}}, + Limit: 1, + Offset: 0, + PageSize: 5, + }, + }, + want: "SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, body, attributes_string, attributes_number, attributes_bool, resources_string from " + + "signoz_logs.distributed_logs_v2 where (timestamp >= 1680066360726000000 AND timestamp <= 1680066458000000000) AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) " + + "order by `timestamp` desc LIMIT 1", + }, + { + name: "Test limit greater than pageSize - order by ts", + args: args{ + start: 1680066360726, + end: 1680066458000, + queryType: v3.QueryTypeBuilder, + panelType: v3.PanelTypeList, + mq: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + AggregateOperator: v3.AggregateOperatorNoOp, + Expression: "A", + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "id", Type: v3.AttributeKeyTypeUnspecified, DataType: v3.AttributeKeyDataTypeString, IsColumn: true}, Operator: v3.FilterOperatorLessThan, Value: "2TNh4vp2TpiWyLt3SzuadLJF2s4"}, + }}, + OrderBy: []v3.OrderBy{{ColumnName: constants.TIMESTAMP, Order: "desc", Key: constants.TIMESTAMP, DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeUnspecified, IsColumn: true}}, + Limit: 100, + Offset: 10, + PageSize: 10, + }, + }, + want: "SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, body, attributes_string, attributes_number, attributes_bool, resources_string from " + + "signoz_logs.distributed_logs_v2 where (timestamp >= 1680066360726000000 AND timestamp <= 1680066458000000000) AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) " + + "AND id < '2TNh4vp2TpiWyLt3SzuadLJF2s4' order by `timestamp` desc LIMIT 10", + }, + { + name: "Test limit less than pageSize - order by custom", + args: args{ + start: 1680066360726, + end: 1680066458000, + queryType: v3.QueryTypeBuilder, + panelType: v3.PanelTypeList, + mq: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + AggregateOperator: v3.AggregateOperatorNoOp, + Expression: "A", + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}, + OrderBy: []v3.OrderBy{{ColumnName: "method", Order: "desc", Key: "method", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}}, + Limit: 1, + Offset: 0, + PageSize: 5, + }, + }, + want: "SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, body, attributes_string, attributes_number, attributes_bool, resources_string from " + + "signoz_logs.distributed_logs_v2 where (timestamp >= 1680066360726000000 AND timestamp <= 1680066458000000000) AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) " + + "order by attributes_string['method'] desc LIMIT 1 OFFSET 0", + }, + { + name: "Test limit greater than pageSize - order by custom", + args: args{ + start: 1680066360726, + end: 1680066458000, + queryType: v3.QueryTypeBuilder, + panelType: v3.PanelTypeList, + mq: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + AggregateOperator: v3.AggregateOperatorNoOp, + Expression: "A", + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "id", Type: v3.AttributeKeyTypeUnspecified, DataType: v3.AttributeKeyDataTypeString, IsColumn: true}, Operator: v3.FilterOperatorLessThan, Value: "2TNh4vp2TpiWyLt3SzuadLJF2s4"}, + }}, + OrderBy: []v3.OrderBy{{ColumnName: "method", Order: "desc", Key: "method", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}}, + Limit: 100, + Offset: 50, + PageSize: 50, + }, + }, + want: "SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, body, attributes_string, attributes_number, attributes_bool, resources_string from " + + "signoz_logs.distributed_logs_v2 where (timestamp >= 1680066360726000000 AND timestamp <= 1680066458000000000) AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) AND " + + "id < '2TNh4vp2TpiWyLt3SzuadLJF2s4' order by attributes_string['method'] desc LIMIT 50 OFFSET 50", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := PrepareLogsQuery(tt.args.start, tt.args.end, tt.args.queryType, tt.args.panelType, tt.args.mq, tt.args.options) + if (err != nil) != tt.wantErr { + t.Errorf("PrepareLogsQuery() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("PrepareLogsQuery() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/query-service/app/logs/v4/resource_query_builder.go b/pkg/query-service/app/logs/v4/resource_query_builder.go index 004c9269fb..12d6c1a36a 100644 --- a/pkg/query-service/app/logs/v4/resource_query_builder.go +++ b/pkg/query-service/app/logs/v4/resource_query_builder.go @@ -164,7 +164,7 @@ func buildResourceFiltersFromAggregateAttribute(aggregateAttribute v3.AttributeK return "" } -func buildResourceSubQuery(bucketStart, bucketEnd int64, fs *v3.FilterSet, groupBy []v3.AttributeKey, aggregateAttribute v3.AttributeKey) (string, error) { +func buildResourceSubQuery(bucketStart, bucketEnd int64, fs *v3.FilterSet, groupBy []v3.AttributeKey, aggregateAttribute v3.AttributeKey, isLiveTail bool) (string, error) { // BUILD THE WHERE CLAUSE var conditions []string @@ -193,9 +193,14 @@ func buildResourceSubQuery(bucketStart, bucketEnd int64, fs *v3.FilterSet, group conditionStr := strings.Join(conditions, " AND ") // BUILD THE FINAL QUERY - query := fmt.Sprintf("SELECT fingerprint FROM signoz_logs.%s WHERE (seen_at_ts_bucket_start >= %d) AND (seen_at_ts_bucket_start <= %d) AND ", DISTRIBUTED_LOGS_V2_RESOURCE, bucketStart, bucketEnd) - - query = "(" + query + conditionStr + ")" + var query string + if isLiveTail { + query = fmt.Sprintf("SELECT fingerprint FROM signoz_logs.%s WHERE ", DISTRIBUTED_LOGS_V2_RESOURCE) + query = "(" + query + conditionStr + } else { + query = fmt.Sprintf("SELECT fingerprint FROM signoz_logs.%s WHERE (seen_at_ts_bucket_start >= %d) AND (seen_at_ts_bucket_start <= %d) AND ", DISTRIBUTED_LOGS_V2_RESOURCE, bucketStart, bucketEnd) + query = "(" + query + conditionStr + ")" + } return query, nil } diff --git a/pkg/query-service/app/logs/v4/resource_query_builder_test.go b/pkg/query-service/app/logs/v4/resource_query_builder_test.go index 1616c29e08..130fd9e98c 100644 --- a/pkg/query-service/app/logs/v4/resource_query_builder_test.go +++ b/pkg/query-service/app/logs/v4/resource_query_builder_test.go @@ -469,7 +469,7 @@ func Test_buildResourceSubQuery(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := buildResourceSubQuery(tt.args.bucketStart, tt.args.bucketEnd, tt.args.fs, tt.args.groupBy, tt.args.aggregateAttribute) + got, err := buildResourceSubQuery(tt.args.bucketStart, tt.args.bucketEnd, tt.args.fs, tt.args.groupBy, tt.args.aggregateAttribute, false) if (err != nil) != tt.wantErr { t.Errorf("buildResourceSubQuery() error = %v, wantErr %v", err, tt.wantErr) return diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index 70eda959dc..71a1e39032 100644 --- a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -316,6 +316,12 @@ const ( "CAST((attributes_float64_key, attributes_float64_value), 'Map(String, Float64)') as attributes_float64," + "CAST((attributes_bool_key, attributes_bool_value), 'Map(String, Bool)') as attributes_bool," + "CAST((resources_string_key, resources_string_value), 'Map(String, String)') as resources_string " + LogsSQLSelectV2 = "SELECT " + + "timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, body, " + + "attributes_string, " + + "attributes_number, " + + "attributes_bool, " + + "resources_string " TracesExplorerViewSQLSelectWithSubQuery = "WITH subQuery AS (SELECT distinct on (traceID) traceID, durationNano, " + "serviceName, name FROM %s.%s WHERE parentSpanID = '' AND %s %s ORDER BY durationNano DESC " TracesExplorerViewSQLSelectQuery = "SELECT subQuery.serviceName, subQuery.name, count() AS " + @@ -380,6 +386,12 @@ var StaticFieldsLogsV3 = map[string]v3.AttributeKey{ Type: v3.AttributeKeyTypeUnspecified, IsColumn: true, }, + "__attrs": { + Key: "__attrs", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeUnspecified, + IsColumn: true, + }, } const SigNozOrderByValue = "#SIGNOZ_VALUE" diff --git a/pkg/query-service/model/v3/v3.go b/pkg/query-service/model/v3/v3.go index 0128536ac2..c21d47229c 100644 --- a/pkg/query-service/model/v3/v3.go +++ b/pkg/query-service/model/v3/v3.go @@ -1290,3 +1290,9 @@ type URLShareableOptions struct { Format string `json:"format"` SelectColumns []AttributeKey `json:"selectColumns"` } + +type LogQBOptions struct { + GraphLimitQtype string + IsLivetailQuery bool + PreferRPM bool +} diff --git a/pkg/query-service/utils/format.go b/pkg/query-service/utils/format.go index c623d3e8e0..e9b7a0b7e3 100644 --- a/pkg/query-service/utils/format.go +++ b/pkg/query-service/utils/format.go @@ -272,6 +272,28 @@ func GetClickhouseColumnName(typeName string, dataType, field string) string { return colName } +func GetClickhouseColumnNameV2(typeName string, dataType, field string) string { + if typeName == string(v3.AttributeKeyTypeTag) { + typeName = constants.Attributes + } + + if typeName != string(v3.AttributeKeyTypeResource) { + typeName = typeName[:len(typeName)-1] + } + + dataType = strings.ToLower(dataType) + + if dataType == "int64" || dataType == "float64" { + dataType = "number" + } + + // if name contains . replace it with `$$` + field = strings.ReplaceAll(field, ".", "$$") + + colName := fmt.Sprintf("%s_%s_%s", strings.ToLower(typeName), dataType, field) + return colName +} + // GetEpochNanoSecs takes epoch and returns it in ns func GetEpochNanoSecs(epoch int64) int64 { temp := epoch