Skip to content

Commit

Permalink
add queue priority to metrics (#3766)
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Martin <chris@cmartinit.co.uk>
Co-authored-by: Chris Martin <chris@cmartinit.co.uk>
  • Loading branch information
d80tb7 and d80tb7 authored Jul 3, 2024
1 parent c62ef88 commit 5796a53
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 49 deletions.
1 change: 1 addition & 0 deletions internal/common/metrics/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
type QueueMetricProvider interface {
GetQueuedJobMetrics(queueName string) []*QueueMetrics
GetRunningJobMetrics(queueName string) []*QueueMetrics
GetQueuePriorites() map[string]float64
}

type QueueMetrics struct {
Expand Down
106 changes: 57 additions & 49 deletions internal/common/metrics/scheduler_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,154 +11,147 @@ const MetricPrefix = "armada_"
var QueueSizeDesc = prometheus.NewDesc(
MetricPrefix+"queue_size",
"Number of jobs in a queue",
[]string{"queueName"},
[]string{"queueName", "queue"},
nil,
)

var QueueDistinctSchedulingKeysDesc = prometheus.NewDesc(
MetricPrefix+"queue_distinct_scheduling_keys",
"Number of distinct scheduling keys requested by a queue",
[]string{"queueName"},
nil,
)

var QueuePriorityDesc = prometheus.NewDesc(
MetricPrefix+"queue_priority",
"Priority of a queue",
[]string{"pool", "queueName"},
[]string{"queueName", "queue"},
nil,
)

var QueueResourcesDesc = prometheus.NewDesc(
MetricPrefix+"queue_resource_queued",
"Resource required by queued jobs",
[]string{"pool", "priorityClass", "queueName", "resourceType"},
[]string{"pool", "priorityClass", "queueName", "queue", "resourceType"},
nil,
)

var MinQueueResourcesDesc = prometheus.NewDesc(
MetricPrefix+"queue_resource_queued_min",
"Min resource required by queued job",
[]string{"pool", "priorityClass", "queueName", "resourceType"},
[]string{"pool", "priorityClass", "queueName", "queue", "resourceType"},
nil,
)

var MaxQueueResourcesDesc = prometheus.NewDesc(
MetricPrefix+"queue_resource_queued_max",
"Max resource required by queued job",
[]string{"pool", "priorityClass", "queueName", "resourceType"},
[]string{"pool", "priorityClass", "queueName", "queue", "resourceType"},
nil,
)

var MedianQueueResourcesDesc = prometheus.NewDesc(
MetricPrefix+"queue_resource_queued_median",
"Median resource required by queued jobs",
[]string{"pool", "priorityClass", "queueName", "resourceType"},
[]string{"pool", "priorityClass", "queueName", "queue", "resourceType"},
nil,
)

var CountQueueResourcesDesc = prometheus.NewDesc(
MetricPrefix+"queue_resource_queued_count",
"Count of queued jobs requiring resource",
[]string{"pool", "priorityClass", "queueName", "resourceType"},
[]string{"pool", "priorityClass", "queueName", "queue", "resourceType"},
nil,
)

var MinQueueDurationDesc = prometheus.NewDesc(
MetricPrefix+"job_queued_seconds_min",
"Min queue time for Armada jobs",
[]string{"pool", "priorityClass", "queueName"},
[]string{"pool", "priorityClass", "queueName", "queue"},
nil,
)

var MaxQueueDurationDesc = prometheus.NewDesc(
MetricPrefix+"job_queued_seconds_max",
"Max queue time for Armada jobs",
[]string{"pool", "priorityClass", "queueName"},
[]string{"pool", "priorityClass", "queueName", "queue"},
nil,
)

var MedianQueueDurationDesc = prometheus.NewDesc(
MetricPrefix+"job_queued_seconds_median",
"Median queue time for Armada jobs",
[]string{"pool", "priorityClass", "queueName"},
[]string{"pool", "priorityClass", "queueName", "queue"},
nil,
)

var QueueDurationDesc = prometheus.NewDesc(
MetricPrefix+"job_queued_seconds",
"Queued time for Armada jobs",
[]string{"pool", "priorityClass", "queueName"},
[]string{"pool", "priorityClass", "queueName", "queue"},
nil,
)

var MinJobRunDurationDesc = prometheus.NewDesc(
MetricPrefix+"job_run_time_seconds_min",
"Min run time for Armada jobs",
[]string{"pool", "priorityClass", "queueName"},
[]string{"pool", "priorityClass", "queueName", "queue"},
nil,
)

var MaxJobRunDurationDesc = prometheus.NewDesc(
MetricPrefix+"job_run_time_seconds_max",
"Max run time for Armada jobs",
[]string{"pool", "priorityClass", "queueName"},
[]string{"pool", "priorityClass", "queueName", "queue"},
nil,
)

var MedianJobRunDurationDesc = prometheus.NewDesc(
MetricPrefix+"job_run_time_seconds_median",
"Median run time for Armada jobs",
[]string{"pool", "priorityClass", "queueName"},
[]string{"pool", "priorityClass", "queueName", "queue"},
nil,
)

var JobRunDurationDesc = prometheus.NewDesc(
MetricPrefix+"job_run_time_seconds",
"Run time for Armada jobs",
[]string{"pool", "priorityClass", "queueName"},
[]string{"pool", "priorityClass", "queueName", "queue"},
nil,
)

var QueueAllocatedDesc = prometheus.NewDesc(
MetricPrefix+"queue_resource_allocated",
"Resource allocated to running jobs of a queue",
[]string{"cluster", "pool", "priorityClass", "queueName", "resourceType", "nodeType"},
[]string{"cluster", "pool", "priorityClass", "queueName", "queue", "resourceType", "nodeType"},
nil,
)

var MinQueueAllocatedDesc = prometheus.NewDesc(
MetricPrefix+"queue_resource_allocated_min",
"Min resource allocated by a running job",
[]string{"pool", "priorityClass", "queueName", "resourceType"},
[]string{"pool", "priorityClass", "queueName", "queue", "resourceType"},
nil,
)

var MaxQueueAllocatedDesc = prometheus.NewDesc(
MetricPrefix+"queue_resource_allocated_max",
"Max resource allocated by a running job",
[]string{"pool", "priorityClass", "queueName", "resourceType"},
[]string{"pool", "priorityClass", "queueName", "queue", "resourceType"},
nil,
)

var MedianQueueAllocatedDesc = prometheus.NewDesc(
MetricPrefix+"queue_resource_allocated_median",
"Median resource allocated by a running job",
[]string{"pool", "priorityClass", "queueName", "resourceType"},
[]string{"pool", "priorityClass", "queueName", "queue", "resourceType"},
nil,
)

var QueueUsedDesc = prometheus.NewDesc(
MetricPrefix+"queue_resource_used",
"Resource actually being used by running jobs of a queue",
[]string{"cluster", "pool", "queueName", "resourceType", "nodeType"},
[]string{"cluster", "pool", "queueName", "queue", "resourceType", "nodeType"},
nil,
)

var QueueLeasedPodCountDesc = prometheus.NewDesc(
MetricPrefix+"queue_leased_pod_count",
"Number of leased pods",
[]string{"cluster", "pool", "queueName", "phase", "nodeType"},
[]string{"cluster", "pool", "queueName", "queue", "phase", "nodeType"},
nil,
)

Expand All @@ -176,6 +169,13 @@ var ClusterAvailableCapacityDesc = prometheus.NewDesc(
nil,
)

var QueuePriorityDesc = prometheus.NewDesc(
MetricPrefix+"queue_priority",
"Queue priority factor",
[]string{"queueName", "queue"},
nil,
)

var AllDescs = []*prometheus.Desc{
QueueSizeDesc,
QueuePriorityDesc,
Expand All @@ -201,6 +201,7 @@ var AllDescs = []*prometheus.Desc{
QueueLeasedPodCountDesc,
ClusterCapacityDesc,
ClusterAvailableCapacityDesc,
QueuePriorityDesc,
}

func Describe(out chan<- *prometheus.Desc) {
Expand Down Expand Up @@ -264,83 +265,86 @@ func CollectQueueMetrics(queueCounts map[string]int, queueDistinctSchedulingKeyC
}
}
}
for q, priority := range metricsProvider.GetQueuePriorites() {
metrics = append(metrics, NewQueuePriorityMetric(priority, q))
}
return metrics
}

func NewQueueSizeMetric(value int, queue string) prometheus.Metric {
return prometheus.MustNewConstMetric(QueueSizeDesc, prometheus.GaugeValue, float64(value), queue)
return prometheus.MustNewConstMetric(QueueSizeDesc, prometheus.GaugeValue, float64(value), queue, queue)
}

func NewQueueDistinctSchedulingKeyMetric(value int, queue string) prometheus.Metric {
return prometheus.MustNewConstMetric(QueueDistinctSchedulingKeysDesc, prometheus.GaugeValue, float64(value), queue)
return prometheus.MustNewConstMetric(QueueDistinctSchedulingKeysDesc, prometheus.GaugeValue, float64(value), queue, queue)
}

func NewQueueDuration(count uint64, sum float64, buckets map[float64]uint64, pool string, priorityClass string, queue string) prometheus.Metric {
return prometheus.MustNewConstHistogram(QueueDurationDesc, count, sum, buckets, pool, priorityClass, queue)
return prometheus.MustNewConstHistogram(QueueDurationDesc, count, sum, buckets, pool, priorityClass, queue, queue)
}

func NewQueueResources(value float64, pool string, priorityClass string, queue string, resource string) prometheus.Metric {
return prometheus.MustNewConstMetric(QueueResourcesDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, resource)
return prometheus.MustNewConstMetric(QueueResourcesDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, queue, resource)
}

func NewMaxQueueResources(value float64, pool string, priorityClass string, queue string, resource string) prometheus.Metric {
return prometheus.MustNewConstMetric(MaxQueueResourcesDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, resource)
return prometheus.MustNewConstMetric(MaxQueueResourcesDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, queue, resource)
}

func NewMinQueueResources(value float64, pool string, priorityClass string, queue string, resource string) prometheus.Metric {
return prometheus.MustNewConstMetric(MinQueueResourcesDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, resource)
return prometheus.MustNewConstMetric(MinQueueResourcesDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, queue, resource)
}

func NewMedianQueueResources(value float64, pool string, priorityClass string, queue string, resource string) prometheus.Metric {
return prometheus.MustNewConstMetric(MedianQueueResourcesDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, resource)
return prometheus.MustNewConstMetric(MedianQueueResourcesDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, queue, resource)
}

func NewCountQueueResources(value uint64, pool string, priorityClass string, queue string, resource string) prometheus.Metric {
return prometheus.MustNewConstMetric(CountQueueResourcesDesc, prometheus.GaugeValue, float64(value), pool, priorityClass, queue, resource)
return prometheus.MustNewConstMetric(CountQueueResourcesDesc, prometheus.GaugeValue, float64(value), pool, priorityClass, queue, queue, resource)
}

func NewMinQueueDuration(value float64, pool string, priorityClass string, queue string) prometheus.Metric {
return prometheus.MustNewConstMetric(MinQueueDurationDesc, prometheus.GaugeValue, value, pool, priorityClass, queue)
return prometheus.MustNewConstMetric(MinQueueDurationDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, queue)
}

func NewMaxQueueDuration(value float64, pool string, priorityClass string, queue string) prometheus.Metric {
return prometheus.MustNewConstMetric(MaxQueueDurationDesc, prometheus.GaugeValue, value, pool, priorityClass, queue)
return prometheus.MustNewConstMetric(MaxQueueDurationDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, queue)
}

func NewMedianQueueDuration(value float64, pool string, priorityClass string, queue string) prometheus.Metric {
return prometheus.MustNewConstMetric(MedianQueueDurationDesc, prometheus.GaugeValue, value, pool, priorityClass, queue)
return prometheus.MustNewConstMetric(MedianQueueDurationDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, queue)
}

func NewMinJobRunDuration(value float64, pool string, priorityClass string, queue string) prometheus.Metric {
return prometheus.MustNewConstMetric(MinJobRunDurationDesc, prometheus.GaugeValue, value, pool, priorityClass, queue)
return prometheus.MustNewConstMetric(MinJobRunDurationDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, queue)
}

func NewMaxJobRunDuration(value float64, pool string, priorityClass string, queue string) prometheus.Metric {
return prometheus.MustNewConstMetric(MaxJobRunDurationDesc, prometheus.GaugeValue, value, pool, priorityClass, queue)
return prometheus.MustNewConstMetric(MaxJobRunDurationDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, queue)
}

func NewMedianJobRunDuration(value float64, pool string, priorityClass string, queue string) prometheus.Metric {
return prometheus.MustNewConstMetric(MedianJobRunDurationDesc, prometheus.GaugeValue, value, pool, priorityClass, queue)
return prometheus.MustNewConstMetric(MedianJobRunDurationDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, queue)
}

func NewJobRunRunDuration(count uint64, sum float64, buckets map[float64]uint64, pool string, priorityClass string, queue string) prometheus.Metric {
return prometheus.MustNewConstHistogram(JobRunDurationDesc, count, sum, buckets, pool, priorityClass, queue)
return prometheus.MustNewConstHistogram(JobRunDurationDesc, count, sum, buckets, pool, priorityClass, queue, queue)
}

func NewMinQueueAllocated(value float64, pool string, priorityClass string, queue string, resource string) prometheus.Metric {
return prometheus.MustNewConstMetric(MinQueueAllocatedDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, resource)
return prometheus.MustNewConstMetric(MinQueueAllocatedDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, queue, resource)
}

func NewMaxQueueAllocated(value float64, pool string, priorityClass string, queue string, resource string) prometheus.Metric {
return prometheus.MustNewConstMetric(MaxQueueAllocatedDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, resource)
return prometheus.MustNewConstMetric(MaxQueueAllocatedDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, queue, resource)
}

func NewMedianQueueAllocated(value float64, pool string, priorityClass string, queue string, resource string) prometheus.Metric {
return prometheus.MustNewConstMetric(MedianQueueAllocatedDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, resource)
return prometheus.MustNewConstMetric(MedianQueueAllocatedDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, queue, resource)
}

func NewQueueLeasedPodCount(value float64, cluster string, pool string, queue string, phase string, nodeType string) prometheus.Metric {
return prometheus.MustNewConstMetric(QueueLeasedPodCountDesc, prometheus.GaugeValue, value, cluster, pool, queue, phase, nodeType)
return prometheus.MustNewConstMetric(QueueLeasedPodCountDesc, prometheus.GaugeValue, value, cluster, pool, queue, queue, phase, nodeType)
}

func NewClusterAvailableCapacity(value float64, cluster string, pool string, resource string, nodeType string) prometheus.Metric {
Expand All @@ -352,9 +356,13 @@ func NewClusterTotalCapacity(value float64, cluster string, pool string, resourc
}

func NewQueueAllocated(value float64, queue string, cluster string, pool string, priorityClass string, resource string, nodeType string) prometheus.Metric {
return prometheus.MustNewConstMetric(QueueAllocatedDesc, prometheus.GaugeValue, value, cluster, pool, priorityClass, queue, resource, nodeType)
return prometheus.MustNewConstMetric(QueueAllocatedDesc, prometheus.GaugeValue, value, cluster, pool, priorityClass, queue, queue, resource, nodeType)
}

func NewQueueUsed(value float64, queue string, cluster string, pool string, resource string, nodeType string) prometheus.Metric {
return prometheus.MustNewConstMetric(QueueUsedDesc, prometheus.GaugeValue, value, cluster, pool, queue, resource, nodeType)
return prometheus.MustNewConstMetric(QueueUsedDesc, prometheus.GaugeValue, value, cluster, pool, queue, queue, resource, nodeType)
}

func NewQueuePriorityMetric(value float64, queue string) prometheus.Metric {
return prometheus.MustNewConstMetric(QueuePriorityDesc, prometheus.GaugeValue, value, queue, queue)
}
8 changes: 8 additions & 0 deletions internal/scheduler/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,20 @@ import (
type queueState struct {
queuedJobRecorder *commonmetrics.JobMetricsRecorder
runningJobRecorder *commonmetrics.JobMetricsRecorder
priority float64
}

// metricProvider is a simple implementation of QueueMetricProvider
type metricProvider struct {
queueStates map[string]*queueState
}

func (m metricProvider) GetQueuePriorites() map[string]float64 {
return armadamaps.MapValues(m.queueStates, func(v *queueState) float64 {
return v.priority
})
}

func (m metricProvider) GetQueuedJobMetrics(queueName string) []*commonmetrics.QueueMetrics {
state, ok := m.queueStates[queueName]
if ok {
Expand Down Expand Up @@ -143,6 +150,7 @@ func (c *MetricsCollector) updateQueueMetrics(ctx *armadacontext.Context) ([]pro
provider.queueStates[queue.Name] = &queueState{
queuedJobRecorder: commonmetrics.NewJobMetricsRecorder(),
runningJobRecorder: commonmetrics.NewJobMetricsRecorder(),
priority: queue.PriorityFactor,
}
queuedJobsCount[queue.Name] = 0
schedulingKeysByQueue[queue.Name] = map[schedulerobjects.SchedulingKey]bool{}
Expand Down

0 comments on commit 5796a53

Please sign in to comment.