diff --git a/ee/query-service/app/db/metrics.go b/ee/query-service/app/db/metrics.go deleted file mode 100644 index 0cc8a55c32..0000000000 --- a/ee/query-service/app/db/metrics.go +++ /dev/null @@ -1,401 +0,0 @@ -package db - -import ( - "context" - "crypto/md5" - "encoding/json" - "fmt" - "reflect" - "regexp" - "sort" - "strings" - "time" - - "go.signoz.io/signoz/ee/query-service/model" - baseconst "go.signoz.io/signoz/pkg/query-service/constants" - basemodel "go.signoz.io/signoz/pkg/query-service/model" - "go.signoz.io/signoz/pkg/query-service/utils" - "go.uber.org/zap" -) - -// GetMetricResultEE runs the query and returns list of time series -func (r *ClickhouseReader) GetMetricResultEE(ctx context.Context, query string) ([]*basemodel.Series, string, error) { - - defer utils.Elapsed("GetMetricResult", nil)() - zap.L().Info("Executing metric result query: ", zap.String("query", query)) - - var hash string - // If getSubTreeSpans function is used in the clickhouse query - if strings.Contains(query, "getSubTreeSpans(") { - var err error - query, hash, err = r.getSubTreeSpansCustomFunction(ctx, query, hash) - if err == fmt.Errorf("no spans found for the given query") { - return nil, "", nil - } - if err != nil { - return nil, "", err - } - } - - rows, err := r.conn.Query(ctx, query) - if err != nil { - zap.L().Error("Error in processing query", zap.Error(err)) - return nil, "", fmt.Errorf("error in processing query") - } - - var ( - columnTypes = rows.ColumnTypes() - columnNames = rows.Columns() - vars = make([]interface{}, len(columnTypes)) - ) - for i := range columnTypes { - vars[i] = reflect.New(columnTypes[i].ScanType()).Interface() - } - // when group by is applied, each combination of cartesian product - // of attributes is separate series. each item in metricPointsMap - // represent a unique series. - metricPointsMap := make(map[string][]basemodel.MetricPoint) - // attribute key-value pairs for each group selection - attributesMap := make(map[string]map[string]string) - - defer rows.Close() - for rows.Next() { - if err := rows.Scan(vars...); err != nil { - return nil, "", err - } - var groupBy []string - var metricPoint basemodel.MetricPoint - groupAttributes := make(map[string]string) - // Assuming that the end result row contains a timestamp, value and option labels - // Label key and value are both strings. 
- for idx, v := range vars { - colName := columnNames[idx] - switch v := v.(type) { - case *string: - // special case for returning all labels - if colName == "fullLabels" { - var metric map[string]string - err := json.Unmarshal([]byte(*v), &metric) - if err != nil { - return nil, "", err - } - for key, val := range metric { - groupBy = append(groupBy, val) - groupAttributes[key] = val - } - } else { - groupBy = append(groupBy, *v) - groupAttributes[colName] = *v - } - case *time.Time: - metricPoint.Timestamp = v.UnixMilli() - case *float64: - metricPoint.Value = *v - case **float64: - // ch seems to return this type when column is derived from - // SELECT count(*)/ SELECT count(*) - floatVal := *v - if floatVal != nil { - metricPoint.Value = *floatVal - } - case *float32: - float32Val := float32(*v) - metricPoint.Value = float64(float32Val) - case *uint8, *uint64, *uint16, *uint32: - if _, ok := baseconst.ReservedColumnTargetAliases[colName]; ok { - metricPoint.Value = float64(reflect.ValueOf(v).Elem().Uint()) - } else { - groupBy = append(groupBy, fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Uint())) - groupAttributes[colName] = fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Uint()) - } - case *int8, *int16, *int32, *int64: - if _, ok := baseconst.ReservedColumnTargetAliases[colName]; ok { - metricPoint.Value = float64(reflect.ValueOf(v).Elem().Int()) - } else { - groupBy = append(groupBy, fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Int())) - groupAttributes[colName] = fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Int()) - } - default: - zap.L().Error("invalid var found in metric builder query result", zap.Any("var", v), zap.String("colName", colName)) - } - } - sort.Strings(groupBy) - key := strings.Join(groupBy, "") - attributesMap[key] = groupAttributes - metricPointsMap[key] = append(metricPointsMap[key], metricPoint) - } - - var seriesList []*basemodel.Series - for key := range metricPointsMap { - points := metricPointsMap[key] - // first point in each series could be invalid since the - // aggregations are applied with point from prev series - if len(points) != 0 && len(points) > 1 { - points = points[1:] - } - attributes := attributesMap[key] - series := basemodel.Series{Labels: attributes, Points: points} - seriesList = append(seriesList, &series) - } - // err = r.conn.Exec(ctx, "DROP TEMPORARY TABLE IF EXISTS getSubTreeSpans"+hash) - // if err != nil { - // zap.L().Error("Error in dropping temporary table: ", err) - // return nil, err - // } - if hash == "" { - return seriesList, hash, nil - } else { - return seriesList, "getSubTreeSpans" + hash, nil - } -} - -func (r *ClickhouseReader) getSubTreeSpansCustomFunction(ctx context.Context, query string, hash string) (string, string, error) { - - zap.L().Debug("Executing getSubTreeSpans function") - - // str1 := `select fromUnixTimestamp64Milli(intDiv( toUnixTimestamp64Milli ( timestamp ), 100) * 100) AS interval, toFloat64(count()) as count from (select timestamp, spanId, parentSpanId, durationNano from getSubTreeSpans(select * from signoz_traces.signoz_index_v2 where serviceName='frontend' and name='/driver.DriverService/FindNearest' and traceID='00000000000000004b0a863cb5ed7681') where name='FindDriverIDs' group by interval order by interval asc;` - - // process the query to fetch subTree query - var subtreeInput string - query, subtreeInput, hash = processQuery(query, hash) - - err := r.conn.Exec(ctx, "DROP TABLE IF EXISTS getSubTreeSpans"+hash) - if err != nil { - zap.L().Error("Error in dropping temporary table", zap.Error(err)) - 
return query, hash, err - } - - // Create temporary table to store the getSubTreeSpans() results - zap.L().Debug("Creating temporary table getSubTreeSpans", zap.String("hash", hash)) - err = r.conn.Exec(ctx, "CREATE TABLE IF NOT EXISTS "+"getSubTreeSpans"+hash+" (timestamp DateTime64(9) CODEC(DoubleDelta, LZ4), traceID FixedString(32) CODEC(ZSTD(1)), spanID String CODEC(ZSTD(1)), parentSpanID String CODEC(ZSTD(1)), rootSpanID String CODEC(ZSTD(1)), serviceName LowCardinality(String) CODEC(ZSTD(1)), name LowCardinality(String) CODEC(ZSTD(1)), rootName LowCardinality(String) CODEC(ZSTD(1)), durationNano UInt64 CODEC(T64, ZSTD(1)), kind Int8 CODEC(T64, ZSTD(1)), tagMap Map(LowCardinality(String), String) CODEC(ZSTD(1)), events Array(String) CODEC(ZSTD(2))) ENGINE = MergeTree() ORDER BY (timestamp)") - if err != nil { - zap.L().Error("Error in creating temporary table", zap.Error(err)) - return query, hash, err - } - - var getSpansSubQueryDBResponses []model.GetSpansSubQueryDBResponse - getSpansSubQuery := subtreeInput - // Execute the subTree query - zap.L().Debug("Executing subTree query", zap.String("query", getSpansSubQuery)) - err = r.conn.Select(ctx, &getSpansSubQueryDBResponses, getSpansSubQuery) - - // zap.L().Info(getSpansSubQuery) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return query, hash, fmt.Errorf("error in processing sql query") - } - - var searchScanResponses []basemodel.SearchSpanDBResponseItem - - // TODO : @ankit: I think the algorithm does not need to assume that subtrees are from the same TraceID. We can take this as an improvement later. - // Fetch all the spans from of same TraceID so that we can build subtree - modelQuery := fmt.Sprintf("SELECT timestamp, traceID, model FROM %s.%s WHERE traceID=$1", r.TraceDB, r.SpansTable) - - if len(getSpansSubQueryDBResponses) == 0 { - return query, hash, fmt.Errorf("no spans found for the given query") - } - zap.L().Debug("Executing query to fetch all the spans from the same TraceID: ", zap.String("modelQuery", modelQuery)) - err = r.conn.Select(ctx, &searchScanResponses, modelQuery, getSpansSubQueryDBResponses[0].TraceID) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return query, hash, fmt.Errorf("error in processing sql query") - } - - // Process model to fetch the spans - zap.L().Debug("Processing model to fetch the spans") - searchSpanResponses := []basemodel.SearchSpanResponseItem{} - for _, item := range searchScanResponses { - var jsonItem basemodel.SearchSpanResponseItem - json.Unmarshal([]byte(item.Model), &jsonItem) - jsonItem.TimeUnixNano = uint64(item.Timestamp.UnixNano()) - if jsonItem.Events == nil { - jsonItem.Events = []string{} - } - searchSpanResponses = append(searchSpanResponses, jsonItem) - } - // Build the subtree and store all the subtree spans in temporary table getSubTreeSpans+hash - // Use map to store pointer to the spans to avoid duplicates and save memory - zap.L().Debug("Building the subtree to store all the subtree spans in temporary table getSubTreeSpans", zap.String("hash", hash)) - - treeSearchResponse, err := getSubTreeAlgorithm(searchSpanResponses, getSpansSubQueryDBResponses) - if err != nil { - zap.L().Error("Error in getSubTreeAlgorithm function", zap.Error(err)) - return query, hash, err - } - zap.L().Debug("Preparing batch to store subtree spans in temporary table getSubTreeSpans", zap.String("hash", hash)) - statement, err := r.conn.PrepareBatch(context.Background(), fmt.Sprintf("INSERT INTO 
getSubTreeSpans"+hash))
-	if err != nil {
-		zap.L().Error("Error in preparing batch statement", zap.Error(err))
-		return query, hash, err
-	}
-	for _, span := range treeSearchResponse {
-		var parentID string
-		if len(span.References) > 0 && span.References[0].RefType == "CHILD_OF" {
-			parentID = span.References[0].SpanId
-		}
-		err = statement.Append(
-			time.Unix(0, int64(span.TimeUnixNano)),
-			span.TraceID,
-			span.SpanID,
-			parentID,
-			span.RootSpanID,
-			span.ServiceName,
-			span.Name,
-			span.RootName,
-			uint64(span.DurationNano),
-			int8(span.Kind),
-			span.TagMap,
-			span.Events,
-		)
-		if err != nil {
-			zap.L().Error("Error in processing sql query", zap.Error(err))
-			return query, hash, err
-		}
-	}
-	zap.L().Debug("Inserting the subtree spans in temporary table getSubTreeSpans", zap.String("hash", hash))
-	err = statement.Send()
-	if err != nil {
-		zap.L().Error("Error in sending statement", zap.Error(err))
-		return query, hash, err
-	}
-	return query, hash, nil
-}
-
-//lint:ignore SA4009 return hash is fed to the query
-func processQuery(query string, hash string) (string, string, string) {
-	re3 := regexp.MustCompile(`getSubTreeSpans`)
-
-	submatchall3 := re3.FindAllStringIndex(query, -1)
-	getSubtreeSpansMatchIndex := submatchall3[0][1]
-
-	query2countParenthesis := query[getSubtreeSpansMatchIndex:]
-
-	sqlCompleteIndex := 0
-	countParenthesisImbalance := 0
-	for i, char := range query2countParenthesis {
-
-		if string(char) == "(" {
-			countParenthesisImbalance += 1
-		}
-		if string(char) == ")" {
-			countParenthesisImbalance -= 1
-		}
-		if countParenthesisImbalance == 0 {
-			sqlCompleteIndex = i
-			break
-		}
-	}
-	subtreeInput := query2countParenthesis[1:sqlCompleteIndex]
-
-	// hash the subtreeInput
-	hmd5 := md5.Sum([]byte(subtreeInput))
-	hash = fmt.Sprintf("%x", hmd5)
-
-	// Reformat the query to use the getSubTreeSpans function
-	query = query[:getSubtreeSpansMatchIndex] + hash + " " + query2countParenthesis[sqlCompleteIndex+1:]
-	return query, subtreeInput, hash
-}
-
-// getSubTreeAlgorithm is an algorithm to build the subtrees of the spans and return the list of spans
-func getSubTreeAlgorithm(payload []basemodel.SearchSpanResponseItem, getSpansSubQueryDBResponses []model.GetSpansSubQueryDBResponse) (map[string]*basemodel.SearchSpanResponseItem, error) {
-
-	var spans []*model.SpanForTraceDetails
-	for _, spanItem := range payload {
-		var parentID string
-		if len(spanItem.References) > 0 && spanItem.References[0].RefType == "CHILD_OF" {
-			parentID = spanItem.References[0].SpanId
-		}
-		span := &model.SpanForTraceDetails{
-			TimeUnixNano: spanItem.TimeUnixNano,
-			SpanID: spanItem.SpanID,
-			TraceID: spanItem.TraceID,
-			ServiceName: spanItem.ServiceName,
-			Name: spanItem.Name,
-			Kind: spanItem.Kind,
-			DurationNano: spanItem.DurationNano,
-			TagMap: spanItem.TagMap,
-			ParentID: parentID,
-			Events: spanItem.Events,
-			HasError: spanItem.HasError,
-		}
-		spans = append(spans, span)
-	}
-
-	zap.L().Debug("Building Tree")
-	roots, err := buildSpanTrees(&spans)
-	if err != nil {
-		return nil, err
-	}
-	searchSpansResult := make(map[string]*basemodel.SearchSpanResponseItem)
-	// Every span which was fetched from getSubTree Input SQL query is considered root
-	// For each root, get the subtree spans
-	for _, getSpansSubQueryDBResponse := range getSpansSubQueryDBResponses {
-		targetSpan := &model.SpanForTraceDetails{}
-		// zap.L().Debug("Building tree for span id: " + getSpansSubQueryDBResponse.SpanID + " " + strconv.Itoa(i+1) + " of " + strconv.Itoa(len(getSpansSubQueryDBResponses)))
-		// Search 
target span object in the tree - for _, root := range roots { - targetSpan, err = breadthFirstSearch(root, getSpansSubQueryDBResponse.SpanID) - if targetSpan != nil { - break - } - if err != nil { - zap.L().Error("Error during BreadthFirstSearch()", zap.Error(err)) - return nil, err - } - } - if targetSpan == nil { - return nil, nil - } - // Build subtree for the target span - // Mark the target span as root by setting parent ID as empty string - targetSpan.ParentID = "" - preParents := []*model.SpanForTraceDetails{targetSpan} - children := []*model.SpanForTraceDetails{} - - // Get the subtree child spans - for i := 0; len(preParents) != 0; i++ { - parents := []*model.SpanForTraceDetails{} - for _, parent := range preParents { - children = append(children, parent.Children...) - parents = append(parents, parent.Children...) - } - preParents = parents - } - - resultSpans := children - // Add the target span to the result spans - resultSpans = append(resultSpans, targetSpan) - - for _, item := range resultSpans { - references := []basemodel.OtelSpanRef{ - { - TraceId: item.TraceID, - SpanId: item.ParentID, - RefType: "CHILD_OF", - }, - } - - if item.Events == nil { - item.Events = []string{} - } - searchSpansResult[item.SpanID] = &basemodel.SearchSpanResponseItem{ - TimeUnixNano: item.TimeUnixNano, - SpanID: item.SpanID, - TraceID: item.TraceID, - ServiceName: item.ServiceName, - Name: item.Name, - Kind: item.Kind, - References: references, - DurationNano: item.DurationNano, - TagMap: item.TagMap, - Events: item.Events, - HasError: item.HasError, - RootSpanID: getSpansSubQueryDBResponse.SpanID, - RootName: targetSpan.Name, - } - } - } - return searchSpansResult, nil -} diff --git a/pkg/query-service/app/clickhouseReader/query_progress/inmemory_tracker.go b/pkg/query-service/app/clickhouseReader/query_progress/inmemory_tracker.go index d29a61cb5c..fcefe4cd7c 100644 --- a/pkg/query-service/app/clickhouseReader/query_progress/inmemory_tracker.go +++ b/pkg/query-service/app/clickhouseReader/query_progress/inmemory_tracker.go @@ -7,7 +7,6 @@ import ( "github.com/ClickHouse/clickhouse-go/v2" "github.com/google/uuid" "go.signoz.io/signoz/pkg/query-service/model" - v3 "go.signoz.io/signoz/pkg/query-service/model/v3" "go.uber.org/zap" "golang.org/x/exp/maps" ) @@ -52,7 +51,7 @@ func (tracker *inMemoryQueryProgressTracker) ReportQueryProgress( func (tracker *inMemoryQueryProgressTracker) SubscribeToQueryProgress( queryId string, -) (<-chan v3.QueryProgress, func(), *model.ApiError) { +) (<-chan model.QueryProgress, func(), *model.ApiError) { queryTracker, err := tracker.getQueryTracker(queryId) if err != nil { return nil, nil, err @@ -97,7 +96,7 @@ type queryTracker struct { queryId string isFinished bool - progress *v3.QueryProgress + progress *model.QueryProgress subscriptions map[string]*queryProgressSubscription lock sync.Mutex @@ -124,7 +123,7 @@ func (qt *queryTracker) handleProgressUpdate(p *clickhouse.Progress) { if qt.progress == nil { // This is the first update - qt.progress = &v3.QueryProgress{} + qt.progress = &model.QueryProgress{} } updateQueryProgress(qt.progress, p) @@ -135,7 +134,7 @@ func (qt *queryTracker) handleProgressUpdate(p *clickhouse.Progress) { } func (qt *queryTracker) subscribe() ( - <-chan v3.QueryProgress, func(), *model.ApiError, + <-chan model.QueryProgress, func(), *model.ApiError, ) { qt.lock.Lock() defer qt.lock.Unlock() @@ -200,20 +199,20 @@ func (qt *queryTracker) onFinished() { } type queryProgressSubscription struct { - ch chan v3.QueryProgress + ch chan 
model.QueryProgress isClosed bool lock sync.Mutex } func newQueryProgressSubscription() *queryProgressSubscription { - ch := make(chan v3.QueryProgress, 1000) + ch := make(chan model.QueryProgress, 1000) return &queryProgressSubscription{ ch: ch, } } // Must not block or panic in any scenario -func (ch *queryProgressSubscription) send(progress v3.QueryProgress) { +func (ch *queryProgressSubscription) send(progress model.QueryProgress) { ch.lock.Lock() defer ch.lock.Unlock() @@ -248,7 +247,7 @@ func (ch *queryProgressSubscription) close() { } } -func updateQueryProgress(qp *v3.QueryProgress, chProgress *clickhouse.Progress) { +func updateQueryProgress(qp *model.QueryProgress, chProgress *clickhouse.Progress) { qp.ReadRows += chProgress.Rows qp.ReadBytes += chProgress.Bytes qp.ElapsedMs += uint64(chProgress.Elapsed.Milliseconds()) diff --git a/pkg/query-service/app/clickhouseReader/query_progress/tracker.go b/pkg/query-service/app/clickhouseReader/query_progress/tracker.go index d424c99c57..c783dfbd5b 100644 --- a/pkg/query-service/app/clickhouseReader/query_progress/tracker.go +++ b/pkg/query-service/app/clickhouseReader/query_progress/tracker.go @@ -3,7 +3,6 @@ package queryprogress import ( "github.com/ClickHouse/clickhouse-go/v2" "go.signoz.io/signoz/pkg/query-service/model" - v3 "go.signoz.io/signoz/pkg/query-service/model/v3" ) type QueryProgressTracker interface { @@ -19,7 +18,7 @@ type QueryProgressTracker interface { // The returned channel will produce `QueryProgress` instances representing // the latest state of query progress stats. Also returns a function that // can be called to unsubscribe before the query finishes, if needed. - SubscribeToQueryProgress(queryId string) (ch <-chan v3.QueryProgress, unsubscribe func(), err *model.ApiError) + SubscribeToQueryProgress(queryId string) (ch <-chan model.QueryProgress, unsubscribe func(), err *model.ApiError) } func NewQueryProgressTracker() QueryProgressTracker { diff --git a/pkg/query-service/app/clickhouseReader/query_progress/tracker_test.go b/pkg/query-service/app/clickhouseReader/query_progress/tracker_test.go index 4babe47f82..9a51cd0fe6 100644 --- a/pkg/query-service/app/clickhouseReader/query_progress/tracker_test.go +++ b/pkg/query-service/app/clickhouseReader/query_progress/tracker_test.go @@ -7,7 +7,6 @@ import ( "github.com/ClickHouse/clickhouse-go/v2" "github.com/stretchr/testify/require" "go.signoz.io/signoz/pkg/query-service/model" - v3 "go.signoz.io/signoz/pkg/query-service/model/v3" ) func TestQueryProgressTracking(t *testing.T) { @@ -45,7 +44,7 @@ func TestQueryProgressTracking(t *testing.T) { require.NotNil(ch) require.NotNil(unsubscribe) - expectedProgress := v3.QueryProgress{} + expectedProgress := model.QueryProgress{} updateQueryProgress(&expectedProgress, testProgress1) require.Equal(expectedProgress.ReadRows, testProgress1.Rows) select { diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 82100c65cc..58f0683c07 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -3050,122 +3050,6 @@ func (r *ClickHouseReader) getPrevErrorID(ctx context.Context, queryParams *mode } } -func (r *ClickHouseReader) GetMetricResultEE(ctx context.Context, query string) ([]*model.Series, string, error) { - zap.L().Error("GetMetricResultEE is not implemented for opensource version") - return nil, "", fmt.Errorf("GetMetricResultEE is not implemented for opensource version") -} - -// GetMetricResult runs 
the query and returns list of time series -func (r *ClickHouseReader) GetMetricResult(ctx context.Context, query string) ([]*model.Series, error) { - - defer utils.Elapsed("GetMetricResult", nil)() - - zap.L().Info("Executing metric result query: ", zap.String("query", query)) - - rows, err := r.db.Query(ctx, query) - - if err != nil { - zap.L().Error("Error in processing query", zap.Error(err)) - return nil, err - } - - var ( - columnTypes = rows.ColumnTypes() - columnNames = rows.Columns() - vars = make([]interface{}, len(columnTypes)) - ) - for i := range columnTypes { - vars[i] = reflect.New(columnTypes[i].ScanType()).Interface() - } - // when group by is applied, each combination of cartesian product - // of attributes is separate series. each item in metricPointsMap - // represent a unique series. - metricPointsMap := make(map[string][]model.MetricPoint) - // attribute key-value pairs for each group selection - attributesMap := make(map[string]map[string]string) - - defer rows.Close() - for rows.Next() { - if err := rows.Scan(vars...); err != nil { - return nil, err - } - var groupBy []string - var metricPoint model.MetricPoint - groupAttributes := make(map[string]string) - // Assuming that the end result row contains a timestamp, value and option labels - // Label key and value are both strings. - for idx, v := range vars { - colName := columnNames[idx] - switch v := v.(type) { - case *string: - // special case for returning all labels - if colName == "fullLabels" { - var metric map[string]string - err := json.Unmarshal([]byte(*v), &metric) - if err != nil { - return nil, err - } - for key, val := range metric { - groupBy = append(groupBy, val) - groupAttributes[key] = val - } - } else { - groupBy = append(groupBy, *v) - groupAttributes[colName] = *v - } - case *time.Time: - metricPoint.Timestamp = v.UnixMilli() - case *float64: - metricPoint.Value = *v - case **float64: - // ch seems to return this type when column is derived from - // SELECT count(*)/ SELECT count(*) - floatVal := *v - if floatVal != nil { - metricPoint.Value = *floatVal - } - case *float32: - float32Val := float32(*v) - metricPoint.Value = float64(float32Val) - case *uint8, *uint64, *uint16, *uint32: - if _, ok := constants.ReservedColumnTargetAliases[colName]; ok { - metricPoint.Value = float64(reflect.ValueOf(v).Elem().Uint()) - } else { - groupBy = append(groupBy, fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Uint())) - groupAttributes[colName] = fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Uint()) - } - case *int8, *int16, *int32, *int64: - if _, ok := constants.ReservedColumnTargetAliases[colName]; ok { - metricPoint.Value = float64(reflect.ValueOf(v).Elem().Int()) - } else { - groupBy = append(groupBy, fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Int())) - groupAttributes[colName] = fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Int()) - } - default: - zap.L().Error("invalid var found in metric builder query result", zap.Any("v", v), zap.String("colName", colName)) - } - } - sort.Strings(groupBy) - key := strings.Join(groupBy, "") - attributesMap[key] = groupAttributes - metricPointsMap[key] = append(metricPointsMap[key], metricPoint) - } - - var seriesList []*model.Series - for key := range metricPointsMap { - points := metricPointsMap[key] - // first point in each series could be invalid since the - // aggregations are applied with point from prev series - if len(points) != 0 && len(points) > 1 { - points = points[1:] - } - attributes := attributesMap[key] - series := model.Series{Labels: attributes, Points: 
points} - seriesList = append(seriesList, &series) - } - return seriesList, nil -} - func (r *ClickHouseReader) GetTotalSpans(ctx context.Context) (uint64, error) { var totalSpans uint64 @@ -5370,7 +5254,7 @@ func (r *ClickHouseReader) GetSpanAttributeKeys(ctx context.Context) (map[string return response, nil } -func (r *ClickHouseReader) LiveTailLogsV4(ctx context.Context, query string, timestampStart uint64, idStart string, client *v3.LogsLiveTailClientV2) { +func (r *ClickHouseReader) LiveTailLogsV4(ctx context.Context, query string, timestampStart uint64, idStart string, client *model.LogsLiveTailClientV2) { if timestampStart == 0 { timestampStart = uint64(time.Now().UnixNano()) } else { @@ -5423,7 +5307,7 @@ func (r *ClickHouseReader) LiveTailLogsV4(ctx context.Context, query string, tim } } -func (r *ClickHouseReader) LiveTailLogsV3(ctx context.Context, query string, timestampStart uint64, idStart string, client *v3.LogsLiveTailClient) { +func (r *ClickHouseReader) LiveTailLogsV3(ctx context.Context, query string, timestampStart uint64, idStart string, client *model.LogsLiveTailClient) { if timestampStart == 0 { timestampStart = uint64(time.Now().UnixNano()) } else { @@ -5467,7 +5351,7 @@ func (r *ClickHouseReader) LiveTailLogsV3(ctx context.Context, query string, tim } } -func (r *ClickHouseReader) AddRuleStateHistory(ctx context.Context, ruleStateHistory []v3.RuleStateHistory) error { +func (r *ClickHouseReader) AddRuleStateHistory(ctx context.Context, ruleStateHistory []model.RuleStateHistory) error { var statement driver.Batch var err error @@ -5498,11 +5382,11 @@ func (r *ClickHouseReader) AddRuleStateHistory(ctx context.Context, ruleStateHis return nil } -func (r *ClickHouseReader) GetLastSavedRuleStateHistory(ctx context.Context, ruleID string) ([]v3.RuleStateHistory, error) { +func (r *ClickHouseReader) GetLastSavedRuleStateHistory(ctx context.Context, ruleID string) ([]model.RuleStateHistory, error) { query := fmt.Sprintf("SELECT * FROM %s.%s WHERE rule_id = '%s' AND state_changed = true ORDER BY unix_milli DESC LIMIT 1 BY fingerprint", signozHistoryDBName, ruleStateHistoryTableName, ruleID) - history := []v3.RuleStateHistory{} + history := []model.RuleStateHistory{} err := r.db.Select(ctx, &history, query) if err != nil { return nil, err @@ -5511,7 +5395,7 @@ func (r *ClickHouseReader) GetLastSavedRuleStateHistory(ctx context.Context, rul } func (r *ClickHouseReader) ReadRuleStateHistoryByRuleID( - ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (*v3.RuleStateTimeline, error) { + ctx context.Context, ruleID string, params *model.QueryRuleStateHistory) (*model.RuleStateTimeline, error) { var conditions []string @@ -5574,7 +5458,7 @@ func (r *ClickHouseReader) ReadRuleStateHistoryByRuleID( query := fmt.Sprintf("SELECT * FROM %s.%s WHERE %s ORDER BY unix_milli %s LIMIT %d OFFSET %d", signozHistoryDBName, ruleStateHistoryTableName, whereClause, params.Order, params.Limit, params.Offset) - history := []v3.RuleStateHistory{} + history := []model.RuleStateHistory{} zap.L().Debug("rule state history query", zap.String("query", query)) err := r.db.Select(ctx, &history, query) if err != nil { @@ -5616,7 +5500,7 @@ func (r *ClickHouseReader) ReadRuleStateHistoryByRuleID( } } - timeline := &v3.RuleStateTimeline{ + timeline := &model.RuleStateTimeline{ Items: history, Total: total, Labels: labelsMap, @@ -5626,7 +5510,7 @@ func (r *ClickHouseReader) ReadRuleStateHistoryByRuleID( } func (r *ClickHouseReader) ReadRuleStateHistoryTopContributorsByRuleID( - ctx 
context.Context, ruleID string, params *v3.QueryRuleStateHistory) ([]v3.RuleStateHistoryContributor, error) { + ctx context.Context, ruleID string, params *model.QueryRuleStateHistory) ([]model.RuleStateHistoryContributor, error) { query := fmt.Sprintf(`SELECT fingerprint, any(labels) as labels, @@ -5639,7 +5523,7 @@ func (r *ClickHouseReader) ReadRuleStateHistoryTopContributorsByRuleID( signozHistoryDBName, ruleStateHistoryTableName, ruleID, model.StateFiring.String(), params.Start, params.End) zap.L().Debug("rule state history top contributors query", zap.String("query", query)) - contributors := []v3.RuleStateHistoryContributor{} + contributors := []model.RuleStateHistoryContributor{} err := r.db.Select(ctx, &contributors, query) if err != nil { zap.L().Error("Error while reading rule state history", zap.Error(err)) @@ -5649,7 +5533,7 @@ func (r *ClickHouseReader) ReadRuleStateHistoryTopContributorsByRuleID( return contributors, nil } -func (r *ClickHouseReader) GetOverallStateTransitions(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) ([]v3.ReleStateItem, error) { +func (r *ClickHouseReader) GetOverallStateTransitions(ctx context.Context, ruleID string, params *model.QueryRuleStateHistory) ([]model.ReleStateItem, error) { tmpl := `WITH firing_events AS ( SELECT @@ -5695,18 +5579,18 @@ ORDER BY firing_time ASC;` zap.L().Debug("overall state transitions query", zap.String("query", query)) - transitions := []v3.RuleStateTransition{} + transitions := []model.RuleStateTransition{} err := r.db.Select(ctx, &transitions, query) if err != nil { return nil, err } - stateItems := []v3.ReleStateItem{} + stateItems := []model.ReleStateItem{} for idx, item := range transitions { start := item.FiringTime end := item.ResolutionTime - stateItems = append(stateItems, v3.ReleStateItem{ + stateItems = append(stateItems, model.ReleStateItem{ State: item.State, Start: start, End: end, @@ -5714,7 +5598,7 @@ ORDER BY firing_time ASC;` if idx < len(transitions)-1 { nextStart := transitions[idx+1].FiringTime if nextStart > end { - stateItems = append(stateItems, v3.ReleStateItem{ + stateItems = append(stateItems, model.ReleStateItem{ State: model.StateInactive, Start: end, End: nextStart, @@ -5736,7 +5620,7 @@ ORDER BY firing_time ASC;` if len(transitions) == 0 { // no transitions found, it is either firing or inactive for whole time range - stateItems = append(stateItems, v3.ReleStateItem{ + stateItems = append(stateItems, model.ReleStateItem{ State: state, Start: params.Start, End: params.End, @@ -5744,7 +5628,7 @@ ORDER BY firing_time ASC;` } else { // there were some transitions, we need to add the last state at the end if state == model.StateInactive { - stateItems = append(stateItems, v3.ReleStateItem{ + stateItems = append(stateItems, model.ReleStateItem{ State: model.StateInactive, Start: transitions[len(transitions)-1].ResolutionTime, End: params.End, @@ -5761,12 +5645,12 @@ ORDER BY firing_time ASC;` if err := r.db.QueryRow(ctx, firingQuery).Scan(&firingTime); err != nil { return nil, err } - stateItems = append(stateItems, v3.ReleStateItem{ + stateItems = append(stateItems, model.ReleStateItem{ State: model.StateInactive, Start: transitions[len(transitions)-1].ResolutionTime, End: firingTime, }) - stateItems = append(stateItems, v3.ReleStateItem{ + stateItems = append(stateItems, model.ReleStateItem{ State: model.StateFiring, Start: firingTime, End: params.End, @@ -5776,7 +5660,7 @@ ORDER BY firing_time ASC;` return stateItems, nil } -func (r *ClickHouseReader) 
GetAvgResolutionTime(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (float64, error) { +func (r *ClickHouseReader) GetAvgResolutionTime(ctx context.Context, ruleID string, params *model.QueryRuleStateHistory) (float64, error) { tmpl := ` WITH firing_events AS ( @@ -5831,7 +5715,7 @@ FROM matched_events; return avgResolutionTime, nil } -func (r *ClickHouseReader) GetAvgResolutionTimeByInterval(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (*v3.Series, error) { +func (r *ClickHouseReader) GetAvgResolutionTimeByInterval(ctx context.Context, ruleID string, params *model.QueryRuleStateHistory) (*v3.Series, error) { step := common.MinAllowedStepInterval(params.Start, params.End) @@ -5888,7 +5772,7 @@ ORDER BY ts ASC;` return result[0], nil } -func (r *ClickHouseReader) GetTotalTriggers(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (uint64, error) { +func (r *ClickHouseReader) GetTotalTriggers(ctx context.Context, ruleID string, params *model.QueryRuleStateHistory) (uint64, error) { query := fmt.Sprintf("SELECT count(*) FROM %s.%s WHERE rule_id = '%s' AND (state_changed = true) AND (state = '%s') AND unix_milli >= %d AND unix_milli <= %d", signozHistoryDBName, ruleStateHistoryTableName, ruleID, model.StateFiring.String(), params.Start, params.End) @@ -5902,7 +5786,7 @@ func (r *ClickHouseReader) GetTotalTriggers(ctx context.Context, ruleID string, return totalTriggers, nil } -func (r *ClickHouseReader) GetTriggersByInterval(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (*v3.Series, error) { +func (r *ClickHouseReader) GetTriggersByInterval(ctx context.Context, ruleID string, params *model.QueryRuleStateHistory) (*v3.Series, error) { step := common.MinAllowedStepInterval(params.Start, params.End) query := fmt.Sprintf("SELECT count(*), toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts FROM %s.%s WHERE rule_id = '%s' AND (state_changed = true) AND (state = '%s') AND unix_milli >= %d AND unix_milli <= %d GROUP BY ts ORDER BY ts ASC", @@ -5949,6 +5833,6 @@ func (r *ClickHouseReader) ReportQueryStartForProgressTracking( func (r *ClickHouseReader) SubscribeToQueryProgress( queryId string, -) (<-chan v3.QueryProgress, func(), *model.ApiError) { +) (<-chan model.QueryProgress, func(), *model.ApiError) { return r.queryProgressTracker.SubscribeToQueryProgress(queryId) } diff --git a/pkg/query-service/app/dashboards/model.go b/pkg/query-service/app/dashboards/model.go index a6c5d35c9e..306934c443 100644 --- a/pkg/query-service/app/dashboards/model.go +++ b/pkg/query-service/app/dashboards/model.go @@ -5,16 +5,13 @@ import ( "encoding/base64" "encoding/json" "fmt" - "reflect" "regexp" - "strconv" "strings" "time" "github.com/google/uuid" "github.com/gosimple/slug" "github.com/jmoiron/sqlx" - "github.com/mitchellh/mapstructure" "go.signoz.io/signoz/pkg/query-service/common" "go.signoz.io/signoz/pkg/query-service/interfaces" "go.signoz.io/signoz/pkg/query-service/model" @@ -216,14 +213,6 @@ func CreateDashboard(ctx context.Context, data map[string]interface{}, fm interf return nil, &model.ApiError{Typ: model.ErrorExec, Err: err} } - newCount, _ := countTraceAndLogsPanel(data) - if newCount > 0 { - fErr := checkFeatureUsage(fm, newCount) - if fErr != nil { - return nil, fErr - } - } - result, err := db.Exec("INSERT INTO dashboards (uuid, created_at, created_by, updated_at, updated_by, data) VALUES ($1, $2, $3, $4, $5, $6)", dash.Uuid, dash.CreatedAt, userEmail, dash.UpdatedAt, userEmail, 
mapData) @@ -237,11 +226,6 @@ func CreateDashboard(ctx context.Context, data map[string]interface{}, fm interf } dash.Id = int(lastInsertId) - traceAndLogsPanelUsage, _ := countTraceAndLogsPanel(data) - if traceAndLogsPanelUsage > 0 { - updateFeatureUsage(fm, traceAndLogsPanelUsage) - } - return dash, nil } @@ -287,11 +271,6 @@ func DeleteDashboard(ctx context.Context, uuid string, fm interfaces.FeatureLook return &model.ApiError{Typ: model.ErrorNotFound, Err: fmt.Errorf("no dashboard found with uuid: %s", uuid)} } - traceAndLogsPanelUsage, _ := countTraceAndLogsPanel(dashboard.Data) - if traceAndLogsPanelUsage > 0 { - updateFeatureUsage(fm, -traceAndLogsPanelUsage) - } - return nil } @@ -329,28 +308,15 @@ func UpdateDashboard(ctx context.Context, uuid string, data map[string]interface } } - // check if the count of trace and logs QB panel has changed, if yes, then check feature flag count - existingCount, existingTotal := countTraceAndLogsPanel(dashboard.Data) - newCount, newTotal := countTraceAndLogsPanel(data) - if newCount > existingCount { - err := checkFeatureUsage(fm, newCount-existingCount) - if err != nil { - return nil, err - } - } - - if existingTotal > newTotal && existingTotal-newTotal > 1 { - // if the total count of panels has reduced by more than 1, - // return error - existingIds := getWidgetIds(dashboard.Data) - newIds := getWidgetIds(data) - - differenceIds := getIdDifference(existingIds, newIds) + // if the total count of panels has reduced by more than 1, + // return error + existingIds := getWidgetIds(dashboard.Data) + newIds := getWidgetIds(data) - if len(differenceIds) > 1 { - return nil, model.BadRequest(fmt.Errorf("deleting more than one panel is not supported")) - } + differenceIds := getIdDifference(existingIds, newIds) + if len(differenceIds) > 1 { + return nil, model.BadRequest(fmt.Errorf("deleting more than one panel is not supported")) } dashboard.UpdatedAt = time.Now() @@ -364,10 +330,6 @@ func UpdateDashboard(ctx context.Context, uuid string, data map[string]interface zap.L().Error("Error in inserting dashboard data", zap.Any("data", data), zap.Error(err)) return nil, &model.ApiError{Typ: model.ErrorExec, Err: err} } - if existingCount != newCount { - // if the count of trace and logs panel has changed, we need to update feature flag count as well - updateFeatureUsage(fm, newCount-existingCount) - } return dashboard, nil } @@ -389,51 +351,6 @@ func LockUnlockDashboard(ctx context.Context, uuid string, lock bool) *model.Api return nil } -func updateFeatureUsage(fm interfaces.FeatureLookup, usage int64) *model.ApiError { - feature, err := fm.GetFeatureFlag(model.QueryBuilderPanels) - if err != nil { - switch err.(type) { - case model.ErrFeatureUnavailable: - zap.L().Error("feature unavailable", zap.String("featureKey", model.QueryBuilderPanels), zap.Error(err)) - return model.BadRequest(err) - default: - zap.L().Error("feature check failed", zap.String("featureKey", model.QueryBuilderPanels), zap.Error(err)) - return model.BadRequest(err) - } - } - feature.Usage += usage - if feature.Usage >= feature.UsageLimit && feature.UsageLimit != -1 { - feature.Active = false - } - if feature.Usage < feature.UsageLimit || feature.UsageLimit == -1 { - feature.Active = true - } - err = fm.UpdateFeatureFlag(feature) - if err != nil { - return model.BadRequest(err) - } - - return nil -} - -func checkFeatureUsage(fm interfaces.FeatureLookup, usage int64) *model.ApiError { - feature, err := fm.GetFeatureFlag(model.QueryBuilderPanels) - if err != nil { - switch err.(type) { - 
case model.ErrFeatureUnavailable:
-		zap.L().Error("feature unavailable", zap.String("featureKey", model.QueryBuilderPanels), zap.Error(err))
-		return model.BadRequest(err)
-	default:
-		zap.L().Error("feature check failed", zap.String("featureKey", model.QueryBuilderPanels), zap.Error(err))
-		return model.BadRequest(err)
-	}
-	}
-	if feature.UsageLimit-(feature.Usage+usage) < 0 && feature.UsageLimit != -1 {
-		return model.BadRequest(fmt.Errorf("feature usage exceeded"))
-	}
-	return nil
-}
-
 // UpdateSlug updates the slug
 func (d *Dashboard) UpdateSlug() {
 	var title string
@@ -469,276 +386,6 @@ func SlugifyTitle(title string) string {
 	return s
 }
 
-func widgetFromPanel(panel model.Panels, idx int, variables map[string]model.Variable) *model.Widget {
-	widget := model.Widget{
-		Description: panel.Description,
-		ID: strconv.Itoa(idx),
-		IsStacked: false,
-		NullZeroValues: "zero",
-		Opacity: "1",
-		PanelTypes: "TIME_SERIES", // TODO: Need to figure out how to get this
-		Query: model.Query{
-			ClickHouse: []model.ClickHouseQueryDashboard{
-				{
-					Disabled: false,
-					Legend: "",
-					Name: "A",
-					Query: "",
-				},
-			},
-			MetricsBuilder: model.MetricsBuilder{
-				Formulas: []string{},
-				QueryBuilder: []model.QueryBuilder{
-					{
-						AggregateOperator: 1,
-						Disabled: false,
-						GroupBy: []string{},
-						Legend: "",
-						MetricName: "",
-						Name: "A",
-						ReduceTo: 1,
-					},
-				},
-			},
-			PromQL: []model.PromQueryDashboard{},
-			QueryType: int(model.PROM),
-		},
-		QueryData: model.QueryDataDashboard{
-			Data: model.Data{
-				QueryData: []interface{}{},
-			},
-		},
-		Title: panel.Title,
-		YAxisUnit: panel.FieldConfig.Defaults.Unit,
-		QueryType: int(model.PROM), // TODO: Support for multiple query types
-	}
-	for _, target := range panel.Targets {
-		if target.Expr != "" {
-			for name := range variables {
-				target.Expr = strings.ReplaceAll(target.Expr, "$"+name, "{{"+"."+name+"}}")
-				target.Expr = strings.ReplaceAll(target.Expr, "$"+"__rate_interval", "5m")
-			}
-
-			// prometheus receiver in collector maps job,instance as service_name,service_instance_id
-			target.Expr = instanceEQRE.ReplaceAllString(target.Expr, "service_instance_id=\"{{.instance}}\"")
-			target.Expr = nodeEQRE.ReplaceAllString(target.Expr, "service_instance_id=\"{{.node}}\"")
-			target.Expr = jobEQRE.ReplaceAllString(target.Expr, "service_name=\"{{.job}}\"")
-			target.Expr = instanceRERE.ReplaceAllString(target.Expr, "service_instance_id=~\"{{.instance}}\"")
-			target.Expr = nodeRERE.ReplaceAllString(target.Expr, "service_instance_id=~\"{{.node}}\"")
-			target.Expr = jobRERE.ReplaceAllString(target.Expr, "service_name=~\"{{.job}}\"")
-
-			widget.Query.PromQL = append(
-				widget.Query.PromQL,
-				model.PromQueryDashboard{
-					Disabled: false,
-					Legend: target.LegendFormat,
-					Name: target.RefID,
-					Query: target.Expr,
-				},
-			)
-		}
-	}
-	return &widget
-}
-
-func TransformGrafanaJSONToSignoz(grafanaJSON model.GrafanaJSON) model.DashboardData {
-	var toReturn model.DashboardData
-	toReturn.Title = grafanaJSON.Title
-	toReturn.Tags = grafanaJSON.Tags
-	toReturn.Variables = make(map[string]model.Variable)
-
-	for templateIdx, template := range grafanaJSON.Templating.List {
-		var sort, typ, textboxValue, customValue, queryValue string
-		if template.Sort == 1 {
-			sort = "ASC"
-		} else if template.Sort == 2 {
-			sort = "DESC"
-		} else {
-			sort = "DISABLED"
-		}
-
-		if template.Type == "query" {
-			if template.Datasource == nil {
-				zap.L().Warn("Skipping template as it has no datasource", zap.Int("templateIdx", templateIdx))
-				continue
-			}
-			// Skip if the source is not prometheus
-			source, stringOk := 
template.Datasource.(string) - if stringOk && !strings.Contains(strings.ToLower(source), "prometheus") { - zap.L().Warn("Skipping template as it is not prometheus", zap.Int("templateIdx", templateIdx)) - continue - } - var result model.Datasource - var structOk bool - if reflect.TypeOf(template.Datasource).Kind() == reflect.Map { - err := mapstructure.Decode(template.Datasource, &result) - if err == nil { - structOk = true - } - } - if result.Type != "prometheus" && result.Type != "" { - zap.L().Warn("Skipping template as it is not prometheus", zap.Int("templateIdx", templateIdx)) - continue - } - - if !stringOk && !structOk { - zap.L().Warn("Didn't recognize source, skipping") - continue - } - typ = "QUERY" - } else if template.Type == "custom" { - typ = "CUSTOM" - } else if template.Type == "textbox" { - typ = "TEXTBOX" - text, ok := template.Current.Text.(string) - if ok { - textboxValue = text - } - array, ok := template.Current.Text.([]string) - if ok { - textboxValue = strings.Join(array, ",") - } - } else { - continue - } - - var selectedValue string - text, ok := template.Current.Value.(string) - if ok { - selectedValue = text - } - array, ok := template.Current.Value.([]string) - if ok { - selectedValue = strings.Join(array, ",") - } - - toReturn.Variables[template.Name] = model.Variable{ - AllSelected: false, - CustomValue: customValue, - Description: template.Label, - MultiSelect: template.Multi, - QueryValue: queryValue, - SelectedValue: selectedValue, - ShowALLOption: template.IncludeAll, - Sort: sort, - TextboxValue: textboxValue, - Type: typ, - } - } - - row := 0 - idx := 0 - for _, panel := range grafanaJSON.Panels { - if panel.Type == "row" { - if panel.Panels != nil && len(panel.Panels) > 0 { - for _, innerPanel := range panel.Panels { - if idx%3 == 0 { - row++ - } - toReturn.Layout = append( - toReturn.Layout, - model.Layout{ - X: idx % 3 * 4, - Y: row * 3, - W: 4, - H: 3, - I: strconv.Itoa(idx), - }, - ) - - toReturn.Widgets = append(toReturn.Widgets, *widgetFromPanel(innerPanel, idx, toReturn.Variables)) - idx++ - } - } - continue - } - if panel.Datasource == nil { - zap.L().Warn("Skipping panel as it has no datasource", zap.Int("idx", idx)) - continue - } - // Skip if the datasource is not prometheus - source, stringOk := panel.Datasource.(string) - if stringOk && !strings.Contains(strings.ToLower(source), "prometheus") { - zap.L().Warn("Skipping panel as it is not prometheus", zap.Int("idx", idx)) - continue - } - var result model.Datasource - var structOk bool - if reflect.TypeOf(panel.Datasource).Kind() == reflect.Map { - err := mapstructure.Decode(panel.Datasource, &result) - if err == nil { - structOk = true - } - } - if result.Type != "prometheus" && result.Type != "" { - zap.L().Warn("Skipping panel as it is not prometheus", zap.Int("idx", idx)) - continue - } - - if !stringOk && !structOk { - zap.L().Warn("Didn't recognize source, skipping") - continue - } - - // Create a panel from "gridPos" - - if idx%3 == 0 { - row++ - } - toReturn.Layout = append( - toReturn.Layout, - model.Layout{ - X: idx % 3 * 4, - Y: row * 3, - W: 4, - H: 3, - I: strconv.Itoa(idx), - }, - ) - - toReturn.Widgets = append(toReturn.Widgets, *widgetFromPanel(panel, idx, toReturn.Variables)) - idx++ - } - return toReturn -} - -func countTraceAndLogsPanel(data map[string]interface{}) (int64, int64) { - count := int64(0) - totalPanels := int64(0) - if data != nil && data["widgets"] != nil { - widgets, ok := data["widgets"] - if ok { - data, ok := widgets.([]interface{}) - if ok { - for _, 
widget := range data { - sData, ok := widget.(map[string]interface{}) - if ok && sData["query"] != nil { - totalPanels++ - query, ok := sData["query"].(map[string]interface{}) - if ok && query["queryType"] == "builder" && query["builder"] != nil { - builderData, ok := query["builder"].(map[string]interface{}) - if ok && builderData["queryData"] != nil { - builderQueryData, ok := builderData["queryData"].([]interface{}) - if ok { - for _, queryData := range builderQueryData { - data, ok := queryData.(map[string]interface{}) - if ok { - if data["dataSource"] == "traces" || data["dataSource"] == "logs" { - count++ - } - } - } - } - } - } - } - } - } - } - } - return count, totalPanels -} - func getWidgetIds(data map[string]interface{}) []string { widgetIds := []string{} if data != nil && data["widgets"] != nil { diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index f7e33a8579..20852f1660 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -396,11 +396,9 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *AuthMiddleware) { router.HandleFunc("/api/v1/dashboards", am.ViewAccess(aH.getDashboards)).Methods(http.MethodGet) router.HandleFunc("/api/v1/dashboards", am.EditAccess(aH.createDashboards)).Methods(http.MethodPost) - router.HandleFunc("/api/v1/dashboards/grafana", am.EditAccess(aH.createDashboardsTransform)).Methods(http.MethodPost) router.HandleFunc("/api/v1/dashboards/{uuid}", am.ViewAccess(aH.getDashboard)).Methods(http.MethodGet) router.HandleFunc("/api/v1/dashboards/{uuid}", am.EditAccess(aH.updateDashboard)).Methods(http.MethodPut) router.HandleFunc("/api/v1/dashboards/{uuid}", am.EditAccess(aH.deleteDashboard)).Methods(http.MethodDelete) - router.HandleFunc("/api/v1/variables/query", am.ViewAccess(aH.queryDashboardVars)).Methods(http.MethodGet) router.HandleFunc("/api/v2/variables/query", am.ViewAccess(aH.queryDashboardVarsV2)).Methods(http.MethodPost) router.HandleFunc("/api/v1/explorer/views", am.ViewAccess(aH.getSavedViews)).Methods(http.MethodGet) @@ -671,7 +669,7 @@ func (aH *APIHandler) deleteDowntimeSchedule(w http.ResponseWriter, r *http.Requ func (aH *APIHandler) getRuleStats(w http.ResponseWriter, r *http.Request) { ruleID := mux.Vars(r)["id"] - params := v3.QueryRuleStateHistory{} + params := model.QueryRuleStateHistory{} err := json.NewDecoder(r.Body).Decode(¶ms) if err != nil { RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) @@ -737,7 +735,7 @@ func (aH *APIHandler) getRuleStats(w http.ResponseWriter, r *http.Request) { pastAvgResolutionTime = 0 } - stats := v3.Stats{ + stats := model.Stats{ TotalCurrentTriggers: totalCurrentTriggers, TotalPastTriggers: totalPastTriggers, CurrentTriggersSeries: currentTriggersSeries, @@ -753,7 +751,7 @@ func (aH *APIHandler) getRuleStats(w http.ResponseWriter, r *http.Request) { func (aH *APIHandler) getOverallStateTransitions(w http.ResponseWriter, r *http.Request) { ruleID := mux.Vars(r)["id"] - params := v3.QueryRuleStateHistory{} + params := model.QueryRuleStateHistory{} err := json.NewDecoder(r.Body).Decode(¶ms) if err != nil { RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) @@ -771,7 +769,7 @@ func (aH *APIHandler) getOverallStateTransitions(w http.ResponseWriter, r *http. 
func (aH *APIHandler) getRuleStateHistory(w http.ResponseWriter, r *http.Request) { ruleID := mux.Vars(r)["id"] - params := v3.QueryRuleStateHistory{} + params := model.QueryRuleStateHistory{} err := json.NewDecoder(r.Body).Decode(¶ms) if err != nil { RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) @@ -823,7 +821,7 @@ func (aH *APIHandler) getRuleStateHistory(w http.ResponseWriter, r *http.Request func (aH *APIHandler) getRuleStateHistoryTopContributors(w http.ResponseWriter, r *http.Request) { ruleID := mux.Vars(r)["id"] - params := v3.QueryRuleStateHistory{} + params := model.QueryRuleStateHistory{} err := json.NewDecoder(r.Body).Decode(¶ms) if err != nil { RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) @@ -941,25 +939,6 @@ func (aH *APIHandler) deleteDashboard(w http.ResponseWriter, r *http.Request) { } -func (aH *APIHandler) queryDashboardVars(w http.ResponseWriter, r *http.Request) { - - query := r.URL.Query().Get("query") - if query == "" { - RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("query is required")}, nil) - return - } - if strings.Contains(strings.ToLower(query), "alter table") { - RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("query shouldn't alter data")}, nil) - return - } - dashboardVars, err := aH.reader.QueryDashboardVars(r.Context(), query) - if err != nil { - RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) - return - } - aH.Respond(w, dashboardVars) -} - func prepareQuery(r *http.Request) (string, error) { var postData *model.DashboardVars @@ -1089,27 +1068,6 @@ func (aH *APIHandler) saveAndReturn(w http.ResponseWriter, r *http.Request, sign aH.Respond(w, dashboard) } -func (aH *APIHandler) createDashboardsTransform(w http.ResponseWriter, r *http.Request) { - - defer r.Body.Close() - b, err := io.ReadAll(r.Body) - - if err != nil { - RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, "Error reading request body") - return - } - - var importData model.GrafanaJSON - - err = json.Unmarshal(b, &importData) - if err == nil { - signozDashboard := dashboards.TransformGrafanaJSONToSignoz(importData) - aH.saveAndReturn(w, r, signozDashboard) - return - } - RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, "Error while creating dashboard from grafana json") -} - func (aH *APIHandler) createDashboards(w http.ResponseWriter, r *http.Request) { var postData map[string]interface{} @@ -3996,7 +3954,7 @@ func (aH *APIHandler) liveTailLogsV2(w http.ResponseWriter, r *http.Request) { flusher.Flush() // create the client - client := &v3.LogsLiveTailClientV2{Name: r.RemoteAddr, Logs: make(chan *model.SignozLogV2, 1000), Done: make(chan *bool), Error: make(chan error)} + client := &model.LogsLiveTailClientV2{Name: r.RemoteAddr, Logs: make(chan *model.SignozLogV2, 1000), Done: make(chan *bool), Error: make(chan error)} go aH.reader.LiveTailLogsV4(r.Context(), queryString, uint64(queryRangeParams.Start), "", client) for { select { @@ -4066,7 +4024,7 @@ func (aH *APIHandler) liveTailLogs(w http.ResponseWriter, r *http.Request) { } // create the client - client := &v3.LogsLiveTailClient{Name: r.RemoteAddr, Logs: make(chan *model.SignozLog, 1000), Done: make(chan *bool), Error: make(chan error)} + client := &model.LogsLiveTailClient{Name: r.RemoteAddr, Logs: make(chan *model.SignozLog, 1000), Done: make(chan *bool), Error: make(chan error)} go aH.reader.LiveTailLogsV3(r.Context(), queryString, uint64(queryRangeParams.Start), "", 
client) w.Header().Set("Connection", "keep-alive") diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index 7a4252293f..9cf8857b92 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -58,8 +58,6 @@ type Reader interface { SetTTL(ctx context.Context, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) FetchTemporality(ctx context.Context, metricNames []string) (map[string]map[v3.Temporality]bool, error) - GetMetricResult(ctx context.Context, query string) ([]*model.Series, error) - GetMetricResultEE(ctx context.Context, query string) ([]*model.Series, string, error) GetMetricAggregateAttributes(ctx context.Context, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error) GetMetricAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) GetMetricAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) @@ -70,8 +68,8 @@ type Reader interface { // QB V3 metrics/traces/logs GetTimeSeriesResultV3(ctx context.Context, query string) ([]*v3.Series, error) GetListResultV3(ctx context.Context, query string) ([]*v3.Row, error) - LiveTailLogsV3(ctx context.Context, query string, timestampStart uint64, idStart string, client *v3.LogsLiveTailClient) - LiveTailLogsV4(ctx context.Context, query string, timestampStart uint64, idStart string, client *v3.LogsLiveTailClientV2) + LiveTailLogsV3(ctx context.Context, query string, timestampStart uint64, idStart string, client *model.LogsLiveTailClient) + LiveTailLogsV4(ctx context.Context, query string, timestampStart uint64, idStart string, client *model.LogsLiveTailClientV2) GetDashboardsInfo(ctx context.Context) (*model.DashboardsInfo, error) GetSavedViewsInfo(ctx context.Context) (*model.SavedViewsInfo, error) @@ -109,21 +107,21 @@ type Reader interface { GetMetricMetadata(context.Context, string, string) (*v3.MetricMetadataResponse, error) - AddRuleStateHistory(ctx context.Context, ruleStateHistory []v3.RuleStateHistory) error - GetOverallStateTransitions(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) ([]v3.ReleStateItem, error) - ReadRuleStateHistoryByRuleID(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (*v3.RuleStateTimeline, error) - GetTotalTriggers(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (uint64, error) - GetTriggersByInterval(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (*v3.Series, error) - GetAvgResolutionTime(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (float64, error) - GetAvgResolutionTimeByInterval(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (*v3.Series, error) - ReadRuleStateHistoryTopContributorsByRuleID(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) ([]v3.RuleStateHistoryContributor, error) - GetLastSavedRuleStateHistory(ctx context.Context, ruleID string) ([]v3.RuleStateHistory, error) + AddRuleStateHistory(ctx context.Context, ruleStateHistory []model.RuleStateHistory) error + GetOverallStateTransitions(ctx context.Context, ruleID string, params *model.QueryRuleStateHistory) ([]model.ReleStateItem, error) + ReadRuleStateHistoryByRuleID(ctx context.Context, ruleID string, params *model.QueryRuleStateHistory) (*model.RuleStateTimeline, error) + GetTotalTriggers(ctx context.Context, ruleID string, params 
*model.QueryRuleStateHistory) (uint64, error) + GetTriggersByInterval(ctx context.Context, ruleID string, params *model.QueryRuleStateHistory) (*v3.Series, error) + GetAvgResolutionTime(ctx context.Context, ruleID string, params *model.QueryRuleStateHistory) (float64, error) + GetAvgResolutionTimeByInterval(ctx context.Context, ruleID string, params *model.QueryRuleStateHistory) (*v3.Series, error) + ReadRuleStateHistoryTopContributorsByRuleID(ctx context.Context, ruleID string, params *model.QueryRuleStateHistory) ([]model.RuleStateHistoryContributor, error) + GetLastSavedRuleStateHistory(ctx context.Context, ruleID string) ([]model.RuleStateHistory, error) GetMinAndMaxTimestampForTraceID(ctx context.Context, traceID []string) (int64, int64, error) // Query Progress tracking helpers. ReportQueryStartForProgressTracking(queryId string) (reportQueryFinished func(), err *model.ApiError) - SubscribeToQueryProgress(queryId string) (<-chan v3.QueryProgress, func(), *model.ApiError) + SubscribeToQueryProgress(queryId string) (<-chan model.QueryProgress, func(), *model.ApiError) } type Querier interface { diff --git a/pkg/query-service/model/alerting.go b/pkg/query-service/model/alerting.go index 4d54f6ae34..944efecc12 100644 --- a/pkg/query-service/model/alerting.go +++ b/pkg/query-service/model/alerting.go @@ -3,8 +3,10 @@ package model import ( "database/sql/driver" "encoding/json" + "fmt" "github.com/pkg/errors" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" ) // AlertState denotes the state of an active alert. @@ -88,3 +90,104 @@ func (s *AlertState) Scan(value interface{}) error { func (s *AlertState) Value() (driver.Value, error) { return s.String(), nil } + +type LabelsString string + +func (l *LabelsString) MarshalJSON() ([]byte, error) { + lbls := make(map[string]string) + err := json.Unmarshal([]byte(*l), &lbls) + if err != nil { + return nil, err + } + return json.Marshal(lbls) +} + +func (l *LabelsString) Scan(src interface{}) error { + if data, ok := src.(string); ok { + *l = LabelsString(data) + } + return nil +} + +func (l LabelsString) String() string { + return string(l) +} + +type RuleStateTimeline struct { + Items []RuleStateHistory `json:"items"` + Total uint64 `json:"total"` + Labels map[string][]string `json:"labels"` +} + +type RuleStateHistory struct { + RuleID string `json:"ruleID" ch:"rule_id"` + RuleName string `json:"ruleName" ch:"rule_name"` + // One of ["normal", "firing"] + OverallState AlertState `json:"overallState" ch:"overall_state"` + OverallStateChanged bool `json:"overallStateChanged" ch:"overall_state_changed"` + // One of ["normal", "firing", "no_data", "muted"] + State AlertState `json:"state" ch:"state"` + StateChanged bool `json:"stateChanged" ch:"state_changed"` + UnixMilli int64 `json:"unixMilli" ch:"unix_milli"` + Labels LabelsString `json:"labels" ch:"labels"` + Fingerprint uint64 `json:"fingerprint" ch:"fingerprint"` + Value float64 `json:"value" ch:"value"` + + RelatedTracesLink string `json:"relatedTracesLink"` + RelatedLogsLink string `json:"relatedLogsLink"` +} + +type QueryRuleStateHistory struct { + Start int64 `json:"start"` + End int64 `json:"end"` + State string `json:"state"` + Filters *v3.FilterSet `json:"filters"` + Offset int64 `json:"offset"` + Limit int64 `json:"limit"` + Order string `json:"order"` +} + +func (r *QueryRuleStateHistory) Validate() error { + if r.Start == 0 || r.End == 0 { + return fmt.Errorf("start and end are required") + } + if r.Offset < 0 || r.Limit < 0 { + return fmt.Errorf("offset and limit must be 
greater than 0") + } + if r.Order != "asc" && r.Order != "desc" { + return fmt.Errorf("order must be asc or desc") + } + return nil +} + +type RuleStateHistoryContributor struct { + Fingerprint uint64 `json:"fingerprint" ch:"fingerprint"` + Labels LabelsString `json:"labels" ch:"labels"` + Count uint64 `json:"count" ch:"count"` + RelatedTracesLink string `json:"relatedTracesLink"` + RelatedLogsLink string `json:"relatedLogsLink"` +} + +type RuleStateTransition struct { + RuleID string `json:"ruleID" ch:"rule_id"` + State AlertState `json:"state" ch:"state"` + FiringTime int64 `json:"firingTime" ch:"firing_time"` + ResolutionTime int64 `json:"resolutionTime" ch:"resolution_time"` +} + +type ReleStateItem struct { + State AlertState `json:"state"` + Start int64 `json:"start"` + End int64 `json:"end"` +} + +type Stats struct { + TotalCurrentTriggers uint64 `json:"totalCurrentTriggers"` + TotalPastTriggers uint64 `json:"totalPastTriggers"` + CurrentTriggersSeries *v3.Series `json:"currentTriggersSeries"` + PastTriggersSeries *v3.Series `json:"pastTriggersSeries"` + CurrentAvgResolutionTime string `json:"currentAvgResolutionTime"` + PastAvgResolutionTime string `json:"pastAvgResolutionTime"` + CurrentAvgResolutionTimeSeries *v3.Series `json:"currentAvgResolutionTimeSeries"` + PastAvgResolutionTimeSeries *v3.Series `json:"pastAvgResolutionTimeSeries"` +} diff --git a/pkg/query-service/model/logs.go b/pkg/query-service/model/logs.go new file mode 100644 index 0000000000..ef1c7ff2e4 --- /dev/null +++ b/pkg/query-service/model/logs.go @@ -0,0 +1,23 @@ +package model + +type LogsLiveTailClientV2 struct { + Name string + Logs chan *SignozLogV2 + Done chan *bool + Error chan error +} + +type LogsLiveTailClient struct { + Name string + Logs chan *SignozLog + Done chan *bool + Error chan error +} + +type QueryProgress struct { + ReadRows uint64 `json:"read_rows"` + + ReadBytes uint64 `json:"read_bytes"` + + ElapsedMs uint64 `json:"elapsed_ms"` +} diff --git a/pkg/query-service/model/queryParams.go b/pkg/query-service/model/queryParams.go index 321f7417be..342f8f10f0 100644 --- a/pkg/query-service/model/queryParams.go +++ b/pkg/query-service/model/queryParams.go @@ -18,110 +18,6 @@ type QueryRangeParams struct { Stats string } -type MetricQuery struct { - QueryName string `json:"queryName"` - MetricName string `json:"metricName"` - TagFilters *FilterSet `json:"tagFilters,omitempty"` - GroupingTags []string `json:"groupBy,omitempty"` - AggregateOperator AggregateOperator `json:"aggregateOperator"` - Expression string `json:"expression"` - Disabled bool `json:"disabled"` - ReduceTo ReduceToOperator `json:"reduceTo,omitempty"` -} - -type ReduceToOperator int - -const ( - _ ReduceToOperator = iota - RLAST - RSUM - RAVG - RMAX - RMIN -) - -type QueryType int - -const ( - _ QueryType = iota - QUERY_BUILDER - CLICKHOUSE - PROM -) - -type PromQuery struct { - Query string `json:"query"` - Stats string `json:"stats,omitempty"` - Disabled bool `json:"disabled"` -} - -type ClickHouseQuery struct { - Query string `json:"query"` - Disabled bool `json:"disabled"` -} - -type PanelType int - -const ( - _ PanelType = iota - TIME_SERIES - QUERY_VALUE -) - -type CompositeMetricQuery struct { - BuilderQueries map[string]*MetricQuery `json:"builderQueries,omitempty"` - ClickHouseQueries map[string]*ClickHouseQuery `json:"chQueries,omitempty"` - PromQueries map[string]*PromQuery `json:"promQueries,omitempty"` - PanelType PanelType `json:"panelType"` - QueryType QueryType `json:"queryType"` -} - -type AggregateOperator int - 
diff --git a/pkg/query-service/model/queryParams.go b/pkg/query-service/model/queryParams.go
index 321f7417be..342f8f10f0 100644
--- a/pkg/query-service/model/queryParams.go
+++ b/pkg/query-service/model/queryParams.go
@@ -18,110 +18,6 @@ type QueryRangeParams struct {
 	Stats string
 }

-type MetricQuery struct {
-	QueryName         string            `json:"queryName"`
-	MetricName        string            `json:"metricName"`
-	TagFilters        *FilterSet        `json:"tagFilters,omitempty"`
-	GroupingTags      []string          `json:"groupBy,omitempty"`
-	AggregateOperator AggregateOperator `json:"aggregateOperator"`
-	Expression        string            `json:"expression"`
-	Disabled          bool              `json:"disabled"`
-	ReduceTo          ReduceToOperator  `json:"reduceTo,omitempty"`
-}
-
-type ReduceToOperator int
-
-const (
-	_ ReduceToOperator = iota
-	RLAST
-	RSUM
-	RAVG
-	RMAX
-	RMIN
-)
-
-type QueryType int
-
-const (
-	_ QueryType = iota
-	QUERY_BUILDER
-	CLICKHOUSE
-	PROM
-)
-
-type PromQuery struct {
-	Query    string `json:"query"`
-	Stats    string `json:"stats,omitempty"`
-	Disabled bool   `json:"disabled"`
-}
-
-type ClickHouseQuery struct {
-	Query    string `json:"query"`
-	Disabled bool   `json:"disabled"`
-}
-
-type PanelType int
-
-const (
-	_ PanelType = iota
-	TIME_SERIES
-	QUERY_VALUE
-)
-
-type CompositeMetricQuery struct {
-	BuilderQueries    map[string]*MetricQuery     `json:"builderQueries,omitempty"`
-	ClickHouseQueries map[string]*ClickHouseQuery `json:"chQueries,omitempty"`
-	PromQueries       map[string]*PromQuery       `json:"promQueries,omitempty"`
-	PanelType         PanelType                   `json:"panelType"`
-	QueryType         QueryType                   `json:"queryType"`
-}
-
-type AggregateOperator int
-
-const (
-	_ AggregateOperator = iota
-	NOOP
-	COUNT
-	COUNT_DISTINCT
-	SUM
-	AVG
-	MAX
-	MIN
-	P05
-	P10
-	P20
-	P25
-	P50
-	P75
-	P90
-	P95
-	P99
-	RATE
-	SUM_RATE
-	// leave blank space for possily {AVG, X}_RATE
-	_
-	_
-	_
-	RATE_SUM
-	RATE_AVG
-	RATE_MAX
-	RATE_MIN
-	HIST_QUANTILE_50
-	HIST_QUANTILE_75
-	HIST_QUANTILE_90
-	HIST_QUANTILE_95
-	HIST_QUANTILE_99
-)
-
-type DataSource int
-
-const (
-	_ DataSource = iota
-	METRICS
-	TRACES
-	LOGS
-)
-
 const (
 	StringTagMapCol   = "stringTagMap"
 	NumberTagMapCol   = "numberTagMap"
@@ -129,16 +25,6 @@ const (
 	ResourceTagMapCol = "resourceTagsMap"
 )

-type QueryRangeParamsV2 struct {
-	DataSource           DataSource             `json:"dataSource"`
-	Start                int64                  `json:"start"`
-	End                  int64                  `json:"end"`
-	Step                 int64                  `json:"step"`
-	CompositeMetricQuery *CompositeMetricQuery  `json:"compositeMetricQuery"`
-	Variables            map[string]interface{} `json:"variables,omitempty"`
-	NoCache              bool                   `json:"noCache"`
-}
-
 type DashboardVars struct {
 	Query     string                 `json:"query"`
 	Variables map[string]interface{} `json:"variables,omitempty"`
diff --git a/pkg/query-service/model/response.go b/pkg/query-service/model/response.go
index 4add34071e..1b86ff7e8b 100644
--- a/pkg/query-service/model/response.go
+++ b/pkg/query-service/model/response.go
@@ -4,7 +4,6 @@ import (
 	"encoding/json"
 	"fmt"
 	"math"
-	"sort"
 	"strconv"
 	"time"

@@ -79,7 +78,7 @@ func BadRequest(err error) *ApiError {
 func BadRequestStr(s string) *ApiError {
 	return &ApiError{
 		Typ: ErrorBadData,
-		Err: fmt.Errorf(s),
+		Err: errors.New(s),
 	}
 }

@@ -500,46 +499,12 @@ type NextPrevErrorIDs struct {
 	GroupID string `json:"groupID"`
 }

-type Series struct {
-	QueryName string            `json:"queryName"`
-	Labels    map[string]string `json:"metric"`
-	Points    []MetricPoint     `json:"values"`
-}
-
-func (s *Series) SortPoints() {
-	sort.Slice(s.Points, func(i, j int) bool {
-		return s.Points[i].Timestamp < s.Points[j].Timestamp
-	})
-}
-
-type MetricPoint struct {
-	Timestamp int64
-	Value     float64
-}
-
 type MetricStatus struct {
 	MetricName           string
 	LastReceivedTsMillis int64
 	LastReceivedLabels   map[string]string
 }

-// MarshalJSON implements json.Marshaler.
-func (p *MetricPoint) MarshalJSON() ([]byte, error) {
-	v := strconv.FormatFloat(p.Value, 'f', -1, 64)
-	return json.Marshal([...]interface{}{float64(p.Timestamp) / 1000, v})
-}
-
-// UnmarshalJSON implements json.Unmarshaler.
-func (p *MetricPoint) UnmarshalJSON(b []byte) error {
-	var a [2]interface{}
-	if err := json.Unmarshal(b, &a); err != nil {
-		return err
-	}
-	p.Timestamp = int64(a[0].(float64) * 1000)
-	p.Value, _ = strconv.ParseFloat(a[1].(string), 64)
-	return nil
-}
-
 type ShowCreateTableStatement struct {
 	Statement string `json:"statement" ch:"statement"`
 }
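The BadRequestStr change above is worth calling out: passing a caller-supplied string to fmt.Errorf treats it as a format string, so any % in it is misinterpreted (and go vet's printf check flags the non-constant format string). errors.New uses the string verbatim. A tiny illustration:

// Fragment for illustration; assumes "fmt" and an errors package providing New.
s := "invalid value: 100%"
err1 := fmt.Errorf(s)  // "invalid value: 100%!(NOVERB)" — s is parsed as a format string
err2 := errors.New(s)  // "invalid value: 100%" — the string is used verbatim
_, _ = err1, err2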
diff --git a/pkg/query-service/model/v3/v3.go b/pkg/query-service/model/v3/v3.go
index 4af5d36ae4..18163af723 100644
--- a/pkg/query-service/model/v3/v3.go
+++ b/pkg/query-service/model/v3/v3.go
@@ -11,7 +11,6 @@ import (

 	"github.com/google/uuid"
 	"github.com/pkg/errors"
-	"go.signoz.io/signoz/pkg/query-service/model"
 )

 type DataSource string
@@ -1035,20 +1034,6 @@ type Result struct {
 	Table *Table `json:"table,omitempty"`
 }

-type LogsLiveTailClientV2 struct {
-	Name  string
-	Logs  chan *model.SignozLogV2
-	Done  chan *bool
-	Error chan error
-}
-
-type LogsLiveTailClient struct {
-	Name  string
-	Logs  chan *model.SignozLog
-	Done  chan *bool
-	Error chan error
-}
-
 type Series struct {
 	Labels      map[string]string   `json:"labels"`
 	LabelsArray []map[string]string `json:"labelsArray"`
@@ -1167,115 +1152,6 @@ type MetricMetadataResponse struct {
 	Temporality string `json:"temporality"`
 }

-type LabelsString string
-
-func (l *LabelsString) MarshalJSON() ([]byte, error) {
-	lbls := make(map[string]string)
-	err := json.Unmarshal([]byte(*l), &lbls)
-	if err != nil {
-		return nil, err
-	}
-	return json.Marshal(lbls)
-}
-
-func (l *LabelsString) Scan(src interface{}) error {
-	if data, ok := src.(string); ok {
-		*l = LabelsString(data)
-	}
-	return nil
-}
-
-func (l LabelsString) String() string {
-	return string(l)
-}
-
-type RuleStateTimeline struct {
-	Items  []RuleStateHistory  `json:"items"`
-	Total  uint64              `json:"total"`
-	Labels map[string][]string `json:"labels"`
-}
-
-type RuleStateHistory struct {
-	RuleID   string `json:"ruleID" ch:"rule_id"`
-	RuleName string `json:"ruleName" ch:"rule_name"`
-	// One of ["normal", "firing"]
-	OverallState        model.AlertState `json:"overallState" ch:"overall_state"`
-	OverallStateChanged bool             `json:"overallStateChanged" ch:"overall_state_changed"`
-	// One of ["normal", "firing", "no_data", "muted"]
-	State        model.AlertState `json:"state" ch:"state"`
-	StateChanged bool             `json:"stateChanged" ch:"state_changed"`
-	UnixMilli    int64            `json:"unixMilli" ch:"unix_milli"`
-	Labels       LabelsString     `json:"labels" ch:"labels"`
-	Fingerprint  uint64           `json:"fingerprint" ch:"fingerprint"`
-	Value        float64          `json:"value" ch:"value"`
-
-	RelatedTracesLink string `json:"relatedTracesLink"`
-	RelatedLogsLink   string `json:"relatedLogsLink"`
-}
-
-type QueryRuleStateHistory struct {
-	Start   int64      `json:"start"`
-	End     int64      `json:"end"`
-	State   string     `json:"state"`
-	Filters *FilterSet `json:"filters"`
-	Offset  int64      `json:"offset"`
-	Limit   int64      `json:"limit"`
-	Order   string     `json:"order"`
-}
-
-func (r *QueryRuleStateHistory) Validate() error {
-	if r.Start == 0 || r.End == 0 {
-		return fmt.Errorf("start and end are required")
-	}
-	if r.Offset < 0 || r.Limit < 0 {
-		return fmt.Errorf("offset and limit must be greater than 0")
-	}
-	if r.Order != "asc" && r.Order != "desc" {
-		return fmt.Errorf("order must be asc or desc")
-	}
-	return nil
-}
-
-type RuleStateHistoryContributor struct {
-	Fingerprint       uint64       `json:"fingerprint" ch:"fingerprint"`
-	Labels            LabelsString `json:"labels" ch:"labels"`
-	Count             uint64       `json:"count" ch:"count"`
-	RelatedTracesLink string       `json:"relatedTracesLink"`
-	RelatedLogsLink   string       `json:"relatedLogsLink"`
-}
-
-type RuleStateTransition struct {
-	RuleID         string           `json:"ruleID" ch:"rule_id"`
-	State          model.AlertState `json:"state" ch:"state"`
-	FiringTime     int64            `json:"firingTime" ch:"firing_time"`
-	ResolutionTime int64            `json:"resolutionTime" ch:"resolution_time"`
-}
-
-type ReleStateItem struct {
-	State model.AlertState `json:"state"`
-	Start int64            `json:"start"`
-	End   int64            `json:"end"`
-}
-
-type Stats struct {
-	TotalCurrentTriggers           uint64  `json:"totalCurrentTriggers"`
-	TotalPastTriggers              uint64  `json:"totalPastTriggers"`
-	CurrentTriggersSeries          *Series `json:"currentTriggersSeries"`
-	PastTriggersSeries             *Series `json:"pastTriggersSeries"`
-	CurrentAvgResolutionTime       string  `json:"currentAvgResolutionTime"`
-	PastAvgResolutionTime          string  `json:"pastAvgResolutionTime"`
-	CurrentAvgResolutionTimeSeries *Series `json:"currentAvgResolutionTimeSeries"`
-	PastAvgResolutionTimeSeries    *Series `json:"pastAvgResolutionTimeSeries"`
-}
-
-type QueryProgress struct {
-	ReadRows uint64 `json:"read_rows"`
-
-	ReadBytes uint64 `json:"read_bytes"`
-
-	ElapsedMs uint64 `json:"elapsed_ms"`
-}
-
 type URLShareableTimeRange struct {
 	Start int64 `json:"start"`
 	End   int64 `json:"end"`
diff --git a/pkg/query-service/rules/base_rule.go b/pkg/query-service/rules/base_rule.go
index 6b359d5d38..6fbaa655c7 100644
--- a/pkg/query-service/rules/base_rule.go
+++ b/pkg/query-service/rules/base_rule.go
@@ -472,9 +472,9 @@ func (r *BaseRule) shouldAlert(series v3.Series) (Sample, bool) {
 	return alertSmpl, shouldAlert
 }

-func (r *BaseRule) RecordRuleStateHistory(ctx context.Context, prevState, currentState model.AlertState, itemsToAdd []v3.RuleStateHistory) error {
+func (r *BaseRule) RecordRuleStateHistory(ctx context.Context, prevState, currentState model.AlertState, itemsToAdd []model.RuleStateHistory) error {
 	zap.L().Debug("recording rule state history", zap.String("ruleid", r.ID()), zap.Any("prevState", prevState), zap.Any("currentState", currentState), zap.Any("itemsToAdd", itemsToAdd))
-	revisedItemsToAdd := map[uint64]v3.RuleStateHistory{}
+	revisedItemsToAdd := map[uint64]model.RuleStateHistory{}

 	lastSavedState, err := r.reader.GetLastSavedRuleStateHistory(ctx, r.ID())
 	if err != nil {
@@ -484,7 +484,7 @@ func (r *BaseRule) RecordRuleStateHistory(ctx context.Context, prevState, curren
 	// the state would reset so we need to add the corresponding state changes to previously saved states
 	if !r.handledRestart && len(lastSavedState) > 0 {
 		zap.L().Debug("handling restart", zap.String("ruleid", r.ID()), zap.Any("lastSavedState", lastSavedState))
-		l := map[uint64]v3.RuleStateHistory{}
+		l := map[uint64]model.RuleStateHistory{}
 		for _, item := range itemsToAdd {
 			l[item.Fingerprint] = item
 		}
@@ -553,7 +553,7 @@ func (r *BaseRule) RecordRuleStateHistory(ctx context.Context, prevState, curren
 	if len(revisedItemsToAdd) > 0 && r.reader != nil {
 		zap.L().Debug("writing rule state history", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd))

-		entries := make([]v3.RuleStateHistory, 0, len(revisedItemsToAdd))
+		entries := make([]model.RuleStateHistory, 0, len(revisedItemsToAdd))
 		for _, item := range revisedItemsToAdd {
 			entries = append(entries, item)
 		}
diff --git a/pkg/query-service/rules/prom_rule.go b/pkg/query-service/rules/prom_rule.go
index 7136a88e97..db5a963731 100644
--- a/pkg/query-service/rules/prom_rule.go
+++ b/pkg/query-service/rules/prom_rule.go
@@ -219,7 +219,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (interface{}, error)
 	}

-	itemsToAdd := []v3.RuleStateHistory{}
+	itemsToAdd := []model.RuleStateHistory{}

 	// Check if any pending alerts should be removed or fire now. Write out alert timeseries.
 	for fp, a := range r.active {
@@ -236,13 +236,13 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (interface{}, error)
 			if a.State != model.StateInactive {
 				a.State = model.StateInactive
 				a.ResolvedAt = ts
-				itemsToAdd = append(itemsToAdd, v3.RuleStateHistory{
+				itemsToAdd = append(itemsToAdd, model.RuleStateHistory{
 					RuleID:       r.ID(),
 					RuleName:     r.Name(),
 					State:        model.StateInactive,
 					StateChanged: true,
 					UnixMilli:    ts.UnixMilli(),
-					Labels:       v3.LabelsString(labelsJSON),
+					Labels:       model.LabelsString(labelsJSON),
 					Fingerprint:  a.QueryResultLables.Hash(),
 				})
 			}
@@ -256,13 +256,13 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (interface{}, error)
 			if a.Missing {
 				state = model.StateNoData
 			}
-			itemsToAdd = append(itemsToAdd, v3.RuleStateHistory{
+			itemsToAdd = append(itemsToAdd, model.RuleStateHistory{
 				RuleID:       r.ID(),
 				RuleName:     r.Name(),
 				State:        state,
 				StateChanged: true,
 				UnixMilli:    ts.UnixMilli(),
-				Labels:       v3.LabelsString(labelsJSON),
+				Labels:       model.LabelsString(labelsJSON),
 				Fingerprint:  a.QueryResultLables.Hash(),
 				Value:        a.Value,
 			})
diff --git a/pkg/query-service/rules/rule.go b/pkg/query-service/rules/rule.go
index bb41a2be13..2b5b8d5aae 100644
--- a/pkg/query-service/rules/rule.go
+++ b/pkg/query-service/rules/rule.go
@@ -5,7 +5,6 @@ import (
 	"time"

 	"go.signoz.io/signoz/pkg/query-service/model"
-	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
 	"go.signoz.io/signoz/pkg/query-service/utils/labels"
 )

@@ -35,7 +34,7 @@ type Rule interface {
 	SetEvaluationTimestamp(time.Time)
 	GetEvaluationTimestamp() time.Time

-	RecordRuleStateHistory(ctx context.Context, prevState, currentState model.AlertState, itemsToAdd []v3.RuleStateHistory) error
+	RecordRuleStateHistory(ctx context.Context, prevState, currentState model.AlertState, itemsToAdd []model.RuleStateHistory) error

 	SendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc)
 }
diff --git a/pkg/query-service/rules/threshold_rule.go b/pkg/query-service/rules/threshold_rule.go
index 01b6b1d2fd..f7cdfd6708 100644
--- a/pkg/query-service/rules/threshold_rule.go
+++ b/pkg/query-service/rules/threshold_rule.go
@@ -703,7 +703,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (interface{}, er
 		r.active[h] = a
 	}

-	itemsToAdd := []v3.RuleStateHistory{}
+	itemsToAdd := []model.RuleStateHistory{}

 	// Check if any pending alerts should be removed or fire now. Write out alert timeseries.
 	for fp, a := range r.active {
@@ -720,13 +720,13 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (interface{}, er
 			if a.State != model.StateInactive {
 				a.State = model.StateInactive
 				a.ResolvedAt = ts
-				itemsToAdd = append(itemsToAdd, v3.RuleStateHistory{
+				itemsToAdd = append(itemsToAdd, model.RuleStateHistory{
 					RuleID:       r.ID(),
 					RuleName:     r.Name(),
 					State:        model.StateInactive,
 					StateChanged: true,
 					UnixMilli:    ts.UnixMilli(),
-					Labels:       v3.LabelsString(labelsJSON),
+					Labels:       model.LabelsString(labelsJSON),
 					Fingerprint:  a.QueryResultLables.Hash(),
 					Value:        a.Value,
 				})
@@ -741,13 +741,13 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (interface{}, er
 			if a.Missing {
 				state = model.StateNoData
 			}
-			itemsToAdd = append(itemsToAdd, v3.RuleStateHistory{
+			itemsToAdd = append(itemsToAdd, model.RuleStateHistory{
 				RuleID:       r.ID(),
 				RuleName:     r.Name(),
 				State:        state,
 				StateChanged: true,
 				UnixMilli:    ts.UnixMilli(),
-				Labels:       v3.LabelsString(labelsJSON),
+				Labels:       model.LabelsString(labelsJSON),
 				Fingerprint:  a.QueryResultLables.Hash(),
 				Value:        a.Value,
 			})
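Taken together, callers now build the query params and read the rule-state timeline entirely from the model package; a hypothetical handler-side sketch (function name, time window, and pagination values are illustrative, not from this diff):

// Assumes imports: context, time, and the interfaces/model packages referenced above.
func ruleTimeline(ctx context.Context, reader interfaces.Reader, ruleID string) (*model.RuleStateTimeline, error) {
	params := &model.QueryRuleStateHistory{
		Start: time.Now().Add(-6 * time.Hour).UnixMilli(),
		End:   time.Now().UnixMilli(),
		Limit: 20,
		Order: "desc",
	}
	// Validate rejects a missing window, negative offset/limit, and a bad order value.
	if err := params.Validate(); err != nil {
		return nil, err
	}
	return reader.ReadRuleStateHistoryByRuleID(ctx, ruleID, params)
}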