From 2ead4fbb66bce5ca8c01ce343eeead8d25109fac Mon Sep 17 00:00:00 2001
From: Shivanshu Raj Shrivastava
Date: Thu, 9 Jan 2025 16:26:06 +0530
Subject: [PATCH] fix: Modifies messaging queue payload (#6783)

* fix: use filterset

Signed-off-by: Shivanshu Raj Shrivastava
---
 pkg/query-service/app/http_handler.go         | 112 ++++++-------
 .../messagingQueues/celery/translator.go      |   1 +
 .../messagingQueues/kafka/model.go            |  34 -----
 .../integrations/messagingQueues/kafka/sql.go | 134 ------------------
 .../messagingQueues/kafka/translator.go       |  33 +----
 .../messagingQueues/queues/model.go           |  27 ++++
 .../messagingQueues/queues/queueOverview.go   |  19 +++
 .../messagingQueues/queues/sql.go             | 117 +++++++++++++
 8 files changed, 225 insertions(+), 252 deletions(-)
 create mode 100644 pkg/query-service/app/integrations/messagingQueues/celery/translator.go
 create mode 100644 pkg/query-service/app/integrations/messagingQueues/queues/model.go
 create mode 100644 pkg/query-service/app/integrations/messagingQueues/queues/queueOverview.go
 create mode 100644 pkg/query-service/app/integrations/messagingQueues/queues/sql.go

diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go
index 030f715dbe..505464a6ee 100644
--- a/pkg/query-service/app/http_handler.go
+++ b/pkg/query-service/app/http_handler.go
@@ -29,6 +29,7 @@ import (
 	"go.signoz.io/signoz/pkg/query-service/app/explorer"
 	"go.signoz.io/signoz/pkg/query-service/app/inframetrics"
 	"go.signoz.io/signoz/pkg/query-service/app/integrations"
+	queues2 "go.signoz.io/signoz/pkg/query-service/app/integrations/messagingQueues/queues"
 	"go.signoz.io/signoz/pkg/query-service/app/logs"
 	logsv3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3"
 	logsv4 "go.signoz.io/signoz/pkg/query-service/app/logs/v4"
@@ -50,11 +51,11 @@ import (
 
 	"go.uber.org/zap"
 
-	mq "go.signoz.io/signoz/pkg/query-service/app/integrations/messagingQueues/kafka"
+	"go.signoz.io/signoz/pkg/query-service/app/integrations/messagingQueues/kafka"
 	"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
 	"go.signoz.io/signoz/pkg/query-service/dao"
 	am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
-	signozio "go.signoz.io/signoz/pkg/query-service/integrations/signozio"
+	"go.signoz.io/signoz/pkg/query-service/integrations/signozio"
 	"go.signoz.io/signoz/pkg/query-service/interfaces"
 	"go.signoz.io/signoz/pkg/query-service/model"
 	"go.signoz.io/signoz/pkg/query-service/rules"
@@ -2565,14 +2566,14 @@
 func (aH *APIHandler) onboardProducers(
 	w http.ResponseWriter, r *http.Request,
 ) {
-	messagingQueue, apiErr := ParseMessagingQueueBody(r)
+	messagingQueue, apiErr := ParseKafkaQueueBody(r)
 	if apiErr != nil {
 		zap.L().Error(apiErr.Err.Error())
 		RespondError(w, apiErr, nil)
 		return
 	}
 
-	chq, err := mq.BuildClickHouseQuery(messagingQueue, mq.KafkaQueue, "onboard_producers")
+	chq, err := kafka.BuildClickHouseQuery(messagingQueue, kafka.KafkaQueue, "onboard_producers")
 
 	if err != nil {
 		zap.L().Error(err.Error())
@@ -2588,7 +2589,7 @@ func (aH *APIHandler) onboardProducers(
 		return
 	}
 
-	var entries []mq.OnboardingResponse
+	var entries []kafka.OnboardingResponse
 
 	for _, result := range results {
@@ -2601,7 +2602,7 @@ func (aH *APIHandler) onboardProducers(
 			attribute = "telemetry ingestion"
 			if intValue != 0 {
 				entries = nil
-				entry := mq.OnboardingResponse{
+				entry := kafka.OnboardingResponse{
 					Attribute: attribute,
 					Message:   "No data available in the given time range",
 					Status:    "0",
@@ -2645,7 +2646,7 @@
 			}
 		}
 
-		entry := mq.OnboardingResponse{
+		entry := kafka.OnboardingResponse{
 			Attribute: attribute,
 			Message:   message,
 			Status:    status,
@@ -2667,14 +2668,14 @@
 func (aH *APIHandler) onboardConsumers(
 	w http.ResponseWriter, r *http.Request,
 ) {
-	messagingQueue, apiErr := ParseMessagingQueueBody(r)
+	messagingQueue, apiErr := ParseKafkaQueueBody(r)
 	if apiErr != nil {
 		zap.L().Error(apiErr.Err.Error())
 		RespondError(w, apiErr, nil)
 		return
 	}
 
-	chq, err := mq.BuildClickHouseQuery(messagingQueue, mq.KafkaQueue, "onboard_consumers")
+	chq, err := kafka.BuildClickHouseQuery(messagingQueue, kafka.KafkaQueue, "onboard_consumers")
 
 	if err != nil {
 		zap.L().Error(err.Error())
@@ -2690,7 +2691,7 @@ func (aH *APIHandler) onboardConsumers(
 		return
 	}
 
-	var entries []mq.OnboardingResponse
+	var entries []kafka.OnboardingResponse
 
 	for _, result := range result {
 		for key, value := range result.Data {
@@ -2702,7 +2703,7 @@ func (aH *APIHandler) onboardConsumers(
 			attribute = "telemetry ingestion"
 			if intValue != 0 {
 				entries = nil
-				entry := mq.OnboardingResponse{
+				entry := kafka.OnboardingResponse{
 					Attribute: attribute,
 					Message:   "No data available in the given time range",
 					Status:    "0",
@@ -2786,7 +2787,7 @@ func (aH *APIHandler) onboardConsumers(
 			}
 		}
 
-		entry := mq.OnboardingResponse{
+		entry := kafka.OnboardingResponse{
 			Attribute: attribute,
 			Message:   message,
 			Status:    status,
@@ -2807,14 +2808,14 @@
 func (aH *APIHandler) onboardKafka(
 	w http.ResponseWriter, r *http.Request,
 ) {
-	messagingQueue, apiErr := ParseMessagingQueueBody(r)
+	messagingQueue, apiErr := ParseKafkaQueueBody(r)
 	if apiErr != nil {
 		zap.L().Error(apiErr.Err.Error())
 		RespondError(w, apiErr, nil)
 		return
 	}
 
-	queryRangeParams, err := mq.BuildBuilderQueriesKafkaOnboarding(messagingQueue)
+	queryRangeParams, err := kafka.BuildBuilderQueriesKafkaOnboarding(messagingQueue)
 
 	if err != nil {
 		zap.L().Error(err.Error())
@@ -2829,7 +2830,7 @@ func (aH *APIHandler) onboardKafka(
 		return
 	}
 
-	var entries []mq.OnboardingResponse
+	var entries []kafka.OnboardingResponse
 
 	var fetchLatencyState, consumerLagState bool
@@ -2853,7 +2854,7 @@ func (aH *APIHandler) onboardKafka(
 	}
 
 	if !fetchLatencyState && !consumerLagState {
-		entries = append(entries, mq.OnboardingResponse{
+		entries = append(entries, kafka.OnboardingResponse{
 			Attribute: "telemetry ingestion",
 			Message:   "No data available in the given time range",
 			Status:    "0",
@@ -2861,26 +2862,26 @@ func (aH *APIHandler) onboardKafka(
 	}
 
 	if !fetchLatencyState {
-		entries = append(entries, mq.OnboardingResponse{
+		entries = append(entries, kafka.OnboardingResponse{
 			Attribute: "kafka_consumer_fetch_latency_avg",
 			Message:   "Metric kafka_consumer_fetch_latency_avg is not present in the given time range.",
 			Status:    "0",
 		})
 	} else {
-		entries = append(entries, mq.OnboardingResponse{
+		entries = append(entries, kafka.OnboardingResponse{
 			Attribute: "kafka_consumer_fetch_latency_avg",
 			Status:    "1",
 		})
 	}
 
 	if !consumerLagState {
-		entries = append(entries, mq.OnboardingResponse{
+		entries = append(entries, kafka.OnboardingResponse{
 			Attribute: "kafka_consumer_group_lag",
 			Message:   "Metric kafka_consumer_group_lag is not present in the given time range.",
 			Status:    "0",
 		})
 	} else {
-		entries = append(entries, mq.OnboardingResponse{
+		entries = append(entries, kafka.OnboardingResponse{
 			Attribute: "kafka_consumer_group_lag",
 			Status:    "1",
 		})
@@ -2892,10 +2893,10 @@
 func (aH *APIHandler) getNetworkData(
 	w http.ResponseWriter, r *http.Request,
 ) {
-	attributeCache := &mq.Clients{
+	attributeCache := &kafka.Clients{
 		Hash: make(map[string]struct{}),
 	}
-	messagingQueue, apiErr := ParseMessagingQueueBody(r)
+	messagingQueue, apiErr := ParseKafkaQueueBody(r)
 
 	if apiErr != nil {
 		zap.L().Error(apiErr.Err.Error())
@@ -2903,7 +2904,7 @@ func (aH *APIHandler) getNetworkData(
 		return
 	}
 
-	queryRangeParams, err := mq.BuildQRParamsWithCache(messagingQueue, "throughput", attributeCache)
+	queryRangeParams, err := kafka.BuildQRParamsWithCache(messagingQueue, "throughput", attributeCache)
 	if err != nil {
 		zap.L().Error(err.Error())
 		RespondError(w, apiErr, nil)
@@ -2942,7 +2943,7 @@ func (aH *APIHandler) getNetworkData(
 		}
 	}
 
-	queryRangeParams, err = mq.BuildQRParamsWithCache(messagingQueue, "fetch-latency", attributeCache)
+	queryRangeParams, err = kafka.BuildQRParamsWithCache(messagingQueue, "fetch-latency", attributeCache)
 	if err != nil {
 		zap.L().Error(err.Error())
 		RespondError(w, apiErr, nil)
@@ -2992,7 +2993,7 @@
 func (aH *APIHandler) getProducerData(
 	w http.ResponseWriter, r *http.Request,
 ) {
 	// parse the query params to retrieve the messaging queue struct
-	messagingQueue, apiErr := ParseMessagingQueueBody(r)
+	messagingQueue, apiErr := ParseKafkaQueueBody(r)
 
 	if apiErr != nil {
 		zap.L().Error(apiErr.Err.Error())
@@ -3000,7 +3001,7 @@ func (aH *APIHandler) getProducerData(
 		return
 	}
 
-	queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "producer")
+	queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "producer")
 	if err != nil {
 		zap.L().Error(err.Error())
 		RespondError(w, apiErr, nil)
@@ -3033,7 +3034,7 @@
 func (aH *APIHandler) getConsumerData(
 	w http.ResponseWriter, r *http.Request,
 ) {
-	messagingQueue, apiErr := ParseMessagingQueueBody(r)
+	messagingQueue, apiErr := ParseKafkaQueueBody(r)
 
 	if apiErr != nil {
 		zap.L().Error(apiErr.Err.Error())
@@ -3041,7 +3042,7 @@ func (aH *APIHandler) getConsumerData(
 		return
 	}
 
-	queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "consumer")
+	queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "consumer")
 	if err != nil {
 		zap.L().Error(err.Error())
 		RespondError(w, apiErr, nil)
@@ -3075,7 +3076,7 @@
 func (aH *APIHandler) getPartitionOverviewLatencyData(
 	w http.ResponseWriter, r *http.Request,
 ) {
-	messagingQueue, apiErr := ParseMessagingQueueBody(r)
+	messagingQueue, apiErr := ParseKafkaQueueBody(r)
 
 	if apiErr != nil {
 		zap.L().Error(apiErr.Err.Error())
@@ -3083,7 +3084,7 @@ func (aH *APIHandler) getPartitionOverviewLatencyData(
 		return
 	}
 
-	queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "producer-topic-throughput")
+	queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "producer-topic-throughput")
 	if err != nil {
 		zap.L().Error(err.Error())
 		RespondError(w, apiErr, nil)
@@ -3117,7 +3118,7 @@
 func (aH *APIHandler) getConsumerPartitionLatencyData(
 	w http.ResponseWriter, r *http.Request,
 ) {
-	messagingQueue, apiErr := ParseMessagingQueueBody(r)
+	messagingQueue, apiErr := ParseKafkaQueueBody(r)
 
 	if apiErr != nil {
 		zap.L().Error(apiErr.Err.Error())
@@ -3125,7 +3126,7 @@ func (aH *APIHandler) getConsumerPartitionLatencyData(
 		return
 	}
 
-	queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "consumer_partition_latency")
+	queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "consumer_partition_latency")
 	if err != nil {
 		zap.L().Error(err.Error())
 		RespondError(w, apiErr, nil)
@@ -3162,7 +3163,7 @@
 func (aH *APIHandler) getProducerThroughputOverview(
 	w http.ResponseWriter, r *http.Request,
 ) {
-	messagingQueue, apiErr := ParseMessagingQueueBody(r)
+	messagingQueue, apiErr := ParseKafkaQueueBody(r)
 
 	if apiErr != nil {
 		zap.L().Error(apiErr.Err.Error())
@@ -3170,11 +3171,11 @@ func (aH *APIHandler) getProducerThroughputOverview(
 		return
 	}
 
-	attributeCache := &mq.Clients{
+	attributeCache := &kafka.Clients{
 		Hash: make(map[string]struct{}),
 	}
 
-	producerQueryRangeParams, err := mq.BuildQRParamsWithCache(messagingQueue, "producer-throughput-overview", attributeCache)
+	producerQueryRangeParams, err := kafka.BuildQRParamsWithCache(messagingQueue, "producer-throughput-overview", attributeCache)
 	if err != nil {
 		zap.L().Error(err.Error())
 		RespondError(w, apiErr, nil)
@@ -3212,7 +3213,7 @@ func (aH *APIHandler) getProducerThroughputOverview(
 		}
 	}
 
-	queryRangeParams, err := mq.BuildQRParamsWithCache(messagingQueue, "producer-throughput-overview-byte-rate", attributeCache)
+	queryRangeParams, err := kafka.BuildQRParamsWithCache(messagingQueue, "producer-throughput-overview-byte-rate", attributeCache)
 	if err != nil {
 		zap.L().Error(err.Error())
 		RespondError(w, apiErr, nil)
@@ -3265,7 +3266,7 @@
 func (aH *APIHandler) getProducerThroughputDetails(
 	w http.ResponseWriter, r *http.Request,
 ) {
-	messagingQueue, apiErr := ParseMessagingQueueBody(r)
+	messagingQueue, apiErr := ParseKafkaQueueBody(r)
 
 	if apiErr != nil {
 		zap.L().Error(apiErr.Err.Error())
@@ -3273,7 +3274,7 @@ func (aH *APIHandler) getProducerThroughputDetails(
 		return
 	}
 
-	queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "producer-throughput-details")
+	queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "producer-throughput-details")
 	if err != nil {
 		zap.L().Error(err.Error())
 		RespondError(w, apiErr, nil)
@@ -3307,7 +3308,7 @@
 func (aH *APIHandler) getConsumerThroughputOverview(
 	w http.ResponseWriter, r *http.Request,
 ) {
-	messagingQueue, apiErr := ParseMessagingQueueBody(r)
+	messagingQueue, apiErr := ParseKafkaQueueBody(r)
 
 	if apiErr != nil {
 		zap.L().Error(apiErr.Err.Error())
@@ -3315,7 +3316,7 @@ func (aH *APIHandler) getConsumerThroughputOverview(
 		return
 	}
 
-	queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "consumer-throughput-overview")
+	queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "consumer-throughput-overview")
 	if err != nil {
 		zap.L().Error(err.Error())
 		RespondError(w, apiErr, nil)
@@ -3349,7 +3350,7 @@
 func (aH *APIHandler) getConsumerThroughputDetails(
 	w http.ResponseWriter, r *http.Request,
 ) {
-	messagingQueue, apiErr := ParseMessagingQueueBody(r)
+	messagingQueue, apiErr := ParseKafkaQueueBody(r)
 
 	if apiErr != nil {
 		zap.L().Error(apiErr.Err.Error())
@@ -3357,7 +3358,7 @@ func (aH *APIHandler) getConsumerThroughputDetails(
 		return
 	}
 
-	queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "consumer-throughput-details")
+	queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "consumer-throughput-details")
 	if err != nil {
 		zap.L().Error(err.Error())
 		RespondError(w, apiErr, nil)
@@ -3394,7 +3395,7 @@
 func (aH *APIHandler) getProducerConsumerEval(
 	w http.ResponseWriter, r *http.Request,
 ) {
-	messagingQueue, apiErr := ParseMessagingQueueBody(r)
+	messagingQueue, apiErr := ParseKafkaQueueBody(r)
 
 	if apiErr != nil {
 		zap.L().Error(apiErr.Err.Error())
@@ -3402,7 +3403,7 @@ func (aH *APIHandler) getProducerConsumerEval(
 		return
 	}
 
-	queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "producer-consumer-eval")
+	queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "producer-consumer-eval")
 	if err != nil {
 		zap.L().Error(err.Error())
 		RespondError(w, apiErr, nil)
@@ -3431,15 +3432,24 @@ func (aH *APIHandler) getProducerConsumerEval(
 	aH.Respond(w, resp)
 }
 
-// ParseMessagingQueueBody parse for messaging queue params
-func ParseMessagingQueueBody(r *http.Request) (*mq.MessagingQueue, *model.ApiError) {
-	messagingQueue := new(mq.MessagingQueue)
+// ParseKafkaQueueBody parses the request body for Kafka messaging queue params
+func ParseKafkaQueueBody(r *http.Request) (*kafka.MessagingQueue, *model.ApiError) {
+	messagingQueue := new(kafka.MessagingQueue)
 	if err := json.NewDecoder(r.Body).Decode(messagingQueue); err != nil {
 		return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)}
 	}
 	return messagingQueue, nil
 }
 
+// ParseQueueBody parses the request body for any messaging queue type
+func ParseQueueBody(r *http.Request) (*queues2.QueueListRequest, *model.ApiError) {
+	queue := new(queues2.QueueListRequest)
+	if err := json.NewDecoder(r.Body).Decode(queue); err != nil {
+		return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)}
+	}
+	return queue, nil
+}
+
 // Preferences
 
 func (aH *APIHandler) getUserPreference(
@@ -4963,9 +4973,8 @@ func (aH *APIHandler) updateTraceField(w http.ResponseWriter, r *http.Request) {
 }
 
 func (aH *APIHandler) getQueueOverview(w http.ResponseWriter, r *http.Request) {
-	// ToDo: add capability of dynamic filtering based on any of the filters using QueueFilters
-	messagingQueue, apiErr := ParseMessagingQueueBody(r)
+	queueListRequest, apiErr := ParseQueueBody(r)
 
 	if apiErr != nil {
 		zap.L().Error(apiErr.Err.Error())
@@ -4973,11 +4982,11 @@ func (aH *APIHandler) getQueueOverview(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	chq, err := mq.BuildClickHouseQuery(messagingQueue, "", "overview")
+	chq, err := queues2.BuildOverviewQuery(queueListRequest)
 
 	if err != nil {
 		zap.L().Error(err.Error())
-		RespondError(w, apiErr, nil)
+		RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error building clickhouse query: %v", err)}, nil)
 		return
 	}
 
@@ -4987,7 +4996,6 @@ func (aH *APIHandler) getQueueOverview(w http.ResponseWriter, r *http.Request) {
 }
 
 func (aH *APIHandler) getCeleryOverview(w http.ResponseWriter, r *http.Request) {
-	// TODO: Implement celery overview logic for both worker and tasks types
 }
 
 func (aH *APIHandler) getCeleryTasks(w http.ResponseWriter, r *http.Request) {

diff --git a/pkg/query-service/app/integrations/messagingQueues/celery/translator.go b/pkg/query-service/app/integrations/messagingQueues/celery/translator.go
new file mode 100644
index 0000000000..bf7e67e150
--- /dev/null
+++ b/pkg/query-service/app/integrations/messagingQueues/celery/translator.go
@@ -0,0 +1 @@
+package celery

diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/model.go b/pkg/query-service/app/integrations/messagingQueues/kafka/model.go
index 08b13a1ffb..de5d83487b 100644
--- a/pkg/query-service/app/integrations/messagingQueues/kafka/model.go
+++ b/pkg/query-service/app/integrations/messagingQueues/kafka/model.go
@@ -22,37 +22,3 @@ type OnboardingResponse struct {
 	Message   string `json:"error_message"`
 	Status    string `json:"status"`
 }
-
-// QueueFilters
-// ToDo: add capability of dynamic filtering based on any of the filters
-type QueueFilters struct {
-	ServiceName []string
-	SpanName    []string
-	Queue       []string
-	Destination []string
-	Kind        []string
-}
-
-type CeleryTask struct {
-	kind   string
-	status string
-}
-
-type CeleryTasks interface {
-	GetKind() string
-	GetStatus() string
-	Set(string, string)
-}
-
-func (r *CeleryTask) GetKind() string {
-	return r.kind
-}
-
-func (r *CeleryTask) GetStatus() string {
-	return r.status
-}
-
-func (r *CeleryTask) Set(kind, status string) {
-	r.kind = kind
-	r.status = status
-}

diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go
index 8f1e010939..9b943acbc8 100644
--- a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go
+++ b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go
@@ -2,7 +2,6 @@ package kafka
 
 import (
 	"fmt"
-	"strings"
 )
 
 func generateConsumerSQL(start, end int64, topic, partition, consumerGroup, queueType string) string {
@@ -319,139 +318,6 @@ GROUP BY
 	return query
 }
 
-// generateOverviewSQL builds the ClickHouse SQL query with optional filters.
-// If a filter slice is empty, the query does not constrain on that field.
-func generateOverviewSQL(start, end int64, filters *QueueFilters) string {
-	// Convert from nanoseconds to float seconds in Go to avoid decimal overflow in ClickHouse
-	startSeconds := float64(start) / 1e9
-	endSeconds := float64(end) / 1e9
-
-	// Compute time range difference in Go
-	timeRangeSecs := endSeconds - startSeconds
-
-	// Example ts_bucket boundaries (could be your own logic)
-	tsBucketStart := startSeconds - 1800
-	tsBucketEnd := endSeconds
-
-	// Build WHERE clauses for optional filters
-	// We always require messaging_system IN ('kafka', 'celery'), but
-	// we add additional AND conditions only if the slices are non-empty.
-	var whereClauses []string
-
-	// Mandatory base filter: show only kafka/celery
-	whereClauses = append(whereClauses, "messaging_system IN ('kafka', 'celery')")
-
-	if len(filters.ServiceName) > 0 {
-		whereClauses = append(whereClauses, inClause("service_name", filters.ServiceName))
-	}
-	if len(filters.SpanName) > 0 {
-		whereClauses = append(whereClauses, inClause("span_name", filters.SpanName))
-	}
-	if len(filters.Queue) > 0 {
-		// "queue" in the struct refers to the messaging_system in the DB
-		whereClauses = append(whereClauses, inClause("messaging_system", filters.Queue))
-	}
-	if len(filters.Destination) > 0 {
-		whereClauses = append(whereClauses, inClause("destination", filters.Destination))
-	}
-	if len(filters.Kind) > 0 {
-		whereClauses = append(whereClauses, inClause("kind_string", filters.Kind))
-	}
-
-	// Combine all WHERE clauses with AND
-	whereSQL := strings.Join(whereClauses, "\n AND ")
-
-	if len(whereSQL) > 0 {
-		whereSQL = fmt.Sprintf("AND %s", whereSQL)
-	}
-
-	// Final query string
-	// Note the use of %f for float64 values in fmt.Sprintf
-	query := fmt.Sprintf(`
-WITH
-    processed_traces AS (
-        SELECT
-            resource_string_service$$name AS service_name,
-            name AS span_name,
-            CASE
-                WHEN attribute_string_messaging$$system != '' THEN attribute_string_messaging$$system
-                WHEN (has(attributes_string, 'celery.action') OR has(attributes_string, 'celery.task_name')) THEN 'celery'
-                ELSE 'undefined'
-            END AS messaging_system,
-            kind_string,
-            COALESCE(
-                NULLIF(attributes_string['messaging.destination.name'], ''),
-                NULLIF(attributes_string['messaging.destination'], '')
-            ) AS destination,
-            durationNano,
-            status_code
-        FROM signoz_traces.distributed_signoz_index_v3
-        WHERE
-            timestamp >= toDateTime64(%f, 9)
-            AND timestamp <= toDateTime64(%f, 9)
-            AND ts_bucket_start >= toDateTime64(%f, 9)
-            AND ts_bucket_start <= toDateTime64(%f, 9)
-            AND (
-                attribute_string_messaging$$system = 'kafka'
-                OR has(attributes_string, 'celery.action')
-                OR has(attributes_string, 'celery.task_name')
-            )
-            %s
-    ),
-    aggregated_metrics AS (
-        SELECT
-            service_name,
-            span_name,
-            messaging_system,
-            destination,
-            kind_string,
-            count(*) AS total_count,
-            sumIf(1, status_code = 2) AS error_count,
-            quantile(0.95)(durationNano) / 1000000 AS p95_latency -- Convert to ms
-        FROM
-            processed_traces
-        GROUP BY
-            service_name,
-            span_name,
-            messaging_system,
-            destination,
-            kind_string
-    )
-SELECT
-    aggregated_metrics.service_name,
-    aggregated_metrics.span_name,
-    aggregated_metrics.messaging_system,
-    aggregated_metrics.destination,
-    aggregated_metrics.kind_string,
-    COALESCE(aggregated_metrics.total_count / %f, 0) AS throughput,
-    COALESCE((aggregated_metrics.error_count * 100.0) / aggregated_metrics.total_count, 0) AS error_percentage,
-    aggregated_metrics.p95_latency
-FROM
-    aggregated_metrics
-ORDER BY
-    aggregated_metrics.service_name,
-    aggregated_metrics.span_name;
-`,
-		startSeconds, endSeconds,
-		tsBucketStart, tsBucketEnd,
-		whereSQL, timeRangeSecs,
-	)
-
-	return query
-}
-
-// inClause returns SQL like "fieldName IN ('val1','val2','val3')"
-func inClause(fieldName string, values []string) string {
-	// Quote and escape each value for safety
-	var quoted []string
-	for _, v := range values {
-		// Simple escape: replace any single quotes in v
-		safeVal := strings.ReplaceAll(v, "'", "''")
-		quoted = append(quoted, fmt.Sprintf("'%s'", safeVal))
-	}
-	return fmt.Sprintf("%s IN (%s)", fieldName, strings.Join(quoted, ","))
-}
-
 func generateProducerSQL(start, end int64, topic, partition, queueType string) string {
 	timeRange := (end - start) / 1000000000
 	tsBucketStart := (start / 1000000000) - 1800

diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go
index b5fca5cf29..f5a669755f 100644
--- a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go
+++ b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go
@@ -2,11 +2,9 @@ package kafka
 
 import (
 	"fmt"
-
 	"go.signoz.io/signoz/pkg/query-service/common"
 	"go.signoz.io/signoz/pkg/query-service/constants"
 	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
-	"strings"
 )
 
 var defaultStepInterval int64 = 60
@@ -21,6 +19,7 @@ func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string)
 	queueType := KafkaQueue
 
 	chq, err := BuildClickHouseQuery(messagingQueue, queueType, queryContext)
+
 	if err != nil {
 		return nil, err
 	}
@@ -321,34 +320,6 @@ func BuildQRParamsWithCache(
 	return queryRangeParams, err
 }
 
-func getFilters(variables map[string]string) *QueueFilters {
-	return &QueueFilters{
-		ServiceName: parseFilter(variables["service_name"]),
-		SpanName:    parseFilter(variables["span_name"]),
-		Queue:       parseFilter(variables["queue"]),
-		Destination: parseFilter(variables["destination"]),
-		Kind:        parseFilter(variables["kind"]),
-	}
-}
-
-// parseFilter splits a comma-separated string into a []string.
-// Returns an empty slice if the input is blank.
-func parseFilter(val string) []string {
-	if val == "" {
-		return []string{}
-	}
-	// Split on commas, trim whitespace around each part
-	parts := strings.Split(val, ",")
-	var out []string
-	for _, p := range parts {
-		trimmed := strings.TrimSpace(p)
-		if trimmed != "" {
-			out = append(out, trimmed)
-		}
-	}
-	return out
-}
-
 func BuildClickHouseQuery(
 	messagingQueue *MessagingQueue,
 	queueType string,
@@ -385,8 +356,6 @@ func BuildClickHouseQuery(
 
 	var query string
 	switch queryContext {
-	case "overview":
-		query = generateOverviewSQL(start, end, getFilters(messagingQueue.Variables))
 	case "producer":
 		query = generateProducerSQL(start, end, topic, partition, queueType)
 	case "consumer":

diff --git a/pkg/query-service/app/integrations/messagingQueues/queues/model.go b/pkg/query-service/app/integrations/messagingQueues/queues/model.go
new file mode 100644
index 0000000000..eca7bf870c
--- /dev/null
+++ b/pkg/query-service/app/integrations/messagingQueues/queues/model.go
@@ -0,0 +1,27 @@
+package queues
+
+import (
+	"fmt"
+
+	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
+)
+
+type QueueListRequest struct {
+	Start   int64         `json:"start"` // unix nano
+	End     int64         `json:"end"`   // unix nano
+	Filters *v3.FilterSet `json:"filters"`
+	Limit   int           `json:"limit"`
+}
+
+func (qr *QueueListRequest) Validate() error {
+
+	err := qr.Filters.Validate()
+	if err != nil {
+		return err
+	}
+
+	if qr.Start < 0 || qr.End < 0 {
+		return fmt.Errorf("start and end must be unix nanosecond timestamps")
+	}
+	return nil
+}

diff --git a/pkg/query-service/app/integrations/messagingQueues/queues/queueOverview.go b/pkg/query-service/app/integrations/messagingQueues/queues/queueOverview.go
new file mode 100644
index 0000000000..cbfe0ad0d1
--- /dev/null
+++ b/pkg/query-service/app/integrations/messagingQueues/queues/queueOverview.go
@@ -0,0 +1,19 @@
+package queues
+
+import (
+	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
+)
+
+func BuildOverviewQuery(queueList *QueueListRequest) (*v3.ClickHouseQuery, error) {
+
+	err := queueList.Validate()
+	if err != nil {
+		return nil, err
+	}
+
+	query := generateOverviewSQL(queueList.Start, queueList.End, queueList.Filters.Items)
+
+	return &v3.ClickHouseQuery{
+		Query: query,
+	}, nil
+}

diff --git a/pkg/query-service/app/integrations/messagingQueues/queues/sql.go b/pkg/query-service/app/integrations/messagingQueues/queues/sql.go
new file mode 100644
index 0000000000..450c9d0773
--- /dev/null
+++ b/pkg/query-service/app/integrations/messagingQueues/queues/sql.go
@@ -0,0 +1,117 @@
+package queues
+
+import (
+	"fmt"
+	"strings"
+
+	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
+	format "go.signoz.io/signoz/pkg/query-service/utils"
+)
+
+// generateOverviewSQL builds the ClickHouse SQL query with optional filters.
+// If a filter key is absent from the request, the query does not constrain on that field.
+func generateOverviewSQL(start, end int64, item []v3.FilterItem) string {
+	// Convert from nanoseconds to float seconds in Go to avoid decimal overflow in ClickHouse
+	startSeconds := float64(start) / 1e9
+	endSeconds := float64(end) / 1e9
+
+	timeRangeSecs := endSeconds - startSeconds
+
+	tsBucketStart := startSeconds - 1800
+	tsBucketEnd := endSeconds
+
+	var whereClauses []string
+
+	whereClauses = append(whereClauses, fmt.Sprintf("timestamp >= toDateTime64(%f, 9)", startSeconds))
+	whereClauses = append(whereClauses, fmt.Sprintf("timestamp <= toDateTime64(%f, 9)", endSeconds))
+
+	for _, filter := range item {
+		switch filter.Key.Key {
+		case "service.name":
+			whereClauses = append(whereClauses, fmt.Sprintf("%s IN (%s)", "service_name", format.ClickHouseFormattedValue(filter.Value)))
+		case "name":
+			whereClauses = append(whereClauses, fmt.Sprintf("%s IN (%s)", "span_name", format.ClickHouseFormattedValue(filter.Value)))
+		case "destination":
+			whereClauses = append(whereClauses, fmt.Sprintf("%s IN (%s)", "destination", format.ClickHouseFormattedValue(filter.Value)))
+		case "queue":
+			whereClauses = append(whereClauses, fmt.Sprintf("%s IN (%s)", "messaging_system", format.ClickHouseFormattedValue(filter.Value)))
+		case "kind_string":
+			whereClauses = append(whereClauses, fmt.Sprintf("%s IN (%s)", "kind_string", format.ClickHouseFormattedValue(filter.Value)))
+		}
+	}
+
+	// Combine all WHERE clauses with AND
+	whereSQL := strings.Join(whereClauses, "\n AND ")
+
+	if len(whereSQL) > 0 {
+		whereSQL = fmt.Sprintf("AND %s", whereSQL)
+	}
+
+	query := fmt.Sprintf(`
+WITH
+    processed_traces AS (
+        SELECT
+            resource_string_service$$name AS service_name,
+            name AS span_name,
+            CASE
+                WHEN attribute_string_messaging$$system != '' THEN attribute_string_messaging$$system
+                WHEN (has(attributes_string, 'celery.action') OR has(attributes_string, 'celery.task_name')) THEN 'celery'
+                ELSE 'undefined'
+            END AS messaging_system,
+            kind_string,
+            COALESCE(
+                NULLIF(attributes_string['messaging.destination.name'], ''),
+                NULLIF(attributes_string['messaging.destination'], '')
+            ) AS destination,
+            durationNano,
+            status_code
+        FROM signoz_traces.distributed_signoz_index_v3
+        WHERE
+            ts_bucket_start >= toDateTime64(%f, 9)
+            AND ts_bucket_start <= toDateTime64(%f, 9)
+            AND (
+                attribute_string_messaging$$system = 'kafka'
+                OR has(attributes_string, 'celery.action')
+                OR has(attributes_string, 'celery.task_name')
+            )
+            %s
+    ),
+    aggregated_metrics AS (
+        SELECT
+            service_name,
+            span_name,
+            messaging_system,
+            destination,
+            kind_string,
+            count(*) AS total_count,
+            sumIf(1, status_code = 2) AS error_count,
+            quantile(0.95)(durationNano) / 1000000 AS p95_latency -- Convert to ms
+        FROM
+            processed_traces
+        GROUP BY
+            service_name,
+            span_name,
+            messaging_system,
+            destination,
+            kind_string
+    )
+SELECT
+    aggregated_metrics.service_name,
+    aggregated_metrics.span_name,
+    aggregated_metrics.messaging_system,
+    aggregated_metrics.destination,
+    aggregated_metrics.kind_string,
+    COALESCE(aggregated_metrics.total_count / %f, 0) AS throughput,
+    COALESCE((aggregated_metrics.error_count * 100.0) / aggregated_metrics.total_count, 0) AS error_percentage,
+    aggregated_metrics.p95_latency
+FROM
+    aggregated_metrics
+ORDER BY
+    aggregated_metrics.service_name,
+    aggregated_metrics.span_name;
+`, tsBucketStart, tsBucketEnd,
+		whereSQL, timeRangeSecs,
+	)
+
+	return query
+}
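---

Usage sketch (not part of the patch): the snippet below shows how the reworked overview payload can be constructed and pushed through the new queues package. QueueListRequest, BuildOverviewQuery, and the v3 filter types are the ones introduced or referenced above; the timestamps, service names, and main() scaffolding are illustrative assumptions. Note that generateOverviewSQL only consults the filter keys service.name, name, destination, queue, and kind_string, renders every match as an IN clause regardless of the operator sent, and does not yet apply Limit to the generated SQL.

	package main

	import (
		"encoding/json"
		"fmt"

		"go.signoz.io/signoz/pkg/query-service/app/integrations/messagingQueues/queues"
		v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
	)

	func main() {
		// Illustrative payload: start/end are unix nanoseconds; filters reuse
		// the shared v3.FilterSet instead of the old comma-separated Variables map.
		req := &queues.QueueListRequest{
			Start: 1736404200000000000, // assumed example window start
			End:   1736407800000000000, // assumed example window end
			Filters: &v3.FilterSet{
				Operator: "AND",
				Items: []v3.FilterItem{
					{
						// Only service.name, name, destination, queue and
						// kind_string are consulted by generateOverviewSQL.
						Key:      v3.AttributeKey{Key: "service.name"},
						Operator: v3.FilterOperatorIn,
						Value:    []interface{}{"order-service", "billing-service"},
					},
				},
			},
			Limit: 10,
		}

		// This is the JSON wire shape that ParseQueueBody decodes from the request body.
		body, _ := json.MarshalIndent(req, "", "  ")
		fmt.Println(string(body))

		// BuildOverviewQuery validates the request and renders the ClickHouse SQL.
		chq, err := queues.BuildOverviewQuery(req)
		if err != nil {
			fmt.Println("invalid request:", err)
			return
		}
		fmt.Println(chq.Query)
	}

One thing worth confirming in review: QueueListRequest.Validate calls qr.Filters.Validate() without a nil check, so a body that omits "filters" is only safe if v3.FilterSet.Validate tolerates a nil receiver.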