Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use new schema flag #5930

Merged
merged 1 commit into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ee/query-service/app/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type APIHandlerOptions struct {
Gateway *httputil.ReverseProxy
// Querier Influx Interval
FluxInterval time.Duration

UseLogsNewSchema bool
}

type APIHandler struct {
Expand All @@ -63,6 +65,7 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) {
LogsParsingPipelineController: opts.LogsParsingPipelineController,
Cache: opts.Cache,
FluxInterval: opts.FluxInterval,
UseLogsNewSchema: opts.UseLogsNewSchema,
})

if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion ee/query-service/app/db/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ func NewDataConnector(
maxOpenConns int,
dialTimeout time.Duration,
cluster string,
useLogsNewSchema bool,
) *ClickhouseReader {
ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout, cluster)
ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout, cluster, useLogsNewSchema)
return &ClickhouseReader{
conn: ch.GetConn(),
appdb: localDB,
Expand Down
13 changes: 10 additions & 3 deletions ee/query-service/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type ServerOptions struct {
FluxInterval string
Cluster string
GatewayUrl string
UseLogsNewSchema bool
}

// Server runs HTTP api service
Expand Down Expand Up @@ -154,6 +155,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
serverOptions.MaxOpenConns,
serverOptions.DialTimeout,
serverOptions.Cluster,
serverOptions.UseLogsNewSchema,
)
go qb.Start(readerReady)
reader = qb
Expand All @@ -176,7 +178,9 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
localDB,
reader,
serverOptions.DisableRules,
lm)
lm,
serverOptions.UseLogsNewSchema,
)

if err != nil {
return nil, err
Expand Down Expand Up @@ -265,6 +269,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
Cache: c,
FluxInterval: fluxInterval,
Gateway: gatewayProxy,
UseLogsNewSchema: serverOptions.UseLogsNewSchema,
}

apiHandler, err := api.NewAPIHandler(apiOpts)
Expand Down Expand Up @@ -728,7 +733,8 @@ func makeRulesManager(
db *sqlx.DB,
ch baseint.Reader,
disableRules bool,
fm baseint.FeatureLookup) (*baserules.Manager, error) {
fm baseint.FeatureLookup,
useLogsNewSchema bool) (*baserules.Manager, error) {

// create engine
pqle, err := pqle.FromConfigPath(promConfigPath)
Expand Down Expand Up @@ -756,7 +762,8 @@ func makeRulesManager(
Reader: ch,
EvalDelay: baseconst.GetEvalDelay(),

PrepareTaskFunc: rules.PrepareTaskFunc,
PrepareTaskFunc: rules.PrepareTaskFunc,
UseLogsNewSchema: useLogsNewSchema,
}

// create Manager
Expand Down
3 changes: 3 additions & 0 deletions ee/query-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func main() {
var ruleRepoURL string
var cluster string

var useLogsNewSchema bool
var cacheConfigPath, fluxInterval string
var enableQueryServiceLogOTLPExport bool
var preferSpanMetrics bool
Expand All @@ -96,6 +97,7 @@ func main() {
var dialTimeout time.Duration
var gatewayUrl string

flag.BoolVar(&useLogsNewSchema, "use-logs-new-schema", false, "use logs_v2 schema for logs")
flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)")
flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)")
flag.BoolVar(&disableRules, "rules.disable", false, "(disable rule evaluation)")
Expand Down Expand Up @@ -134,6 +136,7 @@ func main() {
FluxInterval: fluxInterval,
Cluster: cluster,
GatewayUrl: gatewayUrl,
UseLogsNewSchema: useLogsNewSchema,
}

// Read the jwt secret key
Expand Down
1 change: 1 addition & 0 deletions ee/query-service/rules/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
opts.Rule,
opts.FF,
opts.Reader,
opts.UseLogsNewSchema,
baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay),
)

Expand Down
7 changes: 6 additions & 1 deletion pkg/query-service/app/clickhouseReader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ type ClickHouseReader struct {

liveTailRefreshSeconds int
cluster string

useLogsNewSchema bool
}

// NewTraceReader returns a TraceReader for the database
Expand All @@ -143,6 +145,7 @@ func NewReader(
maxOpenConns int,
dialTimeout time.Duration,
cluster string,
useLogsNewSchema bool,
) *ClickHouseReader {

datasource := os.Getenv("ClickHouseUrl")
Expand All @@ -153,7 +156,7 @@ func NewReader(
zap.L().Fatal("failed to initialize ClickHouse", zap.Error(err))
}

return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster)
return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster, useLogsNewSchema)
}

func NewReaderFromClickhouseConnection(
Expand All @@ -163,6 +166,7 @@ func NewReaderFromClickhouseConnection(
configFile string,
featureFlag interfaces.FeatureLookup,
cluster string,
useLogsNewSchema bool,
) *ClickHouseReader {
alertManager, err := am.New("")
if err != nil {
Expand Down Expand Up @@ -219,6 +223,7 @@ func NewReaderFromClickhouseConnection(
featureFlags: featureFlag,
cluster: cluster,
queryProgressTracker: queryprogress.NewQueryProgressTracker(),
useLogsNewSchema: useLogsNewSchema,
}
}

Expand Down
28 changes: 18 additions & 10 deletions pkg/query-service/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ type APIHandler struct {

// Websocket connection upgrader
Upgrader *websocket.Upgrader

UseLogsNewSchema bool
}

type APIHandlerOpts struct {
Expand Down Expand Up @@ -140,6 +142,9 @@ type APIHandlerOpts struct {

// Querier Influx Interval
FluxInterval time.Duration

// Use new schema
UseLogsNewSchema bool
}

// NewAPIHandler returns an APIHandler
Expand All @@ -151,19 +156,21 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
}

querierOpts := querier.QuerierOptions{
Reader: opts.Reader,
Cache: opts.Cache,
KeyGenerator: queryBuilder.NewKeyGenerator(),
FluxInterval: opts.FluxInterval,
FeatureLookup: opts.FeatureFlags,
Reader: opts.Reader,
Cache: opts.Cache,
KeyGenerator: queryBuilder.NewKeyGenerator(),
FluxInterval: opts.FluxInterval,
FeatureLookup: opts.FeatureFlags,
UseLogsNewSchema: opts.UseLogsNewSchema,
}

querierOptsV2 := querierV2.QuerierOptions{
Reader: opts.Reader,
Cache: opts.Cache,
KeyGenerator: queryBuilder.NewKeyGenerator(),
FluxInterval: opts.FluxInterval,
FeatureLookup: opts.FeatureFlags,
Reader: opts.Reader,
Cache: opts.Cache,
KeyGenerator: queryBuilder.NewKeyGenerator(),
FluxInterval: opts.FluxInterval,
FeatureLookup: opts.FeatureFlags,
UseLogsNewSchema: opts.UseLogsNewSchema,
}

querier := querier.NewQuerier(querierOpts)
Expand All @@ -185,6 +192,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
LogsParsingPipelineController: opts.LogsParsingPipelineController,
querier: querier,
querierV2: querierv2,
UseLogsNewSchema: opts.UseLogsNewSchema,
}

builderOpts := queryBuilder.QueryBuilderOptions{
Expand Down
9 changes: 6 additions & 3 deletions pkg/query-service/app/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ type querier struct {
timeRanges [][]int
returnedSeries []*v3.Series
returnedErr error

UseLogsNewSchema bool
}

type QuerierOptions struct {
Expand All @@ -64,9 +66,10 @@ type QuerierOptions struct {
FeatureLookup interfaces.FeatureLookup

// used for testing
TestingMode bool
ReturnedSeries []*v3.Series
ReturnedErr error
TestingMode bool
ReturnedSeries []*v3.Series
ReturnedErr error
UseLogsNewSchema bool
}

func NewQuerier(opts QuerierOptions) interfaces.Querier {
Expand Down
9 changes: 6 additions & 3 deletions pkg/query-service/app/querier/v2/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ type querier struct {
timeRanges [][]int
returnedSeries []*v3.Series
returnedErr error

UseLogsNewSchema bool
}

type QuerierOptions struct {
Expand All @@ -64,9 +66,10 @@ type QuerierOptions struct {
FeatureLookup interfaces.FeatureLookup

// used for testing
TestingMode bool
ReturnedSeries []*v3.Series
ReturnedErr error
TestingMode bool
ReturnedSeries []*v3.Series
ReturnedErr error
UseLogsNewSchema bool
}

func NewQuerier(opts QuerierOptions) interfaces.Querier {
Expand Down
29 changes: 17 additions & 12 deletions pkg/query-service/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type ServerOptions struct {
CacheConfigPath string
FluxInterval string
Cluster string
UseLogsNewSchema bool
}

// Server runs HTTP, Mux and a grpc server
Expand Down Expand Up @@ -128,6 +129,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
serverOptions.MaxOpenConns,
serverOptions.DialTimeout,
serverOptions.Cluster,
serverOptions.UseLogsNewSchema,
)
go clickhouseReader.Start(readerReady)
reader = clickhouseReader
Expand All @@ -144,7 +146,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
}

<-readerReady
rm, err := makeRulesManager(serverOptions.PromConfigPath, constants.GetAlertManagerApiPrefix(), serverOptions.RuleRepoURL, localDB, reader, serverOptions.DisableRules, fm)
rm, err := makeRulesManager(serverOptions.PromConfigPath, constants.GetAlertManagerApiPrefix(), serverOptions.RuleRepoURL, localDB, reader, serverOptions.DisableRules, fm, serverOptions.UseLogsNewSchema)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -197,6 +199,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
LogsParsingPipelineController: logParsingPipelineController,
Cache: c,
FluxInterval: fluxInterval,
UseLogsNewSchema: serverOptions.UseLogsNewSchema,
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -713,7 +716,8 @@ func makeRulesManager(
db *sqlx.DB,
ch interfaces.Reader,
disableRules bool,
fm interfaces.FeatureLookup) (*rules.Manager, error) {
fm interfaces.FeatureLookup,
useLogsNewSchema bool) (*rules.Manager, error) {

// create engine
pqle, err := pqle.FromReader(ch)
Expand All @@ -730,16 +734,17 @@ func makeRulesManager(

// create manager opts
managerOpts := &rules.ManagerOptions{
NotifierOpts: notifierOpts,
PqlEngine: pqle,
RepoURL: ruleRepoURL,
DBConn: db,
Context: context.Background(),
Logger: nil,
DisableRules: disableRules,
FeatureFlags: fm,
Reader: ch,
EvalDelay: constants.GetEvalDelay(),
NotifierOpts: notifierOpts,
PqlEngine: pqle,
RepoURL: ruleRepoURL,
DBConn: db,
Context: context.Background(),
Logger: nil,
DisableRules: disableRules,
FeatureFlags: fm,
Reader: ch,
EvalDelay: constants.GetEvalDelay(),
UseLogsNewSchema: useLogsNewSchema,
}

// create Manager
Expand Down
3 changes: 3 additions & 0 deletions pkg/query-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func main() {
// disables rule execution but allows change to the rule definition
var disableRules bool

var useLogsNewSchema bool
// the url used to build link in the alert messages in slack and other systems
var ruleRepoURL, cacheConfigPath, fluxInterval string
var cluster string
Expand All @@ -43,6 +44,7 @@ func main() {
var maxOpenConns int
var dialTimeout time.Duration

flag.BoolVar(&useLogsNewSchema, "use-logs-new-schema", false, "use logs_v2 schema for logs")
flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)")
flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)")
flag.BoolVar(&disableRules, "rules.disable", false, "(disable rule evaluation)")
Expand Down Expand Up @@ -79,6 +81,7 @@ func main() {
CacheConfigPath: cacheConfigPath,
FluxInterval: fluxInterval,
Cluster: cluster,
UseLogsNewSchema: useLogsNewSchema,
}

// Read the jwt secret key
Expand Down
Loading
Loading