Executor settings set_by_user and set_at_time fields, exposing in sch… (#263) (#4020)

Co-authored-by: Mustafa Ilyas <Mustafa.Ilyas@gresearch.co.uk>
MustafaI and mustafai-gr authored Oct 22, 2024
1 parent b1d9168 commit 1d02df1
Showing 15 changed files with 354 additions and 163 deletions.
6 changes: 3 additions & 3 deletions internal/common/metrics/scheduler_metrics.go
@@ -174,7 +174,7 @@ var ClusterAvailableCapacityDesc = prometheus.NewDesc(
var ClusterCordonedStatusDesc = prometheus.NewDesc(
MetricPrefix+"cluster_cordoned_status",
"Cluster cordoned status",
[]string{"cluster", "reason"},
[]string{"cluster", "reason", "setByUser"},
nil,
)

@@ -382,8 +382,8 @@ func NewClusterTotalCapacity(value float64, cluster string, pool string, resourc
return prometheus.MustNewConstMetric(ClusterCapacityDesc, prometheus.GaugeValue, value, cluster, pool, resource, nodeType)
}

-func NewClusterCordonedStatus(value float64, cluster string, reason string) prometheus.Metric {
-return prometheus.MustNewConstMetric(ClusterCordonedStatusDesc, prometheus.GaugeValue, value, cluster, reason)
+func NewClusterCordonedStatus(value float64, cluster string, reason string, setByUser string) prometheus.Metric {
+return prometheus.MustNewConstMetric(ClusterCordonedStatusDesc, prometheus.GaugeValue, value, cluster, reason, setByUser)
}

func NewQueueAllocated(value float64, queue string, cluster string, pool string, priorityClass string, resource string, nodeType string) prometheus.Metric {
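
For context, the widened constructor is what turns the new setByUser value into a label on the cordoned-status gauge. A minimal usage sketch follows; the example values, and the fact that it only compiles inside the Armada module (the package is internal), are illustrative assumptions rather than part of this commit.

// Minimal sketch: calling the extended constructor and inspecting the labels it produces.
// Values are illustrative; commonmetrics is internal/common/metrics from this repository.
package main

import (
	"fmt"

	commonmetrics "github.com/armadaproject/armada/internal/common/metrics"
	dto "github.com/prometheus/client_model/go"
)

func main() {
	m := commonmetrics.NewClusterCordonedStatus(1.0, "cluster-1", "bad executor", "some-user")

	var out dto.Metric
	if err := m.Write(&out); err != nil {
		panic(err)
	}
	// Prints the gauge value (1) followed by the cluster, reason and setByUser label pairs.
	fmt.Println(out.GetGauge().GetValue(), out.GetLabel())
}
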
3 changes: 3 additions & 0 deletions internal/scheduler/database/executor_repository.go
@@ -9,6 +9,7 @@ import (

"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/common/compress"
protoutil "github.com/armadaproject/armada/internal/common/proto"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)

@@ -109,6 +110,8 @@ func (r *PostgresExecutorRepository) GetExecutorSettings(ctx *armadacontext.Cont
ExecutorId: result.ExecutorId,
Cordoned: result.Cordoned,
CordonReason: result.CordonReason,
+SetByUser: result.SetByUser,
+SetAtTime: protoutil.ToTimestamp(result.SetAtTime),
}
executorSettings[i] = settings
}
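
The repository now also loads SetAtTime from the new timestamptz column and converts it to a protobuf timestamp via protoutil.ToTimestamp. Below is a rough, self-contained sketch of that kind of conversion using the standard timestamppb package; Armada's own helper and proto types are not shown in this diff, so the mapping is an assumption.

// Sketch only: converting a time.Time read from the database into a protobuf Timestamp,
// analogous to what the protoutil.ToTimestamp call above is assumed to do.
package main

import (
	"fmt"
	"time"

	"google.golang.org/protobuf/types/known/timestamppb"
)

func main() {
	setAtTime := time.Date(2024, 10, 22, 12, 0, 0, 0, time.UTC) // e.g. a value of executor_settings.set_at_time
	ts := timestamppb.New(setAtTime)                            // timestamp carried on the ExecutorSettings message
	fmt.Println(ts.AsTime().Equal(setAtTime))                   // true: the conversion round-trips
}
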
2 changes: 2 additions & 0 deletions (new SQL migration file)
@@ -0,0 +1,2 @@
+ALTER TABLE executor_settings ADD COLUMN set_by_user text;
+ALTER TABLE executor_settings ADD COLUMN set_at_time timestamptz;
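
Because the columns are added without defaults, rows written before this migration hold NULL in set_by_user and set_at_time. The sketch below shows the generic database/sql pattern for reading such nullable columns; it is only an illustration (Armada's real access goes through the generated sqlc code), and the query text, executor_id column name, driver and connection details are assumptions.

// Generic illustration (not Armada's generated code): reading the new nullable columns.
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // Postgres driver; the choice of driver is an assumption
)

func main() {
	db, err := sql.Open("postgres", "postgres://localhost/armada?sslmode=disable") // connection string is an assumption
	if err != nil {
		log.Fatal(err)
	}
	var setByUser sql.NullString
	var setAtTime sql.NullTime
	err = db.QueryRow(
		`SELECT set_by_user, set_at_time FROM executor_settings WHERE executor_id = $1`, // executor_id column name assumed
		"cluster-1",
	).Scan(&setByUser, &setAtTime)
	if err != nil {
		log.Fatal(err)
	}
	if setByUser.Valid {
		fmt.Println("cordoned by", setByUser.String, "at", setAtTime.Time)
	} else {
		fmt.Println("no cordon attribution recorded (pre-migration row or never set)")
	}
}
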
2 changes: 2 additions & 0 deletions internal/scheduler/database/models.go

Some generated files are not rendered by default.

14 changes: 8 additions & 6 deletions internal/scheduler/database/query.sql.go

Some generated files are not rendered by default.

18 changes: 11 additions & 7 deletions internal/scheduler/metrics.go
@@ -256,8 +256,9 @@ type clusterMetricKey struct {
}

type clusterCordonedStatus struct {
-status float64
-reason string
+status float64
+reason string
+setByUser string
}

func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]prometheus.Metric, error) {
@@ -290,8 +291,9 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p
for _, executor := range executors {
// We may not have executorSettings for all known executors, but we still want a cordon status metric for them.
cordonedStatusByCluster[executor.Id] = &clusterCordonedStatus{
-status: 0.0,
-reason: "",
+status: 0.0,
+reason: "",
+setByUser: "",
}
for _, node := range executor.Nodes {
nodePool := node.GetPool()
@@ -387,11 +389,13 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p
if cordonedValue, ok := cordonedStatusByCluster[executorSetting.ExecutorId]; ok {
cordonedValue.status = 1.0
cordonedValue.reason = executorSetting.CordonReason
+cordonedValue.setByUser = executorSetting.SetByUser
} else {
// We may have settings for executors that don't exist in the repository.
cordonedStatusByCluster[executorSetting.ExecutorId] = &clusterCordonedStatus{
-status: 1.0,
-reason: executorSetting.CordonReason,
+status: 1.0,
+reason: executorSetting.CordonReason,
+setByUser: executorSetting.SetByUser,
}
}
}
@@ -439,7 +443,7 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p
clusterMetrics = append(clusterMetrics, commonmetrics.NewClusterTotalCapacity(float64(v), k.cluster, k.pool, "nodes", k.nodeType))
}
for cluster, v := range cordonedStatusByCluster {
-clusterMetrics = append(clusterMetrics, commonmetrics.NewClusterCordonedStatus(v.status, cluster, v.reason))
+clusterMetrics = append(clusterMetrics, commonmetrics.NewClusterCordonedStatus(v.status, cluster, v.reason, v.setByUser))
}
return clusterMetrics, nil
}
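
The collector builds the cordoned-status map in two passes: first a default, uncordoned entry for every executor known to the repository, then executor settings either override those entries or add new ones, so a setting for an executor the repository no longer knows about still yields a metric. A standalone sketch of that merge pattern follows, using simplified stand-in types rather than the scheduler's real structs.

// Standalone sketch of the default-then-override merge used in updateClusterMetrics
// (simplified stand-in types; not the scheduler's actual structs).
package main

import "fmt"

type cordonedStatus struct {
	status    float64
	reason    string
	setByUser string
}

func main() {
	knownExecutors := []string{"cluster-1", "cluster-2"}
	settings := map[string]struct{ reason, setByUser string }{
		"cluster-2": {reason: "bad executor", setByUser: "some-user"},
		"cluster-3": {reason: "decommissioned", setByUser: "another-user"}, // no matching executor in the repository
	}

	byCluster := map[string]*cordonedStatus{}
	// Pass 1: default, uncordoned entry for every known executor.
	for _, id := range knownExecutors {
		byCluster[id] = &cordonedStatus{}
	}
	// Pass 2: settings override existing entries or add entries of their own.
	for id, s := range settings {
		if v, ok := byCluster[id]; ok {
			v.status, v.reason, v.setByUser = 1.0, s.reason, s.setByUser
		} else {
			byCluster[id] = &cordonedStatus{status: 1.0, reason: s.reason, setByUser: s.setByUser}
		}
	}

	for id, v := range byCluster {
		fmt.Printf("%s status=%v reason=%q setByUser=%q\n", id, v.status, v.reason, v.setByUser)
	}
}
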
12 changes: 6 additions & 6 deletions internal/scheduler/metrics_test.go
@@ -220,7 +220,7 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) {
commonmetrics.NewClusterTotalCapacity(64, "cluster-1", testfixtures.TestPool, "cpu", "type-1"),
commonmetrics.NewClusterTotalCapacity(512*1024*1024*1024, "cluster-1", testfixtures.TestPool, "memory", "type-1"),
commonmetrics.NewClusterTotalCapacity(2, "cluster-1", testfixtures.TestPool, "nodes", "type-1"),
-commonmetrics.NewClusterCordonedStatus(0.0, "cluster-1", ""),
+commonmetrics.NewClusterCordonedStatus(0.0, "cluster-1", "", ""),
},
expectedExecutorSettings: []*schedulerobjects.ExecutorSettings{},
},
@@ -240,7 +240,7 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) {
commonmetrics.NewClusterTotalCapacity(32, "cluster-1", testfixtures.TestPool, "cpu", "type-2"),
commonmetrics.NewClusterTotalCapacity(256*1024*1024*1024, "cluster-1", testfixtures.TestPool, "memory", "type-2"),
commonmetrics.NewClusterTotalCapacity(1, "cluster-1", testfixtures.TestPool, "nodes", "type-2"),
-commonmetrics.NewClusterCordonedStatus(0.0, "cluster-1", ""),
+commonmetrics.NewClusterCordonedStatus(0.0, "cluster-1", "", ""),
},
expectedExecutorSettings: []*schedulerobjects.ExecutorSettings{},
},
@@ -254,7 +254,7 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) {
commonmetrics.NewClusterTotalCapacity(64, "cluster-1", testfixtures.TestPool, "cpu", "type-1"),
commonmetrics.NewClusterTotalCapacity(512*1024*1024*1024, "cluster-1", testfixtures.TestPool, "memory", "type-1"),
commonmetrics.NewClusterTotalCapacity(2, "cluster-1", testfixtures.TestPool, "nodes", "type-1"),
-commonmetrics.NewClusterCordonedStatus(0.0, "cluster-1", ""),
+commonmetrics.NewClusterCordonedStatus(0.0, "cluster-1", "", ""),
},
expectedExecutorSettings: []*schedulerobjects.ExecutorSettings{},
},
@@ -274,7 +274,7 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) {
commonmetrics.NewClusterTotalCapacity(32, "cluster-1", testfixtures.TestPool, "cpu", "type-1"),
commonmetrics.NewClusterTotalCapacity(256*1024*1024*1024, "cluster-1", testfixtures.TestPool, "memory", "type-1"),
commonmetrics.NewClusterTotalCapacity(1, "cluster-1", testfixtures.TestPool, "nodes", "type-1"),
-commonmetrics.NewClusterCordonedStatus(0.0, "cluster-1", ""),
+commonmetrics.NewClusterCordonedStatus(0.0, "cluster-1", "", ""),
},
expectedExecutorSettings: []*schedulerobjects.ExecutorSettings{},
},
@@ -290,7 +290,7 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) {
commonmetrics.NewClusterTotalCapacity(32, "cluster-1", testfixtures.TestPool, "cpu", "type-1"),
commonmetrics.NewClusterTotalCapacity(256*1024*1024*1024, "cluster-1", testfixtures.TestPool, "memory", "type-1"),
commonmetrics.NewClusterTotalCapacity(1, "cluster-1", testfixtures.TestPool, "nodes", "type-1"),
-commonmetrics.NewClusterCordonedStatus(0.0, "cluster-1", ""),
+commonmetrics.NewClusterCordonedStatus(0.0, "cluster-1", "", ""),
},
expectedExecutorSettings: []*schedulerobjects.ExecutorSettings{},
},
@@ -314,7 +314,7 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) {
commonmetrics.NewClusterTotalCapacity(64, "cluster-1", testfixtures.TestPool, "cpu", "type-1"),
commonmetrics.NewClusterTotalCapacity(512*1024*1024*1024, "cluster-1", testfixtures.TestPool, "memory", "type-1"),
commonmetrics.NewClusterTotalCapacity(2, "cluster-1", testfixtures.TestPool, "nodes", "type-1"),
-commonmetrics.NewClusterCordonedStatus(1.0, "cluster-1", "bad executor"),
+commonmetrics.NewClusterCordonedStatus(1.0, "cluster-1", "bad executor", ""),
},
expectedExecutorSettings: []*schedulerobjects.ExecutorSettings{
{
