Merge branch 'develop' into kafka-consumer-lag-details-changes
SagarRajput-7 authored Dec 12, 2024
2 parents 72252cd + 0fbfb6b commit c1cf048
Showing 12 changed files with 2,941 additions and 86 deletions.
4 changes: 2 additions & 2 deletions frontend/src/container/CreateAlertRule/defaults.ts
@@ -117,7 +117,7 @@ export const logAlertDefaults: AlertDef = {
chQueries: {
A: {
name: 'A',
query: `select \ntoStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 MINUTE) AS interval, \ntoFloat64(count()) as value \nFROM signoz_logs.distributed_logs_v2 \nWHERE timestamp BETWEEN {{.start_timestamp_nano}} AND {{.end_timestamp_nano}} \nGROUP BY interval;\n\n-- available variables:\n-- \t{{.start_timestamp_nano}}\n-- \t{{.end_timestamp_nano}}\n\n-- required columns (or alias):\n-- \tvalue\n-- \tinterval`,
query: `select \ntoStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 MINUTE) AS interval, \ntoFloat64(count()) as value \nFROM signoz_logs.distributed_logs_v2 \nWHERE timestamp BETWEEN {{.start_timestamp_nano}} AND {{.end_timestamp_nano}} \nAND ts_bucket_start BETWEEN {{.start_timestamp}} - 1800 AND {{.end_timestamp}} \nGROUP BY interval;\n\n-- Please check docs here https://signoz.io/docs/userguide/logs_clickhouse_queries/\n\n-- available variables:\n-- \t{{.start_timestamp_nano}}\n-- \t{{.end_timestamp_nano}}\n-- \t{{.start_timestamp}}\n-- \t{{.end_timestamp}}\n\n-- required columns (or alias):\n-- \tvalue\n-- \tinterval`,
legend: '',
disabled: false,
},
@@ -149,7 +149,7 @@ export const traceAlertDefaults: AlertDef = {
chQueries: {
A: {
name: 'A',
query: `SELECT \n\ttoStartOfInterval(timestamp, INTERVAL 1 MINUTE) AS interval, \n\tstringTagMap['peer.service'] AS op_name, \n\ttoFloat64(avg(durationNano)) AS value \nFROM signoz_traces.distributed_signoz_index_v2 \nWHERE stringTagMap['peer.service']!='' \nAND timestamp BETWEEN {{.start_datetime}} AND {{.end_datetime}} \nGROUP BY (op_name, interval);\n\n-- available variables:\n-- \t{{.start_datetime}}\n-- \t{{.end_datetime}}\n\n-- required column alias:\n-- \tvalue\n-- \tinterval`,
query: `SELECT \n\ttoStartOfInterval(timestamp, INTERVAL 1 MINUTE) AS interval, \n\tresource_string_service$$name AS \`service.name\`, \n\ttoFloat64(avg(duration_nano)) AS value \nFROM signoz_traces.distributed_signoz_index_v3 \nWHERE resource_string_service$$name !='' \nAND timestamp BETWEEN {{.start_datetime}} AND {{.end_datetime}} \nAND ts_bucket_start BETWEEN {{.start_timestamp}} - 1800 AND {{.end_timestamp}} \nGROUP BY (\`service.name\`, interval);\n\n-- Please check docs here https://signoz.io/docs/userguide/writing-clickhouse-traces-query/\n\n-- available variables:\n-- \t{{.start_datetime}}\n-- \t{{.end_datetime}}\n-- \t{{.start_timestamp}}\n-- \t{{.end_timestamp}}\n\n-- required column alias:\n-- \tvalue\n-- \tinterval`,
legend: '',
disabled: false,
},
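Note on the query change above: both default alert queries now pair the timestamp predicate with a filter on ts_bucket_start, so ClickHouse can prune whole buckets before it ever evaluates the timestamp column. A minimal sketch of the bound computation, assuming 30-minute (1800 s) buckets as the `- 1800` offset in the query implies:

```go
package main

import (
	"fmt"
	"time"
)

// bucketBounds widens the lower bound by one bucket so rows whose bucket
// began just before the query window still qualify.
func bucketBounds(start, end time.Time, bucketSeconds int64) (int64, int64) {
	return start.Unix() - bucketSeconds, end.Unix()
}

func main() {
	end := time.Now()
	start := end.Add(-30 * time.Minute)
	lo, hi := bucketBounds(start, end, 1800)
	fmt.Printf("AND ts_bucket_start BETWEEN %d AND %d\n", lo, hi)
}
```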
72 changes: 40 additions & 32 deletions frontend/src/container/MetricsApplication/Tabs/Overview.tsx
@@ -145,41 +145,49 @@ function Application(): JSX.Element {
[servicename, topLevelOperations],
);

const operationPerSecWidget = getWidgetQueryBuilder({
query: {
queryType: EQueryType.QUERY_BUILDER,
promql: [],
builder: operationPerSec({
servicename,
tagFilterItems,
topLevelOperations: topLevelOperationsRoute,
const operationPerSecWidget = useMemo(
() =>
getWidgetQueryBuilder({
query: {
queryType: EQueryType.QUERY_BUILDER,
promql: [],
builder: operationPerSec({
servicename,
tagFilterItems,
topLevelOperations: topLevelOperationsRoute,
}),
clickhouse_sql: [],
id: uuid(),
},
title: GraphTitle.RATE_PER_OPS,
panelTypes: PANEL_TYPES.TIME_SERIES,
yAxisUnit: 'ops',
id: SERVICE_CHART_ID.rps,
}),
clickhouse_sql: [],
id: uuid(),
},
title: GraphTitle.RATE_PER_OPS,
panelTypes: PANEL_TYPES.TIME_SERIES,
yAxisUnit: 'ops',
id: SERVICE_CHART_ID.rps,
});
[servicename, tagFilterItems, topLevelOperationsRoute],
);

const errorPercentageWidget = getWidgetQueryBuilder({
query: {
queryType: EQueryType.QUERY_BUILDER,
promql: [],
builder: errorPercentage({
servicename,
tagFilterItems,
topLevelOperations: topLevelOperationsRoute,
const errorPercentageWidget = useMemo(
() =>
getWidgetQueryBuilder({
query: {
queryType: EQueryType.QUERY_BUILDER,
promql: [],
builder: errorPercentage({
servicename,
tagFilterItems,
topLevelOperations: topLevelOperationsRoute,
}),
clickhouse_sql: [],
id: uuid(),
},
title: GraphTitle.ERROR_PERCENTAGE,
panelTypes: PANEL_TYPES.TIME_SERIES,
yAxisUnit: '%',
id: SERVICE_CHART_ID.errorPercentage,
}),
clickhouse_sql: [],
id: uuid(),
},
title: GraphTitle.ERROR_PERCENTAGE,
panelTypes: PANEL_TYPES.TIME_SERIES,
yAxisUnit: '%',
id: SERVICE_CHART_ID.errorPercentage,
});
[servicename, tagFilterItems, topLevelOperationsRoute],
);

const stepInterval = useMemo(
() =>
@@ -53,24 +53,28 @@ function ServiceOverview({
[isSpanMetricEnable, queries],
);

const latencyWidget = getWidgetQueryBuilder({
query: {
queryType: EQueryType.QUERY_BUILDER,
promql: [],
builder: latency({
servicename,
tagFilterItems,
isSpanMetricEnable,
topLevelOperationsRoute,
const latencyWidget = useMemo(
() =>
getWidgetQueryBuilder({
query: {
queryType: EQueryType.QUERY_BUILDER,
promql: [],
builder: latency({
servicename,
tagFilterItems,
isSpanMetricEnable,
topLevelOperationsRoute,
}),
clickhouse_sql: [],
id: uuid(),
},
title: GraphTitle.LATENCY,
panelTypes: PANEL_TYPES.TIME_SERIES,
yAxisUnit: 'ns',
id: SERVICE_CHART_ID.latency,
}),
clickhouse_sql: [],
id: uuid(),
},
title: GraphTitle.LATENCY,
panelTypes: PANEL_TYPES.TIME_SERIES,
yAxisUnit: 'ns',
id: SERVICE_CHART_ID.latency,
});
[isSpanMetricEnable, servicename, tagFilterItems, topLevelOperationsRoute],
);

const isQueryEnabled =
!topLevelOperationsIsLoading && topLevelOperationsRoute.length > 0;
32 changes: 27 additions & 5 deletions pkg/query-service/app/clickhouseReader/reader.go
@@ -1203,7 +1203,9 @@ func (r *ClickHouseReader) GetUsage(ctx context.Context, queryParams *model.GetU
return &usageItems, nil
}

func (r *ClickHouseReader) SearchTracesV2(ctx context.Context, params *model.SearchTracesParams) (*[]model.SearchSpansResult, error) {
func (r *ClickHouseReader) SearchTracesV2(ctx context.Context, params *model.SearchTracesParams,
smartTraceAlgorithm func(payload []model.SearchSpanResponseItem, targetSpanId string,
levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error) {
searchSpansResult := []model.SearchSpansResult{
{
Columns: []string{"__time", "SpanId", "TraceId", "ServiceName", "Name", "Kind", "DurationNano", "TagsKeys", "TagsValues", "References", "Events", "HasError", "StatusMessage", "StatusCodeString", "SpanKind"},
@@ -1318,9 +1320,29 @@ func (r *ClickHouseReader) SearchTracesV2(ctx context.Context, params *model.Sea
end = time.Now()
zap.L().Debug("getTraceSQLQuery unmarshal took: ", zap.Duration("duration", end.Sub(start)))

for i, item := range searchSpanResponses {
spanEvents := item.GetValues()
searchSpansResult[0].Events[i] = spanEvents
err = r.featureFlags.CheckFeature(model.SmartTraceDetail)
smartAlgoEnabled := err == nil
if len(searchScanResponses) > params.SpansRenderLimit && smartAlgoEnabled {
start = time.Now()
searchSpansResult, err = smartTraceAlgorithm(searchSpanResponses, params.SpanID, params.LevelUp, params.LevelDown, params.SpansRenderLimit)
if err != nil {
return nil, err
}
end = time.Now()
zap.L().Debug("smartTraceAlgo took: ", zap.Duration("duration", end.Sub(start)))
userEmail, err := auth.GetEmailFromJwt(ctx)
if err == nil {
data := map[string]interface{}{
"traceSize": len(searchScanResponses),
"spansRenderLimit": params.SpansRenderLimit,
}
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_LARGE_TRACE_OPENED, data, userEmail, true, false)
}
} else {
for i, item := range searchSpanResponses {
spanEvents := item.GetValues()
searchSpansResult[0].Events[i] = spanEvents
}
}

searchSpansResult[0].StartTimestampMillis = startTime - (durationNano / 1000000)
@@ -1334,7 +1356,7 @@ func (r *ClickHouseReader) SearchTraces(ctx context.Context, params *model.Searc
levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error) {

if r.useTraceNewSchema {
return r.SearchTracesV2(ctx, params)
return r.SearchTracesV2(ctx, params, smartTraceAlgorithm)
}

var countSpans uint64
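The signature change above threads the smart-trace-trimming function into SearchTracesV2 as a parameter instead of leaving it hard-coded in the caller, so the new-schema path applies the same SpansRenderLimit gating (behind the SmartTraceDetail feature flag) that SearchTraces already had. A minimal sketch of the injection pattern; Span and trimFunc are simplified stand-ins, not the real SigNoz models:

```go
package main

import "fmt"

// Span is a stand-in for model.SearchSpanResponseItem.
type Span struct{ ID string }

// trimFunc mirrors the shape of the injected smartTraceAlgorithm.
type trimFunc func(spans []Span, targetSpanID string, levelUp, levelDown, limit int) ([]Span, error)

// searchTrace only trims when the trace exceeds the render limit and a
// strategy was supplied (i.e. the feature flag was enabled).
func searchTrace(spans []Span, renderLimit int, trim trimFunc) ([]Span, error) {
	if len(spans) > renderLimit && trim != nil {
		return trim(spans, "", 0, 0, renderLimit)
	}
	return spans, nil
}

func main() {
	spans := []Span{{"a"}, {"b"}, {"c"}}
	trimmed, err := searchTrace(spans, 2, func(s []Span, _ string, _, _, limit int) ([]Span, error) {
		return s[:limit], nil // naive stand-in for the smart algorithm
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(len(trimmed)) // 2
}
```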
4 changes: 4 additions & 0 deletions pkg/query-service/app/dashboards/model.go
@@ -289,6 +289,10 @@ func GetDashboard(ctx context.Context, uuid string) (*Dashboard, *model.ApiError
return nil, &model.ApiError{Typ: model.ErrorNotFound, Err: fmt.Errorf("no dashboard found with uuid: %s", uuid)}
}

if dashboard.Data["title"] == "Ingestion" && dashboard.Data["description"] != nil {
dashboard.Data["description"] = "This dashboard is deprecated. Please use the new Ingestion V2 dashboard. " + dashboard.Data["description"].(string)
}

return &dashboard, nil
}

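The snippet above asserts dashboard.Data["description"].(string) after only a nil check; if that field could ever hold a non-string value, the comma-ok assertion is the defensive variant. A small sketch, with Data replaced by a plain map for illustration:

```go
package main

import "fmt"

func main() {
	data := map[string]interface{}{
		"title":       "Ingestion",
		"description": "Tracks ingestion volume.",
	}
	// The comma-ok form cannot panic, unlike a bare .(string) assertion.
	if data["title"] == "Ingestion" {
		if desc, ok := data["description"].(string); ok {
			data["description"] = "This dashboard is deprecated. Please use the new Ingestion V2 dashboard. " + desc
		}
	}
	fmt.Println(data["description"])
}
```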
46 changes: 26 additions & 20 deletions pkg/query-service/app/http_handler.go
@@ -3141,14 +3141,14 @@ func (aH *APIHandler) getProducerThroughputOverview(
Hash: make(map[string]struct{}),
}

queryRangeParams, err := mq.BuildQRParamsWithCache(messagingQueue, "producer-throughput-overview", attributeCache)
producerQueryRangeParams, err := mq.BuildQRParamsWithCache(messagingQueue, "producer-throughput-overview", attributeCache)
if err != nil {
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
return
}

if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
if err := validateQueryRangeParamsV3(producerQueryRangeParams); err != nil {
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
return
@@ -3157,29 +3157,29 @@
var result []*v3.Result
var errQuriesByName map[string]error

result, errQuriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams)
result, errQuriesByName, err = aH.querierV2.QueryRange(r.Context(), producerQueryRangeParams)
if err != nil {
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
RespondError(w, apiErrObj, errQuriesByName)
return
}

for _, res := range result {
for _, list := range res.List {
serviceName, serviceNameOk := list.Data["service_name"].(*string)
topicName, topicNameOk := list.Data["topic"].(*string)
params := []string{*serviceName, *topicName}
for _, series := range res.Series {
serviceName, serviceNameOk := series.Labels["service_name"]
topicName, topicNameOk := series.Labels["topic"]
params := []string{serviceName, topicName}
hashKey := uniqueIdentifier(params, "#")
_, ok := attributeCache.Hash[hashKey]
if topicNameOk && serviceNameOk && !ok {
attributeCache.Hash[hashKey] = struct{}{}
attributeCache.TopicName = append(attributeCache.TopicName, *topicName)
attributeCache.ServiceName = append(attributeCache.ServiceName, *serviceName)
attributeCache.TopicName = append(attributeCache.TopicName, topicName)
attributeCache.ServiceName = append(attributeCache.ServiceName, serviceName)
}
}
}

queryRangeParams, err = mq.BuildQRParamsWithCache(messagingQueue, "producer-throughput-overview-latency", attributeCache)
queryRangeParams, err := mq.BuildQRParamsWithCache(messagingQueue, "producer-throughput-overview-byte-rate", attributeCache)
if err != nil {
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
@@ -3198,26 +3198,32 @@
return
}

latencyColumn := &v3.Result{QueryName: "latency"}
var latencySeries []*v3.Row
byteRateColumn := &v3.Result{QueryName: "byte_rate"}
var byteRateSeries []*v3.Series
for _, res := range resultFetchLatency {
for _, list := range res.List {
topic, topicOk := list.Data["topic"].(*string)
serviceName, serviceNameOk := list.Data["service_name"].(*string)
params := []string{*serviceName, *topic}
for _, series := range res.Series {
topic, topicOk := series.Labels["topic"]
serviceName, serviceNameOk := series.Labels["service_name"]
params := []string{serviceName, topic}
hashKey := uniqueIdentifier(params, "#")
_, ok := attributeCache.Hash[hashKey]
if topicOk && serviceNameOk && ok {
latencySeries = append(latencySeries, list)
byteRateSeries = append(byteRateSeries, series)
}
}
}

latencyColumn.List = latencySeries
result = append(result, latencyColumn)
byteRateColumn.Series = byteRateSeries
var latencyColumnResult []*v3.Result
latencyColumnResult = append(latencyColumnResult, byteRateColumn)

resultFetchLatency = postprocess.TransformToTableForBuilderQueries(latencyColumnResult, queryRangeParams)

result = postprocess.TransformToTableForClickHouseQueries(result)

result = append(result, resultFetchLatency[0])
resp := v3.QueryRangeResponse{
Result: resultFetchLatency,
Result: result,
}
aH.Respond(w, resp)
}
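The handler now reads identifiers from res.Series labels (a map[string]string) rather than dereferencing *string values out of res.List rows, which removes the nil-dereference risk the old code carried when a label was absent: the old version built params from *serviceName and *topicName before checking the ok flags. A minimal sketch of the lookup-and-dedup-key pattern; uniqueIdentifier here is a hypothetical stand-in for the helper the handler calls:

```go
package main

import (
	"fmt"
	"strings"
)

// uniqueIdentifier is a hypothetical stand-in; the real helper in
// http_handler.go may differ.
func uniqueIdentifier(params []string, sep string) string {
	return strings.Join(params, sep)
}

func main() {
	labels := map[string]string{"service_name": "orders", "topic": "payments"}

	// Map lookups return the zero value plus a presence flag, so there is
	// nothing to dereference and nothing to panic on.
	service, serviceOk := labels["service_name"]
	topic, topicOk := labels["topic"]
	if serviceOk && topicOk {
		fmt.Println(uniqueIdentifier([]string{service, topic}, "#")) // orders#payments
	}
}
```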
@@ -61,14 +61,17 @@ func buildClickHouseQueryNetwork(messagingQueue *MessagingQueue, queueType strin

func buildBuilderQueriesProducerBytes(unixMilliStart, unixMilliEnd int64, attributeCache *Clients) (map[string]*v3.BuilderQuery, error) {
bq := make(map[string]*v3.BuilderQuery)
queryName := fmt.Sprintf("latency")
queryName := fmt.Sprintf("byte_rate")

chq := &v3.BuilderQuery{
QueryName: queryName,
StepInterval: common.MinAllowedStepInterval(unixMilliStart, unixMilliEnd),
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: "kafka_producer_byte_rate",
Key: "kafka_producer_byte_rate",
DataType: v3.AttributeKeyDataTypeFloat64,
Type: v3.AttributeKeyType("Gauge"),
IsColumn: true,
},
AggregateOperator: v3.AggregateOperatorAvg,
Temporality: v3.Unspecified,
@@ -276,15 +279,16 @@ func BuildQRParamsWithCache(messagingQueue *MessagingQueue, queryContext string,
cq, err = buildCompositeQuery(&v3.ClickHouseQuery{
Query: query,
}, queryContext)
} else if queryContext == "producer-throughput-overview-latency" {
} else if queryContext == "producer-throughput-overview-byte-rate" {
bhq, err := buildBuilderQueriesProducerBytes(unixMilliStart, unixMilliEnd, attributeCache)
if err != nil {
return nil, err
}
cq = &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
BuilderQueries: bhq,
PanelType: v3.PanelTypeList,
PanelType: v3.PanelTypeTable,
FillGaps: false,
}
}

@@ -315,7 +319,7 @@ func BuildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, quer
if !ok {
return nil, fmt.Errorf("invalid type for Topic")
}
if queryContext != "consumer-throughput-details" {
if !(queryContext == "consumer-throughput-details" || queryContext == "producer-throughput-details") {
partition, ok = messagingQueue.Variables["partition"]
if !ok {
return nil, fmt.Errorf("invalid type for Partition")
@@ -364,7 +368,7 @@

func buildCompositeQuery(chq *v3.ClickHouseQuery, queryContext string) (*v3.CompositeQuery, error) {

if queryContext == "producer-consumer-eval" || queryContext == "producer-throughput-overview" {
if queryContext == "producer-consumer-eval" {
return &v3.CompositeQuery{
QueryType: v3.QueryTypeClickHouseSQL,
ClickHouseQueries: map[string]*v3.ClickHouseQuery{queryContext: chq},
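For reference, the renamed byte-rate builder query in isolation: the added DataType, Type, and IsColumn metadata tells the metrics query builder how to treat kafka_producer_byte_rate. A sketch with stand-in structs, since the real v3 types in query-service carry many more fields:

```go
package main

import "fmt"

// Stand-ins for v3.AttributeKey and v3.BuilderQuery; illustrative only.
type AttributeKey struct {
	Key      string
	DataType string
	Type     string
	IsColumn bool
}

type BuilderQuery struct {
	QueryName          string
	DataSource         string
	AggregateAttribute AttributeKey
	AggregateOperator  string
}

func main() {
	q := BuilderQuery{
		QueryName:  "byte_rate",
		DataSource: "metrics",
		AggregateAttribute: AttributeKey{
			Key:      "kafka_producer_byte_rate",
			DataType: "float64",
			Type:     "Gauge", // metric type helps the builder target the right table
			IsColumn: true,
		},
		AggregateOperator: "avg",
	}
	fmt.Printf("%+v\n", q)
}
```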
5 changes: 3 additions & 2 deletions pkg/query-service/app/traces/v4/query_builder.go
@@ -355,8 +355,9 @@ func buildTracesQuery(start, end, step int64, mq *v3.BuilderQuery, panelType v3.
filterSubQuery = fmt.Sprintf("%s AND %s", filterSubQuery, subQuery)
}
} else {
column := getColumnName(mq.AggregateAttribute)
filterSubQuery = fmt.Sprintf("%s AND has(%s, '%s')", filterSubQuery, column, mq.AggregateAttribute.Key)
cType := getClickHouseTracesColumnType(mq.AggregateAttribute.Type)
cDataType := getClickHouseTracesColumnDataType(mq.AggregateAttribute.DataType)
filterSubQuery = fmt.Sprintf("%s AND mapContains(%s_%s, '%s')", filterSubQuery, cType, cDataType, mq.AggregateAttribute.Key)
}
}
op := "toFloat64(count())"
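The replacement above swaps the array-schema check has(column, key) for mapContains on the v3 map columns, whose names are composed from the attribute's type and data type. A sketch of how such a filter string might be assembled; the two mapping helpers are assumptions mirroring what getClickHouseTracesColumnType and getClickHouseTracesColumnDataType appear to do in the diff:

```go
package main

import "fmt"

// columnType maps the attribute kind to its column family (assumed behaviour).
func columnType(attrType string) string {
	if attrType == "resource" {
		return "resources"
	}
	return "attributes"
}

// columnDataType maps the attribute data type to a column suffix (assumed).
func columnDataType(dataType string) string {
	switch dataType {
	case "float64", "int64":
		return "number"
	case "bool":
		return "bool"
	default:
		return "string"
	}
}

func main() {
	// Old array schema:  has(stringTagMap, 'http.method')
	// New map schema:
	filter := fmt.Sprintf("mapContains(%s_%s, '%s')",
		columnType("tag"), columnDataType("string"), "http.method")
	fmt.Println(filter) // mapContains(attributes_string, 'http.method')
}
```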