Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: v4 logs query builder #5796

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions pkg/query-service/app/logs/v3/json_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const (
NGRAM_SIZE = 4
)

var dataTypeMapping = map[string]string{
var DataTypeMapping = map[string]string{
"string": STRING,
"int64": INT64,
"float64": FLOAT64,
Expand All @@ -31,7 +31,7 @@ var dataTypeMapping = map[string]string{
"array(bool)": ARRAY_BOOL,
}

var arrayValueTypeMapping = map[string]string{
var ArrayValueTypeMapping = map[string]string{
"array(string)": "string",
"array(int64)": "int64",
"array(float64)": "float64",
Expand Down Expand Up @@ -59,7 +59,7 @@ var jsonLogOperators = map[v3.FilterOperator]string{
v3.FilterOperatorNotHas: "NOT has(%s, %s)",
}

func getPath(keyArr []string) string {
func GetPath(keyArr []string) string {
path := []string{}
for i := 0; i < len(keyArr); i++ {
if strings.HasSuffix(keyArr[i], "[*]") {
Expand All @@ -71,7 +71,7 @@ func getPath(keyArr []string) string {
return strings.Join(path, ".")
}

func getJSONFilterKey(key v3.AttributeKey, op v3.FilterOperator, isArray bool) (string, error) {
func GetJSONFilterKey(key v3.AttributeKey, op v3.FilterOperator, isArray bool) (string, error) {
keyArr := strings.Split(key.Key, ".")
// i.e it should be at least body.name, and not something like body
if len(keyArr) < 2 {
Expand All @@ -89,11 +89,11 @@ func getJSONFilterKey(key v3.AttributeKey, op v3.FilterOperator, isArray bool) (

var dataType string
var ok bool
if dataType, ok = dataTypeMapping[string(key.DataType)]; !ok {
if dataType, ok = DataTypeMapping[string(key.DataType)]; !ok {
return "", fmt.Errorf("unsupported dataType for JSON: %s", key.DataType)
}

path := getPath(keyArr[1:])
path := GetPath(keyArr[1:])

if isArray {
return fmt.Sprintf("JSONExtract(JSON_QUERY(%s, '$.%s'), '%s')", keyArr[0], path, dataType), nil
Expand All @@ -109,7 +109,7 @@ func getJSONFilterKey(key v3.AttributeKey, op v3.FilterOperator, isArray bool) (
}

// takes the path and the values and generates where clauses for better usage of index
func getPathIndexFilter(path string) string {
func GetPathIndexFilter(path string) string {
filters := []string{}
keyArr := strings.Split(path, ".")
if len(keyArr) < 2 {
Expand All @@ -136,15 +136,15 @@ func GetJSONFilter(item v3.FilterItem) (string, error) {
dataType := item.Key.DataType
isArray := false
// check if its an array and handle it
if val, ok := arrayValueTypeMapping[string(item.Key.DataType)]; ok {
if val, ok := ArrayValueTypeMapping[string(item.Key.DataType)]; ok {
if item.Operator != v3.FilterOperatorHas && item.Operator != v3.FilterOperatorNotHas {
return "", fmt.Errorf("only has operator is supported for array")
}
isArray = true
dataType = v3.AttributeKeyDataType(val)
}

key, err := getJSONFilterKey(item.Key, item.Operator, isArray)
key, err := GetJSONFilterKey(item.Key, item.Operator, isArray)
if err != nil {
return "", err
}
Expand All @@ -164,7 +164,7 @@ func GetJSONFilter(item v3.FilterItem) (string, error) {
if logsOp, ok := jsonLogOperators[op]; ok {
switch op {
case v3.FilterOperatorExists, v3.FilterOperatorNotExists:
filter = fmt.Sprintf(logsOp, key, getPath(strings.Split(item.Key.Key, ".")[1:]))
filter = fmt.Sprintf(logsOp, key, GetPath(strings.Split(item.Key.Key, ".")[1:]))
case v3.FilterOperatorRegex, v3.FilterOperatorNotRegex, v3.FilterOperatorHas, v3.FilterOperatorNotHas:
fmtVal := utils.ClickHouseFormattedValue(value)
filter = fmt.Sprintf(logsOp, key, fmtVal)
Expand All @@ -181,7 +181,7 @@ func GetJSONFilter(item v3.FilterItem) (string, error) {

filters := []string{}

pathFilter := getPathIndexFilter(item.Key.Key)
pathFilter := GetPathIndexFilter(item.Key.Key)
if pathFilter != "" {
filters = append(filters, pathFilter)
}
Expand All @@ -196,7 +196,7 @@ func GetJSONFilter(item v3.FilterItem) (string, error) {

// add exists check for non array items as default values of int/float/bool will corrupt the results
if !isArray && !(item.Operator == v3.FilterOperatorExists || item.Operator == v3.FilterOperatorNotExists) {
existsFilter := fmt.Sprintf("JSON_EXISTS(body, '$.%s')", getPath(strings.Split(item.Key.Key, ".")[1:]))
existsFilter := fmt.Sprintf("JSON_EXISTS(body, '$.%s')", GetPath(strings.Split(item.Key.Key, ".")[1:]))
filter = fmt.Sprintf("%s AND %s", existsFilter, filter)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/query-service/app/logs/v3/json_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ var testGetJSONFilterKeyData = []struct {
func TestGetJSONFilterKey(t *testing.T) {
for _, tt := range testGetJSONFilterKeyData {
Convey("testgetKey", t, func() {
columnName, err := getJSONFilterKey(tt.Key, tt.Operator, tt.IsArray)
columnName, err := GetJSONFilterKey(tt.Key, tt.Operator, tt.IsArray)
if tt.Error {
So(err, ShouldNotBeNil)
} else {
Expand Down
56 changes: 28 additions & 28 deletions pkg/query-service/app/logs/v3/query_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"go.signoz.io/signoz/pkg/query-service/utils"
)

var aggregateOperatorToPercentile = map[v3.AggregateOperator]float64{
var AggregateOperatorToPercentile = map[v3.AggregateOperator]float64{
v3.AggregateOperatorP05: 0.05,
v3.AggregateOperatorP10: 0.10,
v3.AggregateOperatorP20: 0.20,
Expand All @@ -21,7 +21,7 @@ var aggregateOperatorToPercentile = map[v3.AggregateOperator]float64{
v3.AggregateOperatorP99: 0.99,
}

var aggregateOperatorToSQLFunc = map[v3.AggregateOperator]string{
var AggregateOperatorToSQLFunc = map[v3.AggregateOperator]string{
v3.AggregateOperatorAvg: "avg",
v3.AggregateOperatorMax: "max",
v3.AggregateOperatorMin: "min",
Expand Down Expand Up @@ -53,7 +53,7 @@ var logOperators = map[v3.FilterOperator]string{

const BODY = "body"

func getClickhouseLogsColumnType(columnType v3.AttributeKeyType) string {
func GetClickhouseLogsColumnType(columnType v3.AttributeKeyType) string {
if columnType == v3.AttributeKeyTypeTag {
return "attributes"
}
Expand Down Expand Up @@ -83,7 +83,7 @@ func getClickhouseColumnName(key v3.AttributeKey) string {
//if the key is present in the topLevelColumn then it will be only searched in those columns,
//regardless if it is indexed/present again in resource or column attribute
if !key.IsColumn {
columnType := getClickhouseLogsColumnType(key.Type)
columnType := GetClickhouseLogsColumnType(key.Type)
columnDataType := getClickhouseLogsColumnDataType(key.DataType)
clickhouseColumn = fmt.Sprintf("%s_%s_value[indexOf(%s_%s_key, '%s')]", columnType, columnDataType, columnType, columnDataType, key.Key)
return clickhouseColumn
Expand Down Expand Up @@ -114,7 +114,7 @@ func getSelectLabels(aggregatorOperator v3.AggregateOperator, groupBy []v3.Attri
return selectLabels
}

func getSelectKeys(aggregatorOperator v3.AggregateOperator, groupBy []v3.AttributeKey) string {
func GetSelectKeys(aggregatorOperator v3.AggregateOperator, groupBy []v3.AttributeKey) string {
var selectLabels []string
if aggregatorOperator == v3.AggregateOperatorNoOp {
return ""
Expand Down Expand Up @@ -154,7 +154,7 @@ func GetExistsNexistsFilter(op v3.FilterOperator, item v3.FilterItem) string {
}
return fmt.Sprintf("%s_exists`=%v", strings.TrimSuffix(getClickhouseColumnName(item.Key), "`"), val)
}
columnType := getClickhouseLogsColumnType(item.Key.Type)
columnType := GetClickhouseLogsColumnType(item.Key.Type)
columnDataType := getClickhouseLogsColumnDataType(item.Key.DataType)
return fmt.Sprintf(logOperators[op], columnType, columnDataType, item.Key.Key)
}
Expand Down Expand Up @@ -224,7 +224,7 @@ func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, groupBy []v3.AttributeKey,
// add group by conditions to filter out log lines which doesn't have the key
for _, attr := range groupBy {
if !attr.IsColumn {
columnType := getClickhouseLogsColumnType(attr.Type)
columnType := GetClickhouseLogsColumnType(attr.Type)
columnDataType := getClickhouseLogsColumnDataType(attr.DataType)
conditions = append(conditions, fmt.Sprintf("has(%s_%s_key, '%s')", columnType, columnDataType, attr.Key))
} else if attr.Type != v3.AttributeKeyTypeUnspecified {
Expand Down Expand Up @@ -258,7 +258,7 @@ func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.Build

selectLabels := getSelectLabels(mq.AggregateOperator, mq.GroupBy)

having := having(mq.Having)
having := Having(mq.Having)
if having != "" {
having = " having " + having
}
Expand Down Expand Up @@ -288,10 +288,10 @@ func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.Build
// we dont need value for first query
// going with this route as for a cleaner approach on implementation
if graphLimitQtype == constants.FirstQueryGraphLimit {
queryTmpl = "SELECT " + getSelectKeys(mq.AggregateOperator, mq.GroupBy) + " from (" + queryTmpl + ")"
queryTmpl = "SELECT " + GetSelectKeys(mq.AggregateOperator, mq.GroupBy) + " from (" + queryTmpl + ")"
}

groupBy := groupByAttributeKeyTags(panelType, graphLimitQtype, mq.GroupBy...)
groupBy := GroupByAttributeKeyTags(panelType, graphLimitQtype, mq.GroupBy...)
if panelType != v3.PanelTypeList && groupBy != "" {
groupBy = " group by " + groupBy
}
Expand All @@ -301,7 +301,7 @@ func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.Build
}

if graphLimitQtype == constants.SecondQueryGraphLimit {
filterSubQuery = filterSubQuery + " AND " + fmt.Sprintf("(%s) GLOBAL IN (", getSelectKeys(mq.AggregateOperator, mq.GroupBy)) + "#LIMIT_PLACEHOLDER)"
filterSubQuery = filterSubQuery + " AND " + fmt.Sprintf("(%s) GLOBAL IN (", GetSelectKeys(mq.AggregateOperator, mq.GroupBy)) + "#LIMIT_PLACEHOLDER)"
}

aggregationKey := ""
Expand Down Expand Up @@ -329,7 +329,7 @@ func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.Build
rate = rate / 60.0
}

op := fmt.Sprintf("%s(%s)/%f", aggregateOperatorToSQLFunc[mq.AggregateOperator], aggregationKey, rate)
op := fmt.Sprintf("%s(%s)/%f", AggregateOperatorToSQLFunc[mq.AggregateOperator], aggregationKey, rate)
query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy)
return query, nil
case
Expand All @@ -342,11 +342,11 @@ func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.Build
v3.AggregateOperatorP90,
v3.AggregateOperatorP95,
v3.AggregateOperatorP99:
op := fmt.Sprintf("quantile(%v)(%s)", aggregateOperatorToPercentile[mq.AggregateOperator], aggregationKey)
op := fmt.Sprintf("quantile(%v)(%s)", AggregateOperatorToPercentile[mq.AggregateOperator], aggregationKey)
query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy)
return query, nil
case v3.AggregateOperatorAvg, v3.AggregateOperatorSum, v3.AggregateOperatorMin, v3.AggregateOperatorMax:
op := fmt.Sprintf("%s(%s)", aggregateOperatorToSQLFunc[mq.AggregateOperator], aggregationKey)
op := fmt.Sprintf("%s(%s)", AggregateOperatorToSQLFunc[mq.AggregateOperator], aggregationKey)
query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy)
return query, nil
case v3.AggregateOperatorCount:
Expand Down Expand Up @@ -394,7 +394,7 @@ func groupBy(panelType v3.PanelType, graphLimitQtype string, tags ...string) str
return strings.Join(tags, ",")
}

func groupByAttributeKeyTags(panelType v3.PanelType, graphLimitQtype string, tags ...v3.AttributeKey) string {
func GroupByAttributeKeyTags(panelType v3.PanelType, graphLimitQtype string, tags ...v3.AttributeKey) string {
groupTags := []string{}
for _, tag := range tags {
groupTags = append(groupTags, "`"+tag.Key+"`")
Expand Down Expand Up @@ -446,7 +446,7 @@ func orderByAttributeKeyTags(panelType v3.PanelType, items []v3.OrderBy, tags []
return str
}

func having(items []v3.Having) string {
func Having(items []v3.Having) string {
// aggregate something and filter on that aggregate
var having []string
for _, item := range items {
Expand All @@ -455,7 +455,7 @@ func having(items []v3.Having) string {
return strings.Join(having, " AND ")
}

func reduceQuery(query string, reduceTo v3.ReduceToOperator, aggregateOperator v3.AggregateOperator) (string, error) {
func ReduceQuery(query string, reduceTo v3.ReduceToOperator, aggregateOperator v3.AggregateOperator) (string, error) {
// the timestamp picked is not relevant here since the final value used is show the single
// chart with just the query value.
switch reduceTo {
Expand All @@ -475,14 +475,14 @@ func reduceQuery(query string, reduceTo v3.ReduceToOperator, aggregateOperator v
return query, nil
}

func addLimitToQuery(query string, limit uint64) string {
func AddLimitToQuery(query string, limit uint64) string {
if limit == 0 {
return query
}
return fmt.Sprintf("%s LIMIT %d", query, limit)
}

func addOffsetToQuery(query string, offset uint64) string {
func AddOffsetToQuery(query string, offset uint64) string {
return fmt.Sprintf("%s OFFSET %d", query, offset)
}

Expand All @@ -492,7 +492,7 @@ type Options struct {
PreferRPM bool
}

func isOrderByTs(orderBy []v3.OrderBy) bool {
func IsOrderByTs(orderBy []v3.OrderBy) bool {
if len(orderBy) == 1 && (orderBy[0].Key == constants.TIMESTAMP || orderBy[0].ColumnName == constants.TIMESTAMP) {
return true
}
Expand Down Expand Up @@ -523,7 +523,7 @@ func PrepareLogsQuery(start, end int64, queryType v3.QueryType, panelType v3.Pan
if err != nil {
return "", err
}
query = addLimitToQuery(query, mq.Limit)
query = AddLimitToQuery(query, mq.Limit)

return query, nil
} else if options.GraphLimitQtype == constants.SecondQueryGraphLimit {
Expand All @@ -539,7 +539,7 @@ func PrepareLogsQuery(start, end int64, queryType v3.QueryType, panelType v3.Pan
return "", err
}
if panelType == v3.PanelTypeValue {
query, err = reduceQuery(query, mq.ReduceTo, mq.AggregateOperator)
query, err = ReduceQuery(query, mq.ReduceTo, mq.AggregateOperator)
}

if panelType == v3.PanelTypeList {
Expand All @@ -550,21 +550,21 @@ func PrepareLogsQuery(start, end int64, queryType v3.QueryType, panelType v3.Pan

if mq.PageSize > 0 {
if mq.Limit > 0 && mq.Offset+mq.PageSize > mq.Limit {
query = addLimitToQuery(query, mq.Limit-mq.Offset)
query = AddLimitToQuery(query, mq.Limit-mq.Offset)
} else {
query = addLimitToQuery(query, mq.PageSize)
query = AddLimitToQuery(query, mq.PageSize)
}

// add offset to the query only if it is not orderd by timestamp.
if !isOrderByTs(mq.OrderBy) {
query = addOffsetToQuery(query, mq.Offset)
if !IsOrderByTs(mq.OrderBy) {
query = AddOffsetToQuery(query, mq.Offset)
}

} else {
query = addLimitToQuery(query, mq.Limit)
query = AddLimitToQuery(query, mq.Limit)
}
} else if panelType == v3.PanelTypeTable {
query = addLimitToQuery(query, mq.Limit)
query = AddLimitToQuery(query, mq.Limit)
}

return query, err
Expand Down
Loading
Loading