Skip to content

Commit

Permalink
Make federate sharding possible in metrics-collector by using workers
Browse files Browse the repository at this point in the history
Signed-off-by: Saswata Mukherjee <saswataminsta@yahoo.com>
  • Loading branch information
saswatamcode committed Sep 15, 2024
1 parent dc5f925 commit 8ee5c93
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 92 deletions.
202 changes: 123 additions & 79 deletions collectors/metrics/cmd/metrics-collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import (
"github.com/oklog/run"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
clientmodel "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/util/uuid"

"github.com/stolostron/multicluster-observability-operator/collectors/metrics/pkg/collectrule"
"github.com/stolostron/multicluster-observability-operator/collectors/metrics/pkg/forwarder"
Expand Down Expand Up @@ -57,7 +57,7 @@ func main() {
&opt.WorkerNum,
"worker-number",
opt.WorkerNum,
"The number of client runs in the simulate environment.")
"The number of workers that work parallely to federate and remote write metrics.")

Check failure on line 60 in collectors/metrics/cmd/metrics-collector/main.go

View workflow job for this annotation

GitHub Actions / Formatters + Linters (Static Analysis) for Go

`parallely` is a misspelling of `parallelly` (misspell)
cmd.Flags().StringVar(
&opt.Listen,
"listen",
Expand Down Expand Up @@ -301,16 +301,16 @@ func (o *Options) Run() error {
// Some packages still use default Register. Replace to have those metrics.
prometheus.DefaultRegisterer = metricsReg

err, cfg := initConfig(o)
if err != nil {
return err
if len(o.RulesFile) > 0 {
data, err := os.ReadFile(o.RulesFile)
if err != nil {
return fmt.Errorf("unable to read match-file: %w", err)
}
o.Rules = append(o.Rules, strings.Split(string(data), "\n")...)
}

metrics := forwarder.NewWorkerMetrics(metricsReg)
cfg.Metrics = metrics
worker, err := forwarder.New(*cfg)
if err != nil {
return fmt.Errorf("failed to configure metrics collector: %w", err)
if len(o.Rules) < int(o.WorkerNum) {
return errors.New("number of workers cannot be greater than number of rules")
}

logger.Log(
Expand All @@ -320,11 +320,28 @@ func (o *Options) Run() error {
"to", o.ToUpload,
"listen", o.Listen)

workers, err := o.getWorkers(metricsReg)
if err != nil {
return err
}

if len(workers) == 0 || workers == nil {
return errors.New("no workers found")
}

ctx, cancel := context.WithCancel(context.Background())
{
// Execute the worker's `Run` func.
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
worker.Run(ctx)
for _, worker := range workers {
go func(w *forwarder.Worker) {
select {
case <-ctx.Done():
return
default:
w.Run(ctx)
}
}(worker)
}
return nil
}, func(error) {
cancel()
Expand All @@ -340,8 +357,9 @@ func (o *Options) Run() error {
for {
select {
case <-hup:
if err := worker.Reconfigure(*cfg); err != nil {
logger.Log(o.Logger, logger.Error, "msg", "failed to reload config", "err", err)
workers, err = o.reconfigureWorkers(metricsReg, workers)
if err != nil {
logger.Log(o.Logger, logger.Error, "msg", "failed to reconfigure workers", "err", err)
return err
}
case <-cancel:
Expand All @@ -359,9 +377,14 @@ func (o *Options) Run() error {
collectorhttp.HealthRoutes(handlers)
collectorhttp.MetricRoutes(handlers, metricsReg)
collectorhttp.ReloadRoutes(handlers, func() error {
return worker.Reconfigure(*cfg)
workers, err = o.reconfigureWorkers(metricsReg, workers)
if err != nil {
logger.Log(o.Logger, logger.Error, "msg", "failed to reconfigure workers", "err", err)
return err
}
return nil
})
handlers.Handle("/federate", serveLastMetrics(o.Logger, worker))
handlers.Handle("/federate", serveLastMetrics(o.Logger, workers))
s := http.Server{
Addr: o.Listen,
Handler: handlers,
Expand All @@ -387,12 +410,11 @@ func (o *Options) Run() error {
}
}

err = runMultiWorkers(o, cfg)
if err != nil {
return err
}

if len(o.CollectRules) != 0 {
cfg, err := initConfig(o, []string{})
if err != nil {
return fmt.Errorf("failed to configure collect rule evaluator: %w", err)
}
evaluator, err := collectrule.New(*cfg)
if err != nil {
return fmt.Errorf("failed to configure collect rule evaluator: %w", err)
Expand All @@ -409,68 +431,86 @@ func (o *Options) Run() error {
return g.Run()
}

func runMultiWorkers(o *Options, cfg *forwarder.Config) error {
for i := 1; i < int(o.WorkerNum); i++ {
opt := &Options{
From: o.From,
FromQuery: o.FromQuery,
ToUpload: o.ToUpload,
FromCAFile: o.FromCAFile,
FromTokenFile: o.FromTokenFile,
ToUploadCA: o.ToUploadCA,
ToUploadCert: o.ToUploadCert,
ToUploadKey: o.ToUploadKey,
Rules: o.Rules,
RenameFlag: o.RenameFlag,
RecordingRules: o.RecordingRules,
Interval: o.Interval,
Labels: map[string]string{},
SimulatedTimeseriesFile: o.SimulatedTimeseriesFile,
Logger: o.Logger,
}
for _, flag := range o.LabelFlag {
values := strings.SplitN(flag, "=", 2)
if len(values) != 2 {
return fmt.Errorf("--label must be of the form key=value: %s", flag)
}
if values[0] == "cluster" {
values[1] += "-" + fmt.Sprint(i)
}
if values[0] == "clusterID" {
values[1] = string(uuid.NewUUID())
}
opt.Labels[values[0]] = values[1]
func (o *Options) getWorkers(reg *prometheus.Registry) ([]*forwarder.Worker, error) {
// Calculate the number of rules per worker
rulesPerWorker := len(o.Rules) / int(o.WorkerNum)
if rulesPerWorker == 0 {
rulesPerWorker = 1
}

wm := forwarder.NewWorkerMetrics(reg)

workers := make([]*forwarder.Worker, 0, o.WorkerNum)

for i := 0; i < int(o.WorkerNum); i++ {
// Calculate the start and end indices for this worker's rules
startIndex := i * rulesPerWorker
endIndex := (i + 1) * rulesPerWorker
if i == int(o.WorkerNum)-1 {
endIndex = len(o.Rules)
}
err, forwardCfg := initConfig(opt)

// Create a slice of rules for this worker
workerRules := o.Rules[startIndex:endIndex]

forwardCfg, err := initConfig(o, workerRules)
if err != nil {
return err
return nil, err
}

forwardCfg.Metrics = cfg.Metrics
forwardCfg.Metrics = wm
forwardWorker, err := forwarder.New(*forwardCfg)
if err != nil {
return fmt.Errorf("failed to configure metrics collector: %w", err)
return nil, fmt.Errorf("failed to configure metrics collector: %w", err)
}
workers = append(workers, forwardWorker)
}
return workers, nil
}

ctx, cancel := context.WithCancel(context.Background())
go func() {
forwardWorker.Run(ctx)
cancel()
}()
func (o *Options) reconfigureWorkers(reg *prometheus.Registry, workers []*forwarder.Worker) ([]*forwarder.Worker, error) {
// Calculate the number of rules per worker
rulesPerWorker := len(o.Rules) / len(workers)
if rulesPerWorker == 0 {
rulesPerWorker = 1
}

wm := forwarder.NewWorkerMetrics(reg)

for i, worker := range workers {
// Calculate the start and end indices for this worker's rules
startIndex := i * rulesPerWorker
endIndex := (i + 1) * rulesPerWorker
if i == int(o.WorkerNum)-1 {
endIndex = len(o.Rules)
}

// Create a slice of rules for this worker
workerRules := o.Rules[startIndex:endIndex]

forwardCfg, err := initConfig(o, workerRules)
if err != nil {
return nil, err
}

forwardCfg.Metrics = wm
err = worker.Reconfigure(*forwardCfg)
if err != nil {
return nil, err
}
}
return nil
return workers, nil
}

func initConfig(o *Options) (error, *forwarder.Config) {
func initConfig(o *Options, workerRules []string) (*forwarder.Config, error) {
if len(o.From) == 0 {
return errors.New("you must specify a Prometheus server to federate from (e.g. http://localhost:9090)"), nil
return nil, errors.New("you must specify a Prometheus server to federate from (e.g. http://localhost:9090)")
}

for _, flag := range o.LabelFlag {
values := strings.SplitN(flag, "=", 2)
if len(values) != 2 {
return fmt.Errorf("--label must be of the form key=value: %s", flag), nil
return nil, fmt.Errorf("--label must be of the form key=value: %s", flag)
}
if o.Labels == nil {
o.Labels = make(map[string]string)
Expand All @@ -484,7 +524,7 @@ func initConfig(o *Options) (error, *forwarder.Config) {
}
values := strings.SplitN(flag, "=", 2)
if len(values) != 2 {
return fmt.Errorf("--rename must be of the form OLD_NAME=NEW_NAME: %s", flag), nil
return nil, fmt.Errorf("--rename must be of the form OLD_NAME=NEW_NAME: %s", flag)
}
if o.Renames == nil {
o.Renames = make(map[string]string)
Expand All @@ -494,7 +534,7 @@ func initConfig(o *Options) (error, *forwarder.Config) {

from, err := url.Parse(o.From)
if err != nil {
return fmt.Errorf("--from is not a valid URL: %w", err), nil
return nil, fmt.Errorf("--from is not a valid URL: %w", err)
}
from.Path = strings.TrimRight(from.Path, "/")
if len(from.Path) == 0 {
Expand All @@ -503,7 +543,7 @@ func initConfig(o *Options) (error, *forwarder.Config) {

fromQuery, err := url.Parse(o.FromQuery)
if err != nil {
return fmt.Errorf("--from-query is not a valid URL: %w", err), nil
return nil, fmt.Errorf("--from-query is not a valid URL: %w", err)
}
fromQuery.Path = strings.TrimRight(fromQuery.Path, "/")
if len(fromQuery.Path) == 0 {
Expand All @@ -514,12 +554,12 @@ func initConfig(o *Options) (error, *forwarder.Config) {
if len(o.ToUpload) > 0 {
toUpload, err = url.Parse(o.ToUpload)
if err != nil {
return fmt.Errorf("--to-upload is not a valid URL: %w", err), nil
return nil, fmt.Errorf("--to-upload is not a valid URL: %w", err)
}
}

if toUpload == nil {
return errors.New("--to-upload must be specified"), nil
return nil, errors.New("--to-upload must be specified")
}

var transformer metricfamily.MultiTransformer
Expand Down Expand Up @@ -556,19 +596,19 @@ func initConfig(o *Options) (error, *forwarder.Config) {

isHypershift, err := metricfamily.CheckCRDExist(o.Logger)
if err != nil {
return err, nil
return nil, err
}
if isHypershift {
hyperTransformer, err := metricfamily.NewHypershiftTransformer(o.Logger, nil, o.Labels)
if err != nil {
return err, nil
return nil, err
}
transformer.WithFunc(func() metricfamily.Transformer {
return hyperTransformer
})
}

return nil, &forwarder.Config{
return &forwarder.Config{
From: from,
FromQuery: fromQuery,
ToUpload: toUpload,
Expand All @@ -586,25 +626,29 @@ func initConfig(o *Options) (error, *forwarder.Config) {
Interval: o.Interval,
EvaluateInterval: o.EvaluateInterval,
LimitBytes: o.LimitBytes,
Rules: o.Rules,
RulesFile: o.RulesFile,
Rules: workerRules,
RecordingRules: o.RecordingRules,
CollectRules: o.CollectRules,
Transformer: transformer,

Logger: o.Logger,
SimulatedTimeseriesFile: o.SimulatedTimeseriesFile,
}
}, nil
}

// serveLastMetrics retrieves the last set of metrics served.
func serveLastMetrics(l log.Logger, worker *forwarder.Worker) http.Handler {
func serveLastMetrics(l log.Logger, worker []*forwarder.Worker) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if req.Method != "GET" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
families := worker.LastMetrics()

families := []*clientmodel.MetricFamily{}
for _, worker := range worker {
families = append(families, worker.LastMetrics()...)
}

protoTextFormat := expfmt.NewFormat(expfmt.TypeProtoText)
w.Header().Set("Content-Type", string(protoTextFormat))
encoder := expfmt.NewEncoder(w, protoTextFormat)
Expand Down
Loading

0 comments on commit 8ee5c93

Please sign in to comment.