Skip to content

Commit

Permalink
feat: trace v4 integration (#6226)
Browse files Browse the repository at this point in the history
* feat: trace v4 inital commit

* fix: add remaining files

* fix: integrate with querier

* fix: get trace by id api updated

* fix: add servicename resource filter

* fix: tests

* fix: use correct prepQUery

* fix: services page

* fix: minor fixes to use the new table in api's and querier

* fix: add support for window based pagination

* feat: support for faster trace detail

* fix: searchTraces

* fix: attribute enrichment updated and issue in group by

* fix: issues in group by

* fix: enrichment using alias

* fix: test file added

* fix: tests

* fix: group by with filters

* fix: add subquery

* fix: trigger builde

* fix: update pagination logic and few ch column names

* fix: update qb

* fix: add tests

* feat: minor fixes

* fix: update pagination logic

* fix: update pagination logic

* fix: remove utils

* fix: remove unwanted API's

* fix: attribute and attribute values v2

* fix: autocomplete api's updated

* fix: tests fixed

* feat: minor fixes

* fix: update telemetry functions

* fix: dont use alias, use proper col names

* fix: move models to it's own file

* fix: minor fixes

* fix: address comments

* fix: add to serviceoverview function

* fix: add changes to overview function

* fix: address comments

* fix: remove printlines

---------

Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>
  • Loading branch information
nityanandagohain and srikanthccv authored Nov 22, 2024
1 parent e46d969 commit 67058b2
Show file tree
Hide file tree
Showing 21 changed files with 229 additions and 138 deletions.
8 changes: 5 additions & 3 deletions ee/query-service/app/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ type APIHandlerOptions struct {
Cache cache.Cache
Gateway *httputil.ReverseProxy
// Querier Influx Interval
FluxInterval time.Duration
UseLogsNewSchema bool
UseLicensesV3 bool
FluxInterval time.Duration
UseLogsNewSchema bool
UseTraceNewSchema bool
UseLicensesV3 bool
}

type APIHandler struct {
Expand All @@ -66,6 +67,7 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) {
Cache: opts.Cache,
FluxInterval: opts.FluxInterval,
UseLogsNewSchema: opts.UseLogsNewSchema,
UseTraceNewSchema: opts.UseTraceNewSchema,
UseLicensesV3: opts.UseLicensesV3,
})

Expand Down
45 changes: 22 additions & 23 deletions ee/query-service/app/api/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,31 @@ package api

import (
"net/http"

"go.signoz.io/signoz/ee/query-service/app/db"
"go.signoz.io/signoz/ee/query-service/model"
baseapp "go.signoz.io/signoz/pkg/query-service/app"
basemodel "go.signoz.io/signoz/pkg/query-service/model"
"go.uber.org/zap"
)

func (ah *APIHandler) searchTraces(w http.ResponseWriter, r *http.Request) {

if !ah.CheckFeature(basemodel.SmartTraceDetail) {
zap.L().Info("SmartTraceDetail feature is not enabled in this plan")
ah.APIHandler.SearchTraces(w, r)
return
}
searchTracesParams, err := baseapp.ParseSearchTracesParams(r)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, "Error reading params")
return
}

result, err := ah.opts.DataConnector.SearchTraces(r.Context(), searchTracesParams, db.SmartTraceAlgorithm)
if ah.HandleError(w, err, http.StatusBadRequest) {
return
}

ah.WriteJSON(w, r, result)
ah.APIHandler.SearchTraces(w, r)
return

// This is commented since this will be taken care by new trace API

// if !ah.CheckFeature(basemodel.SmartTraceDetail) {
// zap.L().Info("SmartTraceDetail feature is not enabled in this plan")
// ah.APIHandler.SearchTraces(w, r)
// return
// }
// searchTracesParams, err := baseapp.ParseSearchTracesParams(r)
// if err != nil {
// RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, "Error reading params")
// return
// }

// result, err := ah.opts.DataConnector.SearchTraces(r.Context(), searchTracesParams, db.SmartTraceAlgorithm)
// if ah.HandleError(w, err, http.StatusBadRequest) {
// return
// }

// ah.WriteJSON(w, r, result)

}
3 changes: 2 additions & 1 deletion ee/query-service/app/db/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ func NewDataConnector(
dialTimeout time.Duration,
cluster string,
useLogsNewSchema bool,
useTraceNewSchema bool,
) *ClickhouseReader {
ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout, cluster, useLogsNewSchema)
ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout, cluster, useLogsNewSchema, useTraceNewSchema)
return &ClickhouseReader{
conn: ch.GetConn(),
appdb: localDB,
Expand Down
10 changes: 8 additions & 2 deletions ee/query-service/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type ServerOptions struct {
Cluster string
GatewayUrl string
UseLogsNewSchema bool
UseTraceNewSchema bool
UseLicensesV3 bool
}

Expand Down Expand Up @@ -156,6 +157,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
serverOptions.DialTimeout,
serverOptions.Cluster,
serverOptions.UseLogsNewSchema,
serverOptions.UseTraceNewSchema,
)
go qb.Start(readerReady)
reader = qb
Expand Down Expand Up @@ -189,6 +191,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
serverOptions.DisableRules,
lm,
serverOptions.UseLogsNewSchema,
serverOptions.UseTraceNewSchema,
)

if err != nil {
Expand Down Expand Up @@ -270,6 +273,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
FluxInterval: fluxInterval,
Gateway: gatewayProxy,
UseLogsNewSchema: serverOptions.UseLogsNewSchema,
UseTraceNewSchema: serverOptions.UseTraceNewSchema,
UseLicensesV3: serverOptions.UseLicensesV3,
}

Expand Down Expand Up @@ -737,7 +741,8 @@ func makeRulesManager(
cache cache.Cache,
disableRules bool,
fm baseint.FeatureLookup,
useLogsNewSchema bool) (*baserules.Manager, error) {
useLogsNewSchema bool,
useTraceNewSchema bool) (*baserules.Manager, error) {

// create engine
pqle, err := pqle.FromConfigPath(promConfigPath)
Expand Down Expand Up @@ -767,8 +772,9 @@ func makeRulesManager(
EvalDelay: baseconst.GetEvalDelay(),

PrepareTaskFunc: rules.PrepareTaskFunc,
PrepareTestRuleFunc: rules.TestNotification,
UseLogsNewSchema: useLogsNewSchema,
UseTraceNewSchema: useTraceNewSchema,
PrepareTestRuleFunc: rules.TestNotification,
}

// create Manager
Expand Down
3 changes: 3 additions & 0 deletions ee/query-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func main() {
var cluster string

var useLogsNewSchema bool
var useTraceNewSchema bool
var useLicensesV3 bool
var cacheConfigPath, fluxInterval string
var enableQueryServiceLogOTLPExport bool
Expand All @@ -105,6 +106,7 @@ func main() {
var gatewayUrl string

flag.BoolVar(&useLogsNewSchema, "use-logs-new-schema", false, "use logs_v2 schema for logs")
flag.BoolVar(&useTraceNewSchema, "use-trace-new-schema", false, "use new schema for traces")
flag.BoolVar(&useLicensesV3, "use-licenses-v3", false, "use licenses_v3 schema for licenses")
flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)")
flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)")
Expand Down Expand Up @@ -145,6 +147,7 @@ func main() {
Cluster: cluster,
GatewayUrl: gatewayUrl,
UseLogsNewSchema: useLogsNewSchema,
UseTraceNewSchema: useTraceNewSchema,
UseLicensesV3: useLicensesV3,
}

Expand Down
2 changes: 2 additions & 0 deletions ee/query-service/rules/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
opts.FF,
opts.Reader,
opts.UseLogsNewSchema,
opts.UseTraceNewSchema,
baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay),
)

Expand Down Expand Up @@ -122,6 +123,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
opts.FF,
opts.Reader,
opts.UseLogsNewSchema,
opts.UseTraceNewSchema,
baserules.WithSendAlways(),
baserules.WithSendUnmatched(),
)
Expand Down
19 changes: 9 additions & 10 deletions pkg/query-service/app/clickhouseReader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func NewReader(
dialTimeout time.Duration,
cluster string,
useLogsNewSchema bool,
// useTraceNewSchema bool, // TODO: uncomment this in integration PR
useTraceNewSchema bool,
) *ClickHouseReader {

datasource := os.Getenv("ClickHouseUrl")
Expand All @@ -178,7 +178,7 @@ func NewReader(
zap.L().Fatal("failed to initialize ClickHouse", zap.Error(err))
}

return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster, useLogsNewSchema)
return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster, useLogsNewSchema, useTraceNewSchema)
}

func NewReaderFromClickhouseConnection(
Expand All @@ -189,7 +189,7 @@ func NewReaderFromClickhouseConnection(
featureFlag interfaces.FeatureLookup,
cluster string,
useLogsNewSchema bool,
// useTraceNewSchema bool,
useTraceNewSchema bool,
) *ClickHouseReader {
alertManager, err := am.New()
if err != nil {
Expand Down Expand Up @@ -229,11 +229,10 @@ func NewReaderFromClickhouseConnection(

traceTableName := options.primary.IndexTable
traceLocalTableName := options.primary.LocalIndexTable
// TODO: uncomment this in integration PR
// if useTraceNewSchema {
// traceTableName = options.primary.TraceIndexTableV3
// traceLocalTableName = options.primary.TraceLocalTableNameV3
// }
if useTraceNewSchema {
traceTableName = options.primary.TraceIndexTableV3
traceLocalTableName = options.primary.TraceLocalTableNameV3
}

return &ClickHouseReader{
db: wrap,
Expand Down Expand Up @@ -262,7 +261,8 @@ func NewReaderFromClickhouseConnection(
cluster: cluster,
queryProgressTracker: queryprogress.NewQueryProgressTracker(),

useLogsNewSchema: useLogsNewSchema,
useLogsNewSchema: useLogsNewSchema,
useTraceNewSchema: useTraceNewSchema,

logsTableV2: options.primary.LogsTableV2,
logsLocalTableV2: options.primary.LogsLocalTableV2,
Expand All @@ -271,7 +271,6 @@ func NewReaderFromClickhouseConnection(
logsTableName: logsTableName,
logsLocalTableName: logsLocalTableName,

// useTraceNewSchema: useTraceNewSchema,
traceLocalTableName: traceLocalTableName,
traceTableName: traceTableName,
traceResourceTableV3: options.primary.TraceResourceTableV3,
Expand Down
54 changes: 37 additions & 17 deletions pkg/query-service/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
querierV2 "go.signoz.io/signoz/pkg/query-service/app/querier/v2"
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
tracesV4 "go.signoz.io/signoz/pkg/query-service/app/traces/v4"
"go.signoz.io/signoz/pkg/query-service/auth"
"go.signoz.io/signoz/pkg/query-service/cache"
"go.signoz.io/signoz/pkg/query-service/common"
Expand Down Expand Up @@ -110,8 +111,9 @@ type APIHandler struct {
// Websocket connection upgrader
Upgrader *websocket.Upgrader

UseLogsNewSchema bool
UseLicensesV3 bool
UseLogsNewSchema bool
UseTraceNewSchema bool
UseLicensesV3 bool

hostsRepo *inframetrics.HostsRepo
processesRepo *inframetrics.ProcessesRepo
Expand Down Expand Up @@ -163,6 +165,7 @@ type APIHandlerOpts struct {
// Use Logs New schema
UseLogsNewSchema bool

UseTraceNewSchema bool
// Use Licenses V3 structure
UseLicensesV3 bool
}
Expand All @@ -176,21 +179,23 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
}

querierOpts := querier.QuerierOptions{
Reader: opts.Reader,
Cache: opts.Cache,
KeyGenerator: queryBuilder.NewKeyGenerator(),
FluxInterval: opts.FluxInterval,
FeatureLookup: opts.FeatureFlags,
UseLogsNewSchema: opts.UseLogsNewSchema,
Reader: opts.Reader,
Cache: opts.Cache,
KeyGenerator: queryBuilder.NewKeyGenerator(),
FluxInterval: opts.FluxInterval,
FeatureLookup: opts.FeatureFlags,
UseLogsNewSchema: opts.UseLogsNewSchema,
UseTraceNewSchema: opts.UseTraceNewSchema,
}

querierOptsV2 := querierV2.QuerierOptions{
Reader: opts.Reader,
Cache: opts.Cache,
KeyGenerator: queryBuilder.NewKeyGenerator(),
FluxInterval: opts.FluxInterval,
FeatureLookup: opts.FeatureFlags,
UseLogsNewSchema: opts.UseLogsNewSchema,
Reader: opts.Reader,
Cache: opts.Cache,
KeyGenerator: queryBuilder.NewKeyGenerator(),
FluxInterval: opts.FluxInterval,
FeatureLookup: opts.FeatureFlags,
UseLogsNewSchema: opts.UseLogsNewSchema,
UseTraceNewSchema: opts.UseTraceNewSchema,
}

querier := querier.NewQuerier(querierOpts)
Expand Down Expand Up @@ -224,6 +229,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
querier: querier,
querierV2: querierv2,
UseLogsNewSchema: opts.UseLogsNewSchema,
UseTraceNewSchema: opts.UseTraceNewSchema,
UseLicensesV3: opts.UseLicensesV3,
hostsRepo: hostsRepo,
processesRepo: processesRepo,
Expand All @@ -242,9 +248,14 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
logsQueryBuilder = logsv4.PrepareLogsQuery
}

tracesQueryBuilder := tracesV3.PrepareTracesQuery
if opts.UseTraceNewSchema {
tracesQueryBuilder = tracesV4.PrepareTracesQuery
}

builderOpts := queryBuilder.QueryBuilderOptions{
BuildMetricQuery: metricsv3.PrepareMetricQuery,
BuildTraceQuery: tracesV3.PrepareTracesQuery,
BuildTraceQuery: tracesQueryBuilder,
BuildLogQuery: logsQueryBuilder,
}
aH.queryBuilder = queryBuilder.NewQueryBuilder(builderOpts, aH.featureFlags)
Expand Down Expand Up @@ -4330,7 +4341,12 @@ func (aH *APIHandler) queryRangeV3(ctx context.Context, queryRangeParams *v3.Que
RespondError(w, apiErrObj, errQuriesByName)
return
}
tracesV3.Enrich(queryRangeParams, spanKeys)
if aH.UseTraceNewSchema {
tracesV4.Enrich(queryRangeParams, spanKeys)
} else {
tracesV3.Enrich(queryRangeParams, spanKeys)
}

}

// WARN: Only works for AND operator in traces query
Expand Down Expand Up @@ -4800,7 +4816,11 @@ func (aH *APIHandler) queryRangeV4(ctx context.Context, queryRangeParams *v3.Que
RespondError(w, apiErrObj, errQuriesByName)
return
}
tracesV3.Enrich(queryRangeParams, spanKeys)
if aH.UseTraceNewSchema {
tracesV4.Enrich(queryRangeParams, spanKeys)
} else {
tracesV3.Enrich(queryRangeParams, spanKeys)
}
}

// WARN: Only works for AND operator in traces query
Expand Down
Loading

0 comments on commit 67058b2

Please sign in to comment.