diff --git a/ee/query-service/app/api/api.go b/ee/query-service/app/api/api.go index 66b462e167..bb36fdf479 100644 --- a/ee/query-service/app/api/api.go +++ b/ee/query-service/app/api/api.go @@ -39,6 +39,8 @@ type APIHandlerOptions struct { Gateway *httputil.ReverseProxy // Querier Influx Interval FluxInterval time.Duration + + UseLogsNewSchema bool } type APIHandler struct { @@ -63,6 +65,7 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) { LogsParsingPipelineController: opts.LogsParsingPipelineController, Cache: opts.Cache, FluxInterval: opts.FluxInterval, + UseLogsNewSchema: opts.UseLogsNewSchema, }) if err != nil { diff --git a/ee/query-service/app/db/reader.go b/ee/query-service/app/db/reader.go index b8326058ec..fcab1cb991 100644 --- a/ee/query-service/app/db/reader.go +++ b/ee/query-service/app/db/reader.go @@ -25,8 +25,9 @@ func NewDataConnector( maxOpenConns int, dialTimeout time.Duration, cluster string, + useLogsNewSchema bool, ) *ClickhouseReader { - ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout, cluster) + ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout, cluster, useLogsNewSchema) return &ClickhouseReader{ conn: ch.GetConn(), appdb: localDB, diff --git a/ee/query-service/app/server.go b/ee/query-service/app/server.go index ee019e639a..9845ee670b 100644 --- a/ee/query-service/app/server.go +++ b/ee/query-service/app/server.go @@ -77,6 +77,7 @@ type ServerOptions struct { FluxInterval string Cluster string GatewayUrl string + UseLogsNewSchema bool } // Server runs HTTP api service @@ -154,6 +155,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { serverOptions.MaxOpenConns, serverOptions.DialTimeout, serverOptions.Cluster, + serverOptions.UseLogsNewSchema, ) go qb.Start(readerReady) reader = qb @@ -176,7 +178,9 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { localDB, reader, serverOptions.DisableRules, - lm) + lm, + serverOptions.UseLogsNewSchema, + ) if err != nil { return nil, err @@ -265,6 +269,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { Cache: c, FluxInterval: fluxInterval, Gateway: gatewayProxy, + UseLogsNewSchema: serverOptions.UseLogsNewSchema, } apiHandler, err := api.NewAPIHandler(apiOpts) @@ -728,7 +733,8 @@ func makeRulesManager( db *sqlx.DB, ch baseint.Reader, disableRules bool, - fm baseint.FeatureLookup) (*baserules.Manager, error) { + fm baseint.FeatureLookup, + useLogsNewSchema bool) (*baserules.Manager, error) { // create engine pqle, err := pqle.FromConfigPath(promConfigPath) @@ -756,7 +762,8 @@ func makeRulesManager( Reader: ch, EvalDelay: baseconst.GetEvalDelay(), - PrepareTaskFunc: rules.PrepareTaskFunc, + PrepareTaskFunc: rules.PrepareTaskFunc, + UseLogsNewSchema: useLogsNewSchema, } // create Manager diff --git a/ee/query-service/main.go b/ee/query-service/main.go index c5a03f4c0f..75a49500d0 100644 --- a/ee/query-service/main.go +++ b/ee/query-service/main.go @@ -87,6 +87,7 @@ func main() { var ruleRepoURL string var cluster string + var useLogsNewSchema bool var cacheConfigPath, fluxInterval string var enableQueryServiceLogOTLPExport bool var preferSpanMetrics bool @@ -96,6 +97,7 @@ func main() { var dialTimeout time.Duration var gatewayUrl string + flag.BoolVar(&useLogsNewSchema, "use-logs-new-schema", false, "use logs_v2 schema for logs") flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)") flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)") flag.BoolVar(&disableRules, "rules.disable", false, "(disable rule evaluation)") @@ -134,6 +136,7 @@ func main() { FluxInterval: fluxInterval, Cluster: cluster, GatewayUrl: gatewayUrl, + UseLogsNewSchema: useLogsNewSchema, } // Read the jwt secret key diff --git a/ee/query-service/rules/manager.go b/ee/query-service/rules/manager.go index d3bc03f58a..2b80441f0c 100644 --- a/ee/query-service/rules/manager.go +++ b/ee/query-service/rules/manager.go @@ -20,6 +20,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error) opts.Rule, opts.FF, opts.Reader, + opts.UseLogsNewSchema, baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay), ) diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 2984fa0fa5..b3ef773da0 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -132,6 +132,8 @@ type ClickHouseReader struct { liveTailRefreshSeconds int cluster string + + useLogsNewSchema bool } // NewTraceReader returns a TraceReader for the database @@ -143,6 +145,7 @@ func NewReader( maxOpenConns int, dialTimeout time.Duration, cluster string, + useLogsNewSchema bool, ) *ClickHouseReader { datasource := os.Getenv("ClickHouseUrl") @@ -153,7 +156,7 @@ func NewReader( zap.L().Fatal("failed to initialize ClickHouse", zap.Error(err)) } - return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster) + return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster, useLogsNewSchema) } func NewReaderFromClickhouseConnection( @@ -163,6 +166,7 @@ func NewReaderFromClickhouseConnection( configFile string, featureFlag interfaces.FeatureLookup, cluster string, + useLogsNewSchema bool, ) *ClickHouseReader { alertManager, err := am.New("") if err != nil { @@ -219,6 +223,7 @@ func NewReaderFromClickhouseConnection( featureFlags: featureFlag, cluster: cluster, queryProgressTracker: queryprogress.NewQueryProgressTracker(), + useLogsNewSchema: useLogsNewSchema, } } diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 957ea5aaff..d347bf576e 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -105,6 +105,8 @@ type APIHandler struct { // Websocket connection upgrader Upgrader *websocket.Upgrader + + UseLogsNewSchema bool } type APIHandlerOpts struct { @@ -140,6 +142,9 @@ type APIHandlerOpts struct { // Querier Influx Interval FluxInterval time.Duration + + // Use new schema + UseLogsNewSchema bool } // NewAPIHandler returns an APIHandler @@ -151,19 +156,21 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { } querierOpts := querier.QuerierOptions{ - Reader: opts.Reader, - Cache: opts.Cache, - KeyGenerator: queryBuilder.NewKeyGenerator(), - FluxInterval: opts.FluxInterval, - FeatureLookup: opts.FeatureFlags, + Reader: opts.Reader, + Cache: opts.Cache, + KeyGenerator: queryBuilder.NewKeyGenerator(), + FluxInterval: opts.FluxInterval, + FeatureLookup: opts.FeatureFlags, + UseLogsNewSchema: opts.UseLogsNewSchema, } querierOptsV2 := querierV2.QuerierOptions{ - Reader: opts.Reader, - Cache: opts.Cache, - KeyGenerator: queryBuilder.NewKeyGenerator(), - FluxInterval: opts.FluxInterval, - FeatureLookup: opts.FeatureFlags, + Reader: opts.Reader, + Cache: opts.Cache, + KeyGenerator: queryBuilder.NewKeyGenerator(), + FluxInterval: opts.FluxInterval, + FeatureLookup: opts.FeatureFlags, + UseLogsNewSchema: opts.UseLogsNewSchema, } querier := querier.NewQuerier(querierOpts) @@ -185,6 +192,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { LogsParsingPipelineController: opts.LogsParsingPipelineController, querier: querier, querierV2: querierv2, + UseLogsNewSchema: opts.UseLogsNewSchema, } builderOpts := queryBuilder.QueryBuilderOptions{ diff --git a/pkg/query-service/app/querier/querier.go b/pkg/query-service/app/querier/querier.go index 86a77da114..0663afd126 100644 --- a/pkg/query-service/app/querier/querier.go +++ b/pkg/query-service/app/querier/querier.go @@ -54,6 +54,8 @@ type querier struct { timeRanges [][]int returnedSeries []*v3.Series returnedErr error + + UseLogsNewSchema bool } type QuerierOptions struct { @@ -64,9 +66,10 @@ type QuerierOptions struct { FeatureLookup interfaces.FeatureLookup // used for testing - TestingMode bool - ReturnedSeries []*v3.Series - ReturnedErr error + TestingMode bool + ReturnedSeries []*v3.Series + ReturnedErr error + UseLogsNewSchema bool } func NewQuerier(opts QuerierOptions) interfaces.Querier { diff --git a/pkg/query-service/app/querier/v2/querier.go b/pkg/query-service/app/querier/v2/querier.go index d0c3a77d13..01cbf6d649 100644 --- a/pkg/query-service/app/querier/v2/querier.go +++ b/pkg/query-service/app/querier/v2/querier.go @@ -54,6 +54,8 @@ type querier struct { timeRanges [][]int returnedSeries []*v3.Series returnedErr error + + UseLogsNewSchema bool } type QuerierOptions struct { @@ -64,9 +66,10 @@ type QuerierOptions struct { FeatureLookup interfaces.FeatureLookup // used for testing - TestingMode bool - ReturnedSeries []*v3.Series - ReturnedErr error + TestingMode bool + ReturnedSeries []*v3.Series + ReturnedErr error + UseLogsNewSchema bool } func NewQuerier(opts QuerierOptions) interfaces.Querier { diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index 557b082f42..a1fc0dd329 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -66,6 +66,7 @@ type ServerOptions struct { CacheConfigPath string FluxInterval string Cluster string + UseLogsNewSchema bool } // Server runs HTTP, Mux and a grpc server @@ -128,6 +129,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { serverOptions.MaxOpenConns, serverOptions.DialTimeout, serverOptions.Cluster, + serverOptions.UseLogsNewSchema, ) go clickhouseReader.Start(readerReady) reader = clickhouseReader @@ -144,7 +146,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { } <-readerReady - rm, err := makeRulesManager(serverOptions.PromConfigPath, constants.GetAlertManagerApiPrefix(), serverOptions.RuleRepoURL, localDB, reader, serverOptions.DisableRules, fm) + rm, err := makeRulesManager(serverOptions.PromConfigPath, constants.GetAlertManagerApiPrefix(), serverOptions.RuleRepoURL, localDB, reader, serverOptions.DisableRules, fm, serverOptions.UseLogsNewSchema) if err != nil { return nil, err } @@ -197,6 +199,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { LogsParsingPipelineController: logParsingPipelineController, Cache: c, FluxInterval: fluxInterval, + UseLogsNewSchema: serverOptions.UseLogsNewSchema, }) if err != nil { return nil, err @@ -713,7 +716,8 @@ func makeRulesManager( db *sqlx.DB, ch interfaces.Reader, disableRules bool, - fm interfaces.FeatureLookup) (*rules.Manager, error) { + fm interfaces.FeatureLookup, + useLogsNewSchema bool) (*rules.Manager, error) { // create engine pqle, err := pqle.FromReader(ch) @@ -730,16 +734,17 @@ func makeRulesManager( // create manager opts managerOpts := &rules.ManagerOptions{ - NotifierOpts: notifierOpts, - PqlEngine: pqle, - RepoURL: ruleRepoURL, - DBConn: db, - Context: context.Background(), - Logger: nil, - DisableRules: disableRules, - FeatureFlags: fm, - Reader: ch, - EvalDelay: constants.GetEvalDelay(), + NotifierOpts: notifierOpts, + PqlEngine: pqle, + RepoURL: ruleRepoURL, + DBConn: db, + Context: context.Background(), + Logger: nil, + DisableRules: disableRules, + FeatureFlags: fm, + Reader: ch, + EvalDelay: constants.GetEvalDelay(), + UseLogsNewSchema: useLogsNewSchema, } // create Manager diff --git a/pkg/query-service/main.go b/pkg/query-service/main.go index 3063e07b12..d1b191f248 100644 --- a/pkg/query-service/main.go +++ b/pkg/query-service/main.go @@ -33,6 +33,7 @@ func main() { // disables rule execution but allows change to the rule definition var disableRules bool + var useLogsNewSchema bool // the url used to build link in the alert messages in slack and other systems var ruleRepoURL, cacheConfigPath, fluxInterval string var cluster string @@ -43,6 +44,7 @@ func main() { var maxOpenConns int var dialTimeout time.Duration + flag.BoolVar(&useLogsNewSchema, "use-logs-new-schema", false, "use logs_v2 schema for logs") flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)") flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)") flag.BoolVar(&disableRules, "rules.disable", false, "(disable rule evaluation)") @@ -79,6 +81,7 @@ func main() { CacheConfigPath: cacheConfigPath, FluxInterval: fluxInterval, Cluster: cluster, + UseLogsNewSchema: useLogsNewSchema, } // Read the jwt secret key diff --git a/pkg/query-service/rules/manager.go b/pkg/query-service/rules/manager.go index 120d674a9a..eaabc4f27a 100644 --- a/pkg/query-service/rules/manager.go +++ b/pkg/query-service/rules/manager.go @@ -35,6 +35,8 @@ type PrepareTaskOptions struct { FF interfaces.FeatureLookup ManagerOpts *ManagerOptions NotifyFunc NotifyFunc + + UseLogsNewSchema bool } const taskNamesuffix = "webAppEditor" @@ -75,6 +77,8 @@ type ManagerOptions struct { EvalDelay time.Duration PrepareTaskFunc func(opts PrepareTaskOptions) (Task, error) + + UseLogsNewSchema bool } // The Manager manages recording and alerting rules. @@ -96,6 +100,8 @@ type Manager struct { reader interfaces.Reader prepareTaskFunc func(opts PrepareTaskOptions) (Task, error) + + UseLogsNewSchema bool } func defaultOptions(o *ManagerOptions) *ManagerOptions { @@ -130,6 +136,7 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) { opts.Rule, opts.FF, opts.Reader, + opts.UseLogsNewSchema, WithEvalDelay(opts.ManagerOpts.EvalDelay), ) @@ -333,6 +340,8 @@ func (m *Manager) editTask(rule *PostableRule, taskName string) error { FF: m.featureFlags, ManagerOpts: m.opts, NotifyFunc: m.prepareNotifyFunc(), + + UseLogsNewSchema: m.opts.UseLogsNewSchema, }) if err != nil { @@ -452,6 +461,8 @@ func (m *Manager) addTask(rule *PostableRule, taskName string) error { FF: m.featureFlags, ManagerOpts: m.opts, NotifyFunc: m.prepareNotifyFunc(), + + UseLogsNewSchema: m.opts.UseLogsNewSchema, }) for _, r := range newTask.Rules() { @@ -794,6 +805,7 @@ func (m *Manager) TestNotification(ctx context.Context, ruleStr string) (int, *m parsedRule, m.featureFlags, m.reader, + m.opts.UseLogsNewSchema, WithSendAlways(), WithSendUnmatched(), ) diff --git a/pkg/query-service/rules/threshold_rule.go b/pkg/query-service/rules/threshold_rule.go index d35798035e..964774500e 100644 --- a/pkg/query-service/rules/threshold_rule.go +++ b/pkg/query-service/rules/threshold_rule.go @@ -60,6 +60,7 @@ func NewThresholdRule( p *PostableRule, featureFlags interfaces.FeatureLookup, reader interfaces.Reader, + useLogsNewSchema bool, opts ...RuleOption, ) (*ThresholdRule, error) { @@ -77,17 +78,19 @@ func NewThresholdRule( } querierOption := querier.QuerierOptions{ - Reader: reader, - Cache: nil, - KeyGenerator: queryBuilder.NewKeyGenerator(), - FeatureLookup: featureFlags, + Reader: reader, + Cache: nil, + KeyGenerator: queryBuilder.NewKeyGenerator(), + FeatureLookup: featureFlags, + UseLogsNewSchema: useLogsNewSchema, } querierOptsV2 := querierV2.QuerierOptions{ - Reader: reader, - Cache: nil, - KeyGenerator: queryBuilder.NewKeyGenerator(), - FeatureLookup: featureFlags, + Reader: reader, + Cache: nil, + KeyGenerator: queryBuilder.NewKeyGenerator(), + FeatureLookup: featureFlags, + UseLogsNewSchema: useLogsNewSchema, } t.querier = querier.NewQuerier(querierOption) diff --git a/pkg/query-service/rules/threshold_rule_test.go b/pkg/query-service/rules/threshold_rule_test.go index 734347793d..ab37ad6af1 100644 --- a/pkg/query-service/rules/threshold_rule_test.go +++ b/pkg/query-service/rules/threshold_rule_test.go @@ -685,7 +685,7 @@ func TestThresholdRuleShouldAlert(t *testing.T) { postableRule.RuleCondition.MatchType = MatchType(c.matchType) postableRule.RuleCondition.Target = &c.target - rule, err := NewThresholdRule("69", &postableRule, fm, nil, WithEvalDelay(2*time.Minute)) + rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute)) if err != nil { assert.NoError(t, err) } @@ -774,7 +774,7 @@ func TestPrepareLinksToLogs(t *testing.T) { } fm := featureManager.StartManager() - rule, err := NewThresholdRule("69", &postableRule, fm, nil, WithEvalDelay(2*time.Minute)) + rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute)) if err != nil { assert.NoError(t, err) } @@ -816,7 +816,7 @@ func TestPrepareLinksToTraces(t *testing.T) { } fm := featureManager.StartManager() - rule, err := NewThresholdRule("69", &postableRule, fm, nil, WithEvalDelay(2*time.Minute)) + rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute)) if err != nil { assert.NoError(t, err) } @@ -892,7 +892,7 @@ func TestThresholdRuleLabelNormalization(t *testing.T) { postableRule.RuleCondition.MatchType = MatchType(c.matchType) postableRule.RuleCondition.Target = &c.target - rule, err := NewThresholdRule("69", &postableRule, fm, nil, WithEvalDelay(2*time.Minute)) + rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute)) if err != nil { assert.NoError(t, err) } @@ -945,7 +945,7 @@ func TestThresholdRuleEvalDelay(t *testing.T) { fm := featureManager.StartManager() for idx, c := range cases { - rule, err := NewThresholdRule("69", &postableRule, fm, nil) // no eval delay + rule, err := NewThresholdRule("69", &postableRule, fm, nil, true) // no eval delay if err != nil { assert.NoError(t, err) } @@ -994,7 +994,7 @@ func TestThresholdRuleClickHouseTmpl(t *testing.T) { fm := featureManager.StartManager() for idx, c := range cases { - rule, err := NewThresholdRule("69", &postableRule, fm, nil, WithEvalDelay(2*time.Minute)) + rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute)) if err != nil { assert.NoError(t, err) } @@ -1135,9 +1135,9 @@ func TestThresholdRuleUnitCombinations(t *testing.T) { } options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace") - reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "") + reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true) - rule, err := NewThresholdRule("69", &postableRule, fm, reader) + rule, err := NewThresholdRule("69", &postableRule, fm, reader, true) rule.temporalityMap = map[string]map[v3.Temporality]bool{ "signoz_calls_total": { v3.Delta: true, @@ -1234,9 +1234,9 @@ func TestThresholdRuleNoData(t *testing.T) { } options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace") - reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "") + reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true) - rule, err := NewThresholdRule("69", &postableRule, fm, reader) + rule, err := NewThresholdRule("69", &postableRule, fm, reader, true) rule.temporalityMap = map[string]map[v3.Temporality]bool{ "signoz_calls_total": { v3.Delta: true, diff --git a/pkg/query-service/tests/integration/test_utils.go b/pkg/query-service/tests/integration/test_utils.go index 65140e5fc8..d060433dba 100644 --- a/pkg/query-service/tests/integration/test_utils.go +++ b/pkg/query-service/tests/integration/test_utils.go @@ -45,6 +45,7 @@ func NewMockClickhouseReader( "", featureFlags, "", + true, ) return reader, mockDB