From 8ee5c93eb2477935d2bb639fc049a6d4d613cf7a Mon Sep 17 00:00:00 2001
From: Saswata Mukherjee
Date: Sun, 15 Sep 2024 22:51:22 +0100
Subject: [PATCH] Make federate sharding possible in metrics-collector by using workers

Signed-off-by: Saswata Mukherjee
---
 .../metrics/cmd/metrics-collector/main.go     | 202 +++++++++++-------
 .../cmd/metrics-collector/main_test.go        |  15 +-
 collectors/metrics/pkg/forwarder/forwarder.go |   8 -
 3 files changed, 133 insertions(+), 92 deletions(-)

diff --git a/collectors/metrics/cmd/metrics-collector/main.go b/collectors/metrics/cmd/metrics-collector/main.go
index 384dac2fef..2077388f1c 100644
--- a/collectors/metrics/cmd/metrics-collector/main.go
+++ b/collectors/metrics/cmd/metrics-collector/main.go
@@ -23,9 +23,9 @@ import (
 	"github.com/oklog/run"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/collectors"
+	clientmodel "github.com/prometheus/client_model/go"
 	"github.com/prometheus/common/expfmt"
 	"github.com/spf13/cobra"
-	"k8s.io/apimachinery/pkg/util/uuid"
 
 	"github.com/stolostron/multicluster-observability-operator/collectors/metrics/pkg/collectrule"
 	"github.com/stolostron/multicluster-observability-operator/collectors/metrics/pkg/forwarder"
@@ -57,7 +57,7 @@ func main() {
 		&opt.WorkerNum,
 		"worker-number",
 		opt.WorkerNum,
-		"The number of client runs in the simulate environment.")
+		"The number of workers that work in parallel to federate and remote write metrics.")
 	cmd.Flags().StringVar(
 		&opt.Listen,
 		"listen",
@@ -301,16 +301,16 @@ func (o *Options) Run() error {
 	// Some packages still use default Register. Replace to have those metrics.
 	prometheus.DefaultRegisterer = metricsReg
 
-	err, cfg := initConfig(o)
-	if err != nil {
-		return err
+	if len(o.RulesFile) > 0 {
+		data, err := os.ReadFile(o.RulesFile)
+		if err != nil {
+			return fmt.Errorf("unable to read match-file: %w", err)
+		}
+		o.Rules = append(o.Rules, strings.Split(string(data), "\n")...)
 	}
 
-	metrics := forwarder.NewWorkerMetrics(metricsReg)
-	cfg.Metrics = metrics
-	worker, err := forwarder.New(*cfg)
-	if err != nil {
-		return fmt.Errorf("failed to configure metrics collector: %w", err)
+	if len(o.Rules) < int(o.WorkerNum) {
+		return errors.New("number of workers cannot be greater than number of rules")
 	}
 
 	logger.Log(
@@ -320,11 +320,28 @@ func (o *Options) Run() error {
 		"to", o.ToUpload,
 		"listen", o.Listen)
 
+	workers, err := o.getWorkers(metricsReg)
+	if err != nil {
+		return err
+	}
+
+	if len(workers) == 0 || workers == nil {
+		return errors.New("no workers found")
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
 	{
-		// Execute the worker's `Run` func.
-		ctx, cancel := context.WithCancel(context.Background())
 		g.Add(func() error {
-			worker.Run(ctx)
+			for _, worker := range workers {
+				go func(w *forwarder.Worker) {
+					select {
+					case <-ctx.Done():
+						return
+					default:
+						w.Run(ctx)
+					}
+				}(worker)
+			}
 			return nil
 		}, func(error) {
 			cancel()
@@ -340,8 +357,9 @@ func (o *Options) Run() error {
 			for {
 				select {
 				case <-hup:
-					if err := worker.Reconfigure(*cfg); err != nil {
-						logger.Log(o.Logger, logger.Error, "msg", "failed to reload config", "err", err)
+					workers, err = o.reconfigureWorkers(metricsReg, workers)
+					if err != nil {
+						logger.Log(o.Logger, logger.Error, "msg", "failed to reconfigure workers", "err", err)
 						return err
 					}
 				case <-cancel:
@@ -359,9 +377,14 @@ func (o *Options) Run() error {
 		collectorhttp.HealthRoutes(handlers)
 		collectorhttp.MetricRoutes(handlers, metricsReg)
 		collectorhttp.ReloadRoutes(handlers, func() error {
-			return worker.Reconfigure(*cfg)
+			workers, err = o.reconfigureWorkers(metricsReg, workers)
+			if err != nil {
+				logger.Log(o.Logger, logger.Error, "msg", "failed to reconfigure workers", "err", err)
+				return err
+			}
+			return nil
 		})
-		handlers.Handle("/federate", serveLastMetrics(o.Logger, worker))
+		handlers.Handle("/federate", serveLastMetrics(o.Logger, workers))
 		s := http.Server{
 			Addr: o.Listen,
 			Handler: handlers,
@@ -387,12 +410,11 @@ func (o *Options) Run() error {
 		}
 	}
 
-	err = runMultiWorkers(o, cfg)
-	if err != nil {
-		return err
-	}
-
 	if len(o.CollectRules) != 0 {
+		cfg, err := initConfig(o, []string{})
+		if err != nil {
+			return fmt.Errorf("failed to configure collect rule evaluator: %w", err)
+		}
 		evaluator, err := collectrule.New(*cfg)
 		if err != nil {
 			return fmt.Errorf("failed to configure collect rule evaluator: %w", err)
@@ -409,68 +431,86 @@ func (o *Options) Run() error {
 	return g.Run()
 }
 
-func runMultiWorkers(o *Options, cfg *forwarder.Config) error {
-	for i := 1; i < int(o.WorkerNum); i++ {
-		opt := &Options{
-			From: o.From,
-			FromQuery: o.FromQuery,
-			ToUpload: o.ToUpload,
-			FromCAFile: o.FromCAFile,
-			FromTokenFile: o.FromTokenFile,
-			ToUploadCA: o.ToUploadCA,
-			ToUploadCert: o.ToUploadCert,
-			ToUploadKey: o.ToUploadKey,
-			Rules: o.Rules,
-			RenameFlag: o.RenameFlag,
-			RecordingRules: o.RecordingRules,
-			Interval: o.Interval,
-			Labels: map[string]string{},
-			SimulatedTimeseriesFile: o.SimulatedTimeseriesFile,
-			Logger: o.Logger,
-		}
-		for _, flag := range o.LabelFlag {
-			values := strings.SplitN(flag, "=", 2)
-			if len(values) != 2 {
-				return fmt.Errorf("--label must be of the form key=value: %s", flag)
-			}
-			if values[0] == "cluster" {
-				values[1] += "-" + fmt.Sprint(i)
-			}
-			if values[0] == "clusterID" {
-				values[1] = string(uuid.NewUUID())
-			}
-			opt.Labels[values[0]] = values[1]
+func (o *Options) getWorkers(reg *prometheus.Registry) ([]*forwarder.Worker, error) {
+	// Calculate the number of rules per worker
+	rulesPerWorker := len(o.Rules) / int(o.WorkerNum)
+	if rulesPerWorker == 0 {
+		rulesPerWorker = 1
+	}
+
+	wm := forwarder.NewWorkerMetrics(reg)
+
+	workers := make([]*forwarder.Worker, 0, o.WorkerNum)
+
+	for i := 0; i < int(o.WorkerNum); i++ {
+		// Calculate the start and end indices for this worker's rules
+		startIndex := i * rulesPerWorker
+		endIndex := (i + 1) * rulesPerWorker
+		if i == int(o.WorkerNum)-1 {
+			endIndex = len(o.Rules)
 		}
-		err, forwardCfg := initConfig(opt)
+
+		// Create a slice of rules for this worker
+		workerRules := o.Rules[startIndex:endIndex]
+
+		forwardCfg, err := initConfig(o, workerRules)
 		if err != nil {
-			return err
+			return nil, err
 		}
-		forwardCfg.Metrics = cfg.Metrics
+		forwardCfg.Metrics = wm
 		forwardWorker, err := forwarder.New(*forwardCfg)
 		if err != nil {
-			return fmt.Errorf("failed to configure metrics collector: %w", err)
+			return nil, fmt.Errorf("failed to configure metrics collector: %w", err)
 		}
+		workers = append(workers, forwardWorker)
+	}
+	return workers, nil
+}
 
-		ctx, cancel := context.WithCancel(context.Background())
-		go func() {
-			forwardWorker.Run(ctx)
-			cancel()
-		}()
+func (o *Options) reconfigureWorkers(reg *prometheus.Registry, workers []*forwarder.Worker) ([]*forwarder.Worker, error) {
+	// Calculate the number of rules per worker
+	rulesPerWorker := len(o.Rules) / len(workers)
+	if rulesPerWorker == 0 {
+		rulesPerWorker = 1
+	}
+
+	wm := forwarder.NewWorkerMetrics(reg)
+	for i, worker := range workers {
+		// Calculate the start and end indices for this worker's rules
+		startIndex := i * rulesPerWorker
+		endIndex := (i + 1) * rulesPerWorker
+		if i == int(o.WorkerNum)-1 {
+			endIndex = len(o.Rules)
+		}
+
+		// Create a slice of rules for this worker
+		workerRules := o.Rules[startIndex:endIndex]
+
+		forwardCfg, err := initConfig(o, workerRules)
+		if err != nil {
+			return nil, err
+		}
+
+		forwardCfg.Metrics = wm
+		err = worker.Reconfigure(*forwardCfg)
+		if err != nil {
+			return nil, err
+		}
 	}
-	return nil
+	return workers, nil
 }
 
-func initConfig(o *Options) (error, *forwarder.Config) {
+func initConfig(o *Options, workerRules []string) (*forwarder.Config, error) {
 	if len(o.From) == 0 {
-		return errors.New("you must specify a Prometheus server to federate from (e.g. http://localhost:9090)"), nil
+		return nil, errors.New("you must specify a Prometheus server to federate from (e.g. http://localhost:9090)")
 	}
 
 	for _, flag := range o.LabelFlag {
 		values := strings.SplitN(flag, "=", 2)
 		if len(values) != 2 {
-			return fmt.Errorf("--label must be of the form key=value: %s", flag), nil
+			return nil, fmt.Errorf("--label must be of the form key=value: %s", flag)
 		}
 		if o.Labels == nil {
 			o.Labels = make(map[string]string)
@@ -484,7 +524,7 @@ func initConfig(o *Options) (error, *forwarder.Config) {
 		}
 		values := strings.SplitN(flag, "=", 2)
 		if len(values) != 2 {
-			return fmt.Errorf("--rename must be of the form OLD_NAME=NEW_NAME: %s", flag), nil
+			return nil, fmt.Errorf("--rename must be of the form OLD_NAME=NEW_NAME: %s", flag)
 		}
 		if o.Renames == nil {
 			o.Renames = make(map[string]string)
@@ -494,7 +534,7 @@ func initConfig(o *Options) (error, *forwarder.Config) {
 
 	from, err := url.Parse(o.From)
 	if err != nil {
-		return fmt.Errorf("--from is not a valid URL: %w", err), nil
+		return nil, fmt.Errorf("--from is not a valid URL: %w", err)
 	}
 	from.Path = strings.TrimRight(from.Path, "/")
 	if len(from.Path) == 0 {
@@ -503,7 +543,7 @@ func initConfig(o *Options) (error, *forwarder.Config) {
 
 	fromQuery, err := url.Parse(o.FromQuery)
 	if err != nil {
-		return fmt.Errorf("--from-query is not a valid URL: %w", err), nil
+		return nil, fmt.Errorf("--from-query is not a valid URL: %w", err)
 	}
 	fromQuery.Path = strings.TrimRight(fromQuery.Path, "/")
 	if len(fromQuery.Path) == 0 {
@@ -514,12 +554,12 @@ func initConfig(o *Options) (error, *forwarder.Config) {
 	if len(o.ToUpload) > 0 {
 		toUpload, err = url.Parse(o.ToUpload)
 		if err != nil {
-			return fmt.Errorf("--to-upload is not a valid URL: %w", err), nil
+			return nil, fmt.Errorf("--to-upload is not a valid URL: %w", err)
 		}
 	}
 
 	if toUpload == nil {
-		return errors.New("--to-upload must be specified"), nil
+		return nil, errors.New("--to-upload must be specified")
 	}
 
 	var transformer metricfamily.MultiTransformer
@@ -556,19 +596,19 @@ func initConfig(o *Options) (error, *forwarder.Config) {
 	isHypershift, err := metricfamily.CheckCRDExist(o.Logger)
 	if err != nil {
-		return err, nil
+		return nil, err
 	}
 
 	if isHypershift {
 		hyperTransformer, err := metricfamily.NewHypershiftTransformer(o.Logger, nil, o.Labels)
 		if err != nil {
-			return err, nil
+			return nil, err
 		}
 		transformer.WithFunc(func() metricfamily.Transformer {
 			return hyperTransformer
 		})
 	}
 
-	return nil, &forwarder.Config{
+	return &forwarder.Config{
 		From: from,
 		FromQuery: fromQuery,
 		ToUpload: toUpload,
@@ -586,25 +626,29 @@ func initConfig(o *Options) (error, *forwarder.Config) {
 		Interval: o.Interval,
 		EvaluateInterval: o.EvaluateInterval,
 		LimitBytes: o.LimitBytes,
-		Rules: o.Rules,
-		RulesFile: o.RulesFile,
+		Rules: workerRules,
 		RecordingRules: o.RecordingRules,
 		CollectRules: o.CollectRules,
 		Transformer: transformer,
 
 		Logger: o.Logger,
 		SimulatedTimeseriesFile: o.SimulatedTimeseriesFile,
-	}
+	}, nil
 }
 
 // serveLastMetrics retrieves the last set of metrics served.
-func serveLastMetrics(l log.Logger, worker *forwarder.Worker) http.Handler {
+func serveLastMetrics(l log.Logger, worker []*forwarder.Worker) http.Handler {
 	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
 		if req.Method != "GET" {
 			w.WriteHeader(http.StatusMethodNotAllowed)
 			return
 		}
-		families := worker.LastMetrics()
+
+		families := []*clientmodel.MetricFamily{}
+		for _, worker := range worker {
+			families = append(families, worker.LastMetrics()...)
+		}
+
 		protoTextFormat := expfmt.NewFormat(expfmt.TypeProtoText)
 		w.Header().Set("Content-Type", string(protoTextFormat))
 		encoder := expfmt.NewEncoder(w, protoTextFormat)
diff --git a/collectors/metrics/cmd/metrics-collector/main_test.go b/collectors/metrics/cmd/metrics-collector/main_test.go
index 3b28bdfa60..86bddd1863 100644
--- a/collectors/metrics/cmd/metrics-collector/main_test.go
+++ b/collectors/metrics/cmd/metrics-collector/main_test.go
@@ -5,6 +5,7 @@
 package main
 
 import (
+	"context"
 	stdlog "log"
 	"os"
 	"testing"
@@ -13,7 +14,6 @@ import (
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
 	"github.com/prometheus/client_golang/prometheus"
-	"github.com/stolostron/multicluster-observability-operator/collectors/metrics/pkg/forwarder"
 	"github.com/stolostron/multicluster-observability-operator/collectors/metrics/pkg/logger"
 )
 
@@ -22,13 +22,12 @@ func init() {
 }
 
 func TestMultiWorkers(t *testing.T) {
-
 	opt := &Options{
 		Listen: "localhost:9002",
 		LimitBytes: 200 * 1024,
 		Rules: []string{`{__name__="instance:node_vmstat_pgmajfault:rate1m"}`},
 		Interval: 4*time.Minute + 30*time.Second,
-		WorkerNum: 2,
+		WorkerNum: 1,
 		SimulatedTimeseriesFile: "../../testdata/timeseries.txt",
 		From: "https://prometheus-k8s.openshift-monitoring.svc:9091",
 		ToUpload: "https://prometheus-k8s.openshift-monitoring.svc:9091",
@@ -47,10 +46,16 @@ func TestMultiWorkers(t *testing.T) {
 	stdlog.SetOutput(log.NewStdlibAdapter(l))
 	opt.Logger = l
 
-	err := runMultiWorkers(opt, &forwarder.Config{Metrics: forwarder.NewWorkerMetrics(prometheus.NewRegistry())})
+	workers, err := opt.getWorkers(prometheus.NewRegistry())
 	if err != nil {
 		t.Fatal(err)
 	}
-	time.Sleep(1 * time.Second)
 
+	for _, worker := range workers {
+		go func() {
+			worker.Run(context.Background())
+		}()
+	}
+
+	time.Sleep(6 * time.Second)
 }
diff --git a/collectors/metrics/pkg/forwarder/forwarder.go b/collectors/metrics/pkg/forwarder/forwarder.go
index 679ac683ac..6d6c8b8425 100644
--- a/collectors/metrics/pkg/forwarder/forwarder.go
+++ b/collectors/metrics/pkg/forwarder/forwarder.go
@@ -62,7 +62,6 @@ type Config struct {
 	EvaluateInterval time.Duration
 	LimitBytes int64
 	Rules []string
-	RulesFile string
 	RecordingRules []string
 	RecordingRulesFile string
 	CollectRules []string
@@ -269,13 +268,6 @@ func New(cfg Config) (*Worker, error) {
 
 	// Configure the matching rules.
 	rules := cfg.Rules
-	if len(cfg.RulesFile) > 0 {
-		data, err := os.ReadFile(cfg.RulesFile)
-		if err != nil {
-			return nil, fmt.Errorf("unable to read match-file: %w", err)
-		}
-		rules = append(rules, strings.Split(string(data), "\n")...)
-	}
 	for i := 0; i < len(rules); {
 		s := strings.TrimSpace(rules[i])
 		if len(s) == 0 {
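
A minimal, standalone sketch of the sharding arithmetic that getWorkers and reconfigureWorkers apply, assuming the same integer-division scheme the patch uses (the shardRules helper and the sample rule values are hypothetical, for illustration only): each worker receives a contiguous slice of len(rules)/workerNum match rules, and the last worker also absorbs any remainder left by the division.

package main

import "fmt"

// shardRules mirrors the slicing in getWorkers: integer division fixes the
// per-worker chunk size, and the final worker takes whatever rules remain.
func shardRules(rules []string, workerNum int) [][]string {
	rulesPerWorker := len(rules) / workerNum
	if rulesPerWorker == 0 {
		rulesPerWorker = 1
	}
	shards := make([][]string, 0, workerNum)
	for i := 0; i < workerNum; i++ {
		start := i * rulesPerWorker
		end := (i + 1) * rulesPerWorker
		if i == workerNum-1 {
			end = len(rules)
		}
		shards = append(shards, rules[start:end])
	}
	return shards
}

func main() {
	// Hypothetical sample input: 5 rules split across 2 workers -> shards of 2 and 3.
	rules := []string{"r1", "r2", "r3", "r4", "r5"}
	for i, shard := range shardRules(rules, 2) {
		fmt.Printf("worker %d: %v\n", i, shard)
	}
}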