Skip to content

Commit

Permalink
Add config and dependency injection to scheduler metrics (#2892)
Browse files Browse the repository at this point in the history
* Replace metrics singleton with an injection pattern.

* fix

* add configuration structures to metrics

* add configuration

* rename elements
  • Loading branch information
theAntiYeti authored Aug 18, 2023
1 parent 1f26977 commit 0415a30
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 22 deletions.
9 changes: 9 additions & 0 deletions config/scheduler/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,15 @@ internedStringsCacheSize: 100000
metrics:
port: 9000
refreshInterval: 30s
metrics:
scheduleCycleTimeHistogramSettings:
start: 1.0
factor: 1.1
count: 110
reconcileCycleTimeHistogramSettings:
start: 1.0
factor: 1.1
count: 110
pulsar:
URL: "pulsar://pulsar:6650"
jobsetEventsTopic: "events"
Expand Down
12 changes: 12 additions & 0 deletions internal/armada/configuration/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,18 @@ type MetricsConfig struct {
Port uint16
RefreshInterval time.Duration
ExposeSchedulingMetrics bool
Metrics SchedulerMetricsConfig
}

type SchedulerMetricsConfig struct {
ScheduleCycleTimeHistogramSettings HistogramConfig
ReconcileCycleTimeHistogramSettings HistogramConfig
}

type HistogramConfig struct {
Start float64
Factor float64
Count int
}

type EventApiConfig struct {
Expand Down
7 changes: 4 additions & 3 deletions internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func NewScheduler(
executorTimeout time.Duration,
maxAttemptedRuns uint,
nodeIdLabel string,
schedulerMetrics *SchedulerMetrics,
) (*Scheduler, error) {
jobDb := jobdb.NewJobDb()
return &Scheduler{
Expand All @@ -110,7 +111,7 @@ func NewScheduler(
nodeIdLabel: nodeIdLabel,
jobsSerial: -1,
runsSerial: -1,
metrics: GetSchedulerMetrics(),
metrics: schedulerMetrics,
}, nil
}

Expand Down Expand Up @@ -175,10 +176,10 @@ func (s *Scheduler) Run(ctx context.Context) error {

if shouldSchedule && leaderToken.leader {
// Only the leader token does real scheduling rounds.
s.metrics.ReportScheduleCycleTime(float64(cycleTime.Milliseconds()))
s.metrics.ReportScheduleCycleTime(cycleTime)
log.Infof("scheduling cycle completed in %s", cycleTime)
} else {
s.metrics.ReportReconcileCycleTime(float64(cycleTime.Milliseconds()))
s.metrics.ReportReconcileCycleTime(cycleTime)
log.Infof("reconciliation cycle completed in %s", cycleTime)
}

Expand Down
33 changes: 16 additions & 17 deletions internal/scheduler/scheduler_metrics.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package scheduler

import (
"time"

"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"

"github.com/armadaproject/armada/internal/armada/configuration"
"github.com/armadaproject/armada/internal/scheduler/interfaces"
)

Expand All @@ -23,24 +26,17 @@ type SchedulerMetrics struct {
preemptedJobsPerQueue prometheus.GaugeVec
}

var schedulerMetrics *SchedulerMetrics

func init() {
schedulerMetrics = newSchedulerMetrics()
}

func GetSchedulerMetrics() *SchedulerMetrics {
return schedulerMetrics
}

func newSchedulerMetrics() *SchedulerMetrics {
func NewSchedulerMetrics(config configuration.SchedulerMetricsConfig) *SchedulerMetrics {
scheduleCycleTime := prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: NAMESPACE,
Subsystem: SUBSYSTEM,
Name: "schedule_cycle_times",
Help: "Cycle time when in a scheduling round.",
Buckets: prometheus.LinearBuckets(0, 5, 20),
Buckets: prometheus.ExponentialBuckets(
config.ScheduleCycleTimeHistogramSettings.Start,
config.ScheduleCycleTimeHistogramSettings.Factor,
config.ScheduleCycleTimeHistogramSettings.Count),
},
)

Expand All @@ -50,7 +46,10 @@ func newSchedulerMetrics() *SchedulerMetrics {
Subsystem: SUBSYSTEM,
Name: "reconcile_cycle_times",
Help: "Cycle time when outside of a scheduling round.",
Buckets: prometheus.LinearBuckets(0, 5, 20),
Buckets: prometheus.ExponentialBuckets(
config.ReconcileCycleTimeHistogramSettings.Start,
config.ReconcileCycleTimeHistogramSettings.Factor,
config.ReconcileCycleTimeHistogramSettings.Count),
},
)

Expand Down Expand Up @@ -93,12 +92,12 @@ func newSchedulerMetrics() *SchedulerMetrics {
}
}

func (metrics *SchedulerMetrics) ReportScheduleCycleTime(cycleTime float64) {
metrics.scheduleCycleTime.Observe(cycleTime)
func (metrics *SchedulerMetrics) ReportScheduleCycleTime(cycleTime time.Duration) {
metrics.scheduleCycleTime.Observe(float64(cycleTime.Milliseconds()))
}

func (metrics *SchedulerMetrics) ReportReconcileCycleTime(cycleTime float64) {
metrics.reconcileCycleTime.Observe(cycleTime)
func (metrics *SchedulerMetrics) ReportReconcileCycleTime(cycleTime time.Duration) {
metrics.reconcileCycleTime.Observe(float64(cycleTime.Milliseconds()))
}

func (metrics *SchedulerMetrics) ReportSchedulerResult(result *SchedulerResult) {
Expand Down
19 changes: 17 additions & 2 deletions internal/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,18 @@ var (
Version: 2,
}
updatedSchedulingInfoBytes = protoutil.MustMarshall(updatedSchedulingInfo)
schedulerMetrics = NewSchedulerMetrics(configuration.SchedulerMetricsConfig{
ScheduleCycleTimeHistogramSettings: configuration.HistogramConfig{
Start: 1,
Factor: 1.1,
Count: 100,
},
ReconcileCycleTimeHistogramSettings: configuration.HistogramConfig{
Start: 1,
Factor: 1.1,
Count: 100,
},
})
)

var queuedJob = jobdb.NewJob(
Expand Down Expand Up @@ -502,6 +514,7 @@ func TestScheduler_TestCycle(t *testing.T) {
clusterTimeout,
maxNumberOfAttempts,
nodeIdLabel,
schedulerMetrics,
)
require.NoError(t, err)

Expand Down Expand Up @@ -665,7 +678,8 @@ func TestRun(t *testing.T) {
15*time.Second,
1*time.Hour,
maxNumberOfAttempts,
nodeIdLabel)
nodeIdLabel,
schedulerMetrics)
require.NoError(t, err)

sched.clock = testClock
Expand Down Expand Up @@ -874,7 +888,8 @@ func TestScheduler_TestSyncState(t *testing.T) {
5*time.Second,
1*time.Hour,
maxNumberOfAttempts,
nodeIdLabel)
nodeIdLabel,
schedulerMetrics)
require.NoError(t, err)

// insert initial jobs
Expand Down
1 change: 1 addition & 0 deletions internal/scheduler/schedulerapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ func Run(config schedulerconfig.Configuration) error {
config.ExecutorTimeout,
config.Scheduling.MaxRetries+1,
config.Scheduling.Preemption.NodeIdLabel,
NewSchedulerMetrics(config.Metrics.Metrics),
)
if err != nil {
return errors.WithMessage(err, "error creating scheduler")
Expand Down

0 comments on commit 0415a30

Please sign in to comment.