feat: use new schema flag
nityanandagohain committed Sep 11, 2024
1 parent d6b75d7 commit 52e0bf1
Showing 15 changed files with 109 additions and 51 deletions.
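At a glance, this commit registers a new -use-logs-new-schema boolean flag in both query-service binaries and threads the resulting UseLogsNewSchema value through ServerOptions into the ClickHouse reader, both querier versions, and the rules manager. A minimal sketch of the threading pattern, using names from the diff below but with every struct trimmed to the one new field, looks like this:

package main

// ServerOptions mirrors the field added to both binaries' server options.
type ServerOptions struct {
	UseLogsNewSchema bool
}

// ClickHouseReader stores the flag so later query-building code can consult it.
type ClickHouseReader struct {
	useLogsNewSchema bool
}

// NewReader now accepts the flag as an extra trailing parameter, as in the diff below.
func NewReader(useLogsNewSchema bool) *ClickHouseReader {
	return &ClickHouseReader{useLogsNewSchema: useLogsNewSchema}
}

func main() {
	opts := &ServerOptions{UseLogsNewSchema: true}
	_ = NewReader(opts.UseLogsNewSchema)
}

The same constructor-parameter pattern repeats below for the queriers and the rules manager.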
3 changes: 3 additions & 0 deletions ee/query-service/app/api/api.go
@@ -39,6 +39,8 @@ type APIHandlerOptions struct {
Gateway *httputil.ReverseProxy
// Querier Influx Interval
FluxInterval time.Duration
+
+ UseLogsNewSchema bool
}

type APIHandler struct {
@@ -63,6 +65,7 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) {
LogsParsingPipelineController: opts.LogsParsingPipelineController,
Cache: opts.Cache,
FluxInterval: opts.FluxInterval,
+ UseLogsNewSchema: opts.UseLogsNewSchema,
})

if err != nil {
3 changes: 2 additions & 1 deletion ee/query-service/app/db/reader.go
@@ -25,8 +25,9 @@ func NewDataConnector(
maxOpenConns int,
dialTimeout time.Duration,
cluster string,
+ useLogsNewSchema bool,
) *ClickhouseReader {
- ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout, cluster)
+ ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout, cluster, useLogsNewSchema)
return &ClickhouseReader{
conn: ch.GetConn(),
appdb: localDB,
13 changes: 10 additions & 3 deletions ee/query-service/app/server.go
@@ -77,6 +77,7 @@ type ServerOptions struct {
FluxInterval string
Cluster string
GatewayUrl string
+ UseLogsNewSchema bool
}

// Server runs HTTP api service
@@ -154,6 +155,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
serverOptions.MaxOpenConns,
serverOptions.DialTimeout,
serverOptions.Cluster,
+ serverOptions.UseLogsNewSchema,
)
go qb.Start(readerReady)
reader = qb
@@ -176,7 +178,9 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
localDB,
reader,
serverOptions.DisableRules,
- lm)
+ lm,
+ serverOptions.UseLogsNewSchema,
+ )

if err != nil {
return nil, err
@@ -265,6 +269,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
Cache: c,
FluxInterval: fluxInterval,
Gateway: gatewayProxy,
+ UseLogsNewSchema: serverOptions.UseLogsNewSchema,
}

apiHandler, err := api.NewAPIHandler(apiOpts)
@@ -728,7 +733,8 @@ func makeRulesManager(
db *sqlx.DB,
ch baseint.Reader,
disableRules bool,
- fm baseint.FeatureLookup) (*baserules.Manager, error) {
+ fm baseint.FeatureLookup,
+ useLogsNewSchema bool) (*baserules.Manager, error) {

// create engine
pqle, err := pqle.FromConfigPath(promConfigPath)
@@ -756,7 +762,8 @@ func makeRulesManager(
Reader: ch,
EvalDelay: baseconst.GetEvalDelay(),

- PrepareTaskFunc: rules.PrepareTaskFunc,
+ PrepareTaskFunc: rules.PrepareTaskFunc,
+ UseLogsNewSchema: useLogsNewSchema,
}

// create Manager
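The rules path follows the same hand-off: ManagerOptions now carries UseLogsNewSchema, and PrepareTaskFunc forwards it into each rule it builds (see ee/query-service/rules/manager.go below). A simplified sketch of that flow, with field sets and the rule type trimmed down to illustrative stand-ins rather than the full SigNoz definitions:

package rules

// ManagerOptions gains the flag, as wired up in makeRulesManager above.
type ManagerOptions struct {
	UseLogsNewSchema bool
}

// PrepareTaskOptions is what PrepareTaskFunc receives for each rule.
type PrepareTaskOptions struct {
	ManagerOpts      *ManagerOptions
	UseLogsNewSchema bool
}

// thresholdRule is a hypothetical stand-in for the rule type constructed in
// ee/query-service/rules/manager.go; only the flag hand-off is shown.
type thresholdRule struct {
	useLogsNewSchema bool
}

// PrepareTaskFunc forwards the flag so log-based alert rules can query
// whichever logs schema the operator selected.
func PrepareTaskFunc(opts PrepareTaskOptions) *thresholdRule {
	return &thresholdRule{useLogsNewSchema: opts.UseLogsNewSchema}
}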
3 changes: 3 additions & 0 deletions ee/query-service/main.go
@@ -87,6 +87,7 @@ func main() {
var ruleRepoURL string
var cluster string

+ var useLogsNewSchema bool
var cacheConfigPath, fluxInterval string
var enableQueryServiceLogOTLPExport bool
var preferSpanMetrics bool
@@ -96,6 +97,7 @@ func main() {
var dialTimeout time.Duration
var gatewayUrl string

+ flag.BoolVar(&useLogsNewSchema, "use-logs-new-schema", false, "use logs_v2 schema for logs")
flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)")
flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)")
flag.BoolVar(&disableRules, "rules.disable", false, "(disable rule evaluation)")
@@ -134,6 +136,7 @@ func main() {
FluxInterval: fluxInterval,
Cluster: cluster,
GatewayUrl: gatewayUrl,
+ UseLogsNewSchema: useLogsNewSchema,
}

// Read the jwt secret key
1 change: 1 addition & 0 deletions ee/query-service/rules/manager.go
@@ -20,6 +20,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
opts.Rule,
opts.FF,
opts.Reader,
+ opts.UseLogsNewSchema,
baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay),
)

7 changes: 6 additions & 1 deletion pkg/query-service/app/clickhouseReader/reader.go
@@ -132,6 +132,8 @@ type ClickHouseReader struct {

liveTailRefreshSeconds int
cluster string
+
+ useLogsNewSchema bool
}

// NewTraceReader returns a TraceReader for the database
@@ -143,6 +145,7 @@ func NewReader(
maxOpenConns int,
dialTimeout time.Duration,
cluster string,
+ useLogsNewSchema bool,
) *ClickHouseReader {

datasource := os.Getenv("ClickHouseUrl")
@@ -153,7 +156,7 @@ func NewReader(
zap.L().Fatal("failed to initialize ClickHouse", zap.Error(err))
}

- return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster)
+ return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster, useLogsNewSchema)
}

func NewReaderFromClickhouseConnection(
@@ -163,6 +166,7 @@ func NewReaderFromClickhouseConnection(
configFile string,
featureFlag interfaces.FeatureLookup,
cluster string,
+ useLogsNewSchema bool,
) *ClickHouseReader {
alertManager, err := am.New("")
if err != nil {
@@ -219,6 +223,7 @@ func NewReaderFromClickhouseConnection(
featureFlags: featureFlag,
cluster: cluster,
queryProgressTracker: queryprogress.NewQueryProgressTracker(),
+ useLogsNewSchema: useLogsNewSchema,
}
}

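Note that this diff only stores useLogsNewSchema on the reader; the code that acts on it is not shown in this commit. Going by the flag's help text ("use logs_v2 schema for logs"), one plausible consumer is table selection along the following lines — a hypothetical sketch in which both the helper and the table names are assumptions, not facts from this diff:

package clickhouseReader

// Trimmed stand-in for the ClickHouseReader defined above.
type ClickHouseReader struct {
	useLogsNewSchema bool
}

// logsTableName is hypothetical: it does not appear in this commit and only
// illustrates how the stored flag could be consumed later.
func (r *ClickHouseReader) logsTableName() string {
	if r.useLogsNewSchema {
		return "distributed_logs_v2" // assumed table name for the new schema
	}
	return "distributed_logs" // assumed table name for the existing schema
}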
28 changes: 18 additions & 10 deletions pkg/query-service/app/http_handler.go
@@ -105,6 +105,8 @@ type APIHandler struct {

// Websocket connection upgrader
Upgrader *websocket.Upgrader
+
+ UseLogsNewSchema bool
}

type APIHandlerOpts struct {
@@ -140,6 +142,9 @@ type APIHandlerOpts struct {

// Querier Influx Interval
FluxInterval time.Duration
+
+ // Use new schema
+ UseLogsNewSchema bool
}

// NewAPIHandler returns an APIHandler
@@ -151,19 +156,21 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
}

querierOpts := querier.QuerierOptions{
- Reader: opts.Reader,
- Cache: opts.Cache,
- KeyGenerator: queryBuilder.NewKeyGenerator(),
- FluxInterval: opts.FluxInterval,
- FeatureLookup: opts.FeatureFlags,
+ Reader: opts.Reader,
+ Cache: opts.Cache,
+ KeyGenerator: queryBuilder.NewKeyGenerator(),
+ FluxInterval: opts.FluxInterval,
+ FeatureLookup: opts.FeatureFlags,
+ UseLogsNewSchema: opts.UseLogsNewSchema,
}

querierOptsV2 := querierV2.QuerierOptions{
- Reader: opts.Reader,
- Cache: opts.Cache,
- KeyGenerator: queryBuilder.NewKeyGenerator(),
- FluxInterval: opts.FluxInterval,
- FeatureLookup: opts.FeatureFlags,
+ Reader: opts.Reader,
+ Cache: opts.Cache,
+ KeyGenerator: queryBuilder.NewKeyGenerator(),
+ FluxInterval: opts.FluxInterval,
+ FeatureLookup: opts.FeatureFlags,
+ UseLogsNewSchema: opts.UseLogsNewSchema,
}

querier := querier.NewQuerier(querierOpts)
@@ -185,6 +192,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
LogsParsingPipelineController: opts.LogsParsingPipelineController,
querier: querier,
querierV2: querierv2,
+ UseLogsNewSchema: opts.UseLogsNewSchema,
}

builderOpts := queryBuilder.QueryBuilderOptions{
9 changes: 6 additions & 3 deletions pkg/query-service/app/querier/querier.go
@@ -54,6 +54,8 @@ type querier struct {
timeRanges [][]int
returnedSeries []*v3.Series
returnedErr error
+
+ UseLogsNewSchema bool
}

type QuerierOptions struct {
@@ -64,9 +66,10 @@ type QuerierOptions struct {
FeatureLookup interfaces.FeatureLookup

// used for testing
- TestingMode bool
- ReturnedSeries []*v3.Series
- ReturnedErr error
+ TestingMode bool
+ ReturnedSeries []*v3.Series
+ ReturnedErr error
+ UseLogsNewSchema bool
}

func NewQuerier(opts QuerierOptions) interfaces.Querier {
9 changes: 6 additions & 3 deletions pkg/query-service/app/querier/v2/querier.go
@@ -54,6 +54,8 @@ type querier struct {
timeRanges [][]int
returnedSeries []*v3.Series
returnedErr error
+
+ UseLogsNewSchema bool
}

type QuerierOptions struct {
@@ -64,9 +66,10 @@ type QuerierOptions struct {
FeatureLookup interfaces.FeatureLookup

// used for testing
- TestingMode bool
- ReturnedSeries []*v3.Series
- ReturnedErr error
+ TestingMode bool
+ ReturnedSeries []*v3.Series
+ ReturnedErr error
+ UseLogsNewSchema bool
}

func NewQuerier(opts QuerierOptions) interfaces.Querier {
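As with the reader, both querier versions only store the flag in this commit; how they use it is not part of the diff. A deliberately hypothetical sketch of the kind of branch the field enables when building log queries — every helper name here is invented for illustration:

package querier

// Trimmed stand-in for the querier struct above.
type querier struct {
	UseLogsNewSchema bool
}

// buildLogsQuery shows the decision the flag makes possible; the two helpers
// below are placeholders for this sketch and do not exist in the diff.
func (q *querier) buildLogsQuery() string {
	if q.UseLogsNewSchema {
		return logsQueryV2()
	}
	return logsQueryV1()
}

func logsQueryV1() string { return "SELECT ... FROM logs" }    // placeholder
func logsQueryV2() string { return "SELECT ... FROM logs_v2" } // placeholder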
29 changes: 17 additions & 12 deletions pkg/query-service/app/server.go
@@ -66,6 +66,7 @@ type ServerOptions struct {
CacheConfigPath string
FluxInterval string
Cluster string
+ UseLogsNewSchema bool
}

// Server runs HTTP, Mux and a grpc server
@@ -128,6 +129,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
serverOptions.MaxOpenConns,
serverOptions.DialTimeout,
serverOptions.Cluster,
+ serverOptions.UseLogsNewSchema,
)
go clickhouseReader.Start(readerReady)
reader = clickhouseReader
@@ -144,7 +146,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
}

<-readerReady
- rm, err := makeRulesManager(serverOptions.PromConfigPath, constants.GetAlertManagerApiPrefix(), serverOptions.RuleRepoURL, localDB, reader, serverOptions.DisableRules, fm)
+ rm, err := makeRulesManager(serverOptions.PromConfigPath, constants.GetAlertManagerApiPrefix(), serverOptions.RuleRepoURL, localDB, reader, serverOptions.DisableRules, fm, serverOptions.UseLogsNewSchema)
if err != nil {
return nil, err
}
@@ -197,6 +199,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
LogsParsingPipelineController: logParsingPipelineController,
Cache: c,
FluxInterval: fluxInterval,
+ UseLogsNewSchema: serverOptions.UseLogsNewSchema,
})
if err != nil {
return nil, err
@@ -713,7 +716,8 @@ func makeRulesManager(
db *sqlx.DB,
ch interfaces.Reader,
disableRules bool,
- fm interfaces.FeatureLookup) (*rules.Manager, error) {
+ fm interfaces.FeatureLookup,
+ useLogsNewSchema bool) (*rules.Manager, error) {

// create engine
pqle, err := pqle.FromReader(ch)
@@ -730,16 +734,17 @@ func makeRulesManager(

// create manager opts
managerOpts := &rules.ManagerOptions{
- NotifierOpts: notifierOpts,
- PqlEngine: pqle,
- RepoURL: ruleRepoURL,
- DBConn: db,
- Context: context.Background(),
- Logger: nil,
- DisableRules: disableRules,
- FeatureFlags: fm,
- Reader: ch,
- EvalDelay: constants.GetEvalDelay(),
+ NotifierOpts: notifierOpts,
+ PqlEngine: pqle,
+ RepoURL: ruleRepoURL,
+ DBConn: db,
+ Context: context.Background(),
+ Logger: nil,
+ DisableRules: disableRules,
+ FeatureFlags: fm,
+ Reader: ch,
+ EvalDelay: constants.GetEvalDelay(),
+ UseLogsNewSchema: useLogsNewSchema,
}

// create Manager
3 changes: 3 additions & 0 deletions pkg/query-service/main.go
@@ -33,6 +33,7 @@ func main() {
// disables rule execution but allows change to the rule definition
var disableRules bool

+ var useLogsNewSchema bool
// the url used to build link in the alert messages in slack and other systems
var ruleRepoURL, cacheConfigPath, fluxInterval string
var cluster string
@@ -43,6 +44,7 @@ func main() {
var maxOpenConns int
var dialTimeout time.Duration

+ flag.BoolVar(&useLogsNewSchema, "use-logs-new-schema", false, "use logs_v2 schema for logs")
flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)")
flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)")
flag.BoolVar(&disableRules, "rules.disable", false, "(disable rule evaluation)")
@@ -79,6 +81,7 @@ func main() {
CacheConfigPath: cacheConfigPath,
FluxInterval: fluxInterval,
Cluster: cluster,
+ UseLogsNewSchema: useLogsNewSchema,
}

// Read the jwt secret key
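Because the flag defaults to false in both binaries, the logs_v2 path is strictly opt-in and existing deployments are unaffected until the flag is passed. A self-contained illustration of the registration used above and how it behaves from the command line (the binary name query-service is assumed here):

package main

import (
	"flag"
	"fmt"
)

func main() {
	var useLogsNewSchema bool
	// Same registration as in both main.go files above.
	flag.BoolVar(&useLogsNewSchema, "use-logs-new-schema", false, "use logs_v2 schema for logs")
	flag.Parse()

	// ./query-service                       -> false (keep the existing logs schema)
	// ./query-service -use-logs-new-schema  -> true  (switch to the logs_v2 schema)
	fmt.Println("use new logs schema:", useLogsNewSchema)
}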