Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add queue priority to metrics #3766

Merged
merged 2 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/common/metrics/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
type QueueMetricProvider interface {
GetQueuedJobMetrics(queueName string) []*QueueMetrics
GetRunningJobMetrics(queueName string) []*QueueMetrics
GetQueuePriorites() map[string]float64
}

type QueueMetrics struct {
Expand Down
106 changes: 57 additions & 49 deletions internal/common/metrics/scheduler_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,154 +11,147 @@ const MetricPrefix = "armada_"
var QueueSizeDesc = prometheus.NewDesc(
MetricPrefix+"queue_size",
"Number of jobs in a queue",
[]string{"queueName"},
[]string{"queueName", "queue"},
nil,
)

var QueueDistinctSchedulingKeysDesc = prometheus.NewDesc(
MetricPrefix+"queue_distinct_scheduling_keys",
"Number of distinct scheduling keys requested by a queue",
[]string{"queueName"},
nil,
)

var QueuePriorityDesc = prometheus.NewDesc(
MetricPrefix+"queue_priority",
"Priority of a queue",
[]string{"pool", "queueName"},
[]string{"queueName", "queue"},
nil,
)

var QueueResourcesDesc = prometheus.NewDesc(
MetricPrefix+"queue_resource_queued",
"Resource required by queued jobs",
[]string{"pool", "priorityClass", "queueName", "resourceType"},
[]string{"pool", "priorityClass", "queueName", "queue", "resourceType"},
nil,
)

var MinQueueResourcesDesc = prometheus.NewDesc(
MetricPrefix+"queue_resource_queued_min",
"Min resource required by queued job",
[]string{"pool", "priorityClass", "queueName", "resourceType"},
[]string{"pool", "priorityClass", "queueName", "queue", "resourceType"},
nil,
)

var MaxQueueResourcesDesc = prometheus.NewDesc(
MetricPrefix+"queue_resource_queued_max",
"Max resource required by queued job",
[]string{"pool", "priorityClass", "queueName", "resourceType"},
[]string{"pool", "priorityClass", "queueName", "queue", "resourceType"},
nil,
)

var MedianQueueResourcesDesc = prometheus.NewDesc(
MetricPrefix+"queue_resource_queued_median",
"Median resource required by queued jobs",
[]string{"pool", "priorityClass", "queueName", "resourceType"},
[]string{"pool", "priorityClass", "queueName", "queue", "resourceType"},
nil,
)

var CountQueueResourcesDesc = prometheus.NewDesc(
MetricPrefix+"queue_resource_queued_count",
"Count of queued jobs requiring resource",
[]string{"pool", "priorityClass", "queueName", "resourceType"},
[]string{"pool", "priorityClass", "queueName", "queue", "resourceType"},
nil,
)

var MinQueueDurationDesc = prometheus.NewDesc(
MetricPrefix+"job_queued_seconds_min",
"Min queue time for Armada jobs",
[]string{"pool", "priorityClass", "queueName"},
[]string{"pool", "priorityClass", "queueName", "queue"},
nil,
)

var MaxQueueDurationDesc = prometheus.NewDesc(
MetricPrefix+"job_queued_seconds_max",
"Max queue time for Armada jobs",
[]string{"pool", "priorityClass", "queueName"},
[]string{"pool", "priorityClass", "queueName", "queue"},
nil,
)

var MedianQueueDurationDesc = prometheus.NewDesc(
MetricPrefix+"job_queued_seconds_median",
"Median queue time for Armada jobs",
[]string{"pool", "priorityClass", "queueName"},
[]string{"pool", "priorityClass", "queueName", "queue"},
nil,
)

var QueueDurationDesc = prometheus.NewDesc(
MetricPrefix+"job_queued_seconds",
"Queued time for Armada jobs",
[]string{"pool", "priorityClass", "queueName"},
[]string{"pool", "priorityClass", "queueName", "queue"},
nil,
)

var MinJobRunDurationDesc = prometheus.NewDesc(
MetricPrefix+"job_run_time_seconds_min",
"Min run time for Armada jobs",
[]string{"pool", "priorityClass", "queueName"},
[]string{"pool", "priorityClass", "queueName", "queue"},
nil,
)

var MaxJobRunDurationDesc = prometheus.NewDesc(
MetricPrefix+"job_run_time_seconds_max",
"Max run time for Armada jobs",
[]string{"pool", "priorityClass", "queueName"},
[]string{"pool", "priorityClass", "queueName", "queue"},
nil,
)

var MedianJobRunDurationDesc = prometheus.NewDesc(
MetricPrefix+"job_run_time_seconds_median",
"Median run time for Armada jobs",
[]string{"pool", "priorityClass", "queueName"},
[]string{"pool", "priorityClass", "queueName", "queue"},
nil,
)

var JobRunDurationDesc = prometheus.NewDesc(
MetricPrefix+"job_run_time_seconds",
"Run time for Armada jobs",
[]string{"pool", "priorityClass", "queueName"},
[]string{"pool", "priorityClass", "queueName", "queue"},
nil,
)

var QueueAllocatedDesc = prometheus.NewDesc(
MetricPrefix+"queue_resource_allocated",
"Resource allocated to running jobs of a queue",
[]string{"cluster", "pool", "priorityClass", "queueName", "resourceType", "nodeType"},
[]string{"cluster", "pool", "priorityClass", "queueName", "queue", "resourceType", "nodeType"},
nil,
)

var MinQueueAllocatedDesc = prometheus.NewDesc(
MetricPrefix+"queue_resource_allocated_min",
"Min resource allocated by a running job",
[]string{"pool", "priorityClass", "queueName", "resourceType"},
[]string{"pool", "priorityClass", "queueName", "queue", "resourceType"},
nil,
)

var MaxQueueAllocatedDesc = prometheus.NewDesc(
MetricPrefix+"queue_resource_allocated_max",
"Max resource allocated by a running job",
[]string{"pool", "priorityClass", "queueName", "resourceType"},
[]string{"pool", "priorityClass", "queueName", "queue", "resourceType"},
nil,
)

var MedianQueueAllocatedDesc = prometheus.NewDesc(
MetricPrefix+"queue_resource_allocated_median",
"Median resource allocated by a running job",
[]string{"pool", "priorityClass", "queueName", "resourceType"},
[]string{"pool", "priorityClass", "queueName", "queue", "resourceType"},
nil,
)

var QueueUsedDesc = prometheus.NewDesc(
MetricPrefix+"queue_resource_used",
"Resource actually being used by running jobs of a queue",
[]string{"cluster", "pool", "queueName", "resourceType", "nodeType"},
[]string{"cluster", "pool", "queueName", "queue", "resourceType", "nodeType"},
nil,
)

var QueueLeasedPodCountDesc = prometheus.NewDesc(
MetricPrefix+"queue_leased_pod_count",
"Number of leased pods",
[]string{"cluster", "pool", "queueName", "phase", "nodeType"},
[]string{"cluster", "pool", "queueName", "queue", "phase", "nodeType"},
nil,
)

Expand All @@ -176,6 +169,13 @@ var ClusterAvailableCapacityDesc = prometheus.NewDesc(
nil,
)

var QueuePriorityDesc = prometheus.NewDesc(
MetricPrefix+"queue_priority",
"Queue priority factor",
[]string{"queueName", "queue"},
nil,
)

var AllDescs = []*prometheus.Desc{
QueueSizeDesc,
QueuePriorityDesc,
Expand All @@ -201,6 +201,7 @@ var AllDescs = []*prometheus.Desc{
QueueLeasedPodCountDesc,
ClusterCapacityDesc,
ClusterAvailableCapacityDesc,
QueuePriorityDesc,
}

func Describe(out chan<- *prometheus.Desc) {
Expand Down Expand Up @@ -264,83 +265,86 @@ func CollectQueueMetrics(queueCounts map[string]int, queueDistinctSchedulingKeyC
}
}
}
for q, priority := range metricsProvider.GetQueuePriorites() {
metrics = append(metrics, NewQueuePriorityMetric(priority, q))
}
return metrics
}

func NewQueueSizeMetric(value int, queue string) prometheus.Metric {
return prometheus.MustNewConstMetric(QueueSizeDesc, prometheus.GaugeValue, float64(value), queue)
return prometheus.MustNewConstMetric(QueueSizeDesc, prometheus.GaugeValue, float64(value), queue, queue)
}

func NewQueueDistinctSchedulingKeyMetric(value int, queue string) prometheus.Metric {
return prometheus.MustNewConstMetric(QueueDistinctSchedulingKeysDesc, prometheus.GaugeValue, float64(value), queue)
return prometheus.MustNewConstMetric(QueueDistinctSchedulingKeysDesc, prometheus.GaugeValue, float64(value), queue, queue)
}

func NewQueueDuration(count uint64, sum float64, buckets map[float64]uint64, pool string, priorityClass string, queue string) prometheus.Metric {
return prometheus.MustNewConstHistogram(QueueDurationDesc, count, sum, buckets, pool, priorityClass, queue)
return prometheus.MustNewConstHistogram(QueueDurationDesc, count, sum, buckets, pool, priorityClass, queue, queue)
}

func NewQueueResources(value float64, pool string, priorityClass string, queue string, resource string) prometheus.Metric {
return prometheus.MustNewConstMetric(QueueResourcesDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, resource)
return prometheus.MustNewConstMetric(QueueResourcesDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, queue, resource)
}

func NewMaxQueueResources(value float64, pool string, priorityClass string, queue string, resource string) prometheus.Metric {
return prometheus.MustNewConstMetric(MaxQueueResourcesDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, resource)
return prometheus.MustNewConstMetric(MaxQueueResourcesDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, queue, resource)
}

func NewMinQueueResources(value float64, pool string, priorityClass string, queue string, resource string) prometheus.Metric {
return prometheus.MustNewConstMetric(MinQueueResourcesDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, resource)
return prometheus.MustNewConstMetric(MinQueueResourcesDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, queue, resource)
}

func NewMedianQueueResources(value float64, pool string, priorityClass string, queue string, resource string) prometheus.Metric {
return prometheus.MustNewConstMetric(MedianQueueResourcesDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, resource)
return prometheus.MustNewConstMetric(MedianQueueResourcesDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, queue, resource)
}

func NewCountQueueResources(value uint64, pool string, priorityClass string, queue string, resource string) prometheus.Metric {
return prometheus.MustNewConstMetric(CountQueueResourcesDesc, prometheus.GaugeValue, float64(value), pool, priorityClass, queue, resource)
return prometheus.MustNewConstMetric(CountQueueResourcesDesc, prometheus.GaugeValue, float64(value), pool, priorityClass, queue, queue, resource)
}

func NewMinQueueDuration(value float64, pool string, priorityClass string, queue string) prometheus.Metric {
return prometheus.MustNewConstMetric(MinQueueDurationDesc, prometheus.GaugeValue, value, pool, priorityClass, queue)
return prometheus.MustNewConstMetric(MinQueueDurationDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, queue)
}

func NewMaxQueueDuration(value float64, pool string, priorityClass string, queue string) prometheus.Metric {
return prometheus.MustNewConstMetric(MaxQueueDurationDesc, prometheus.GaugeValue, value, pool, priorityClass, queue)
return prometheus.MustNewConstMetric(MaxQueueDurationDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, queue)
}

func NewMedianQueueDuration(value float64, pool string, priorityClass string, queue string) prometheus.Metric {
return prometheus.MustNewConstMetric(MedianQueueDurationDesc, prometheus.GaugeValue, value, pool, priorityClass, queue)
return prometheus.MustNewConstMetric(MedianQueueDurationDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, queue)
}

func NewMinJobRunDuration(value float64, pool string, priorityClass string, queue string) prometheus.Metric {
return prometheus.MustNewConstMetric(MinJobRunDurationDesc, prometheus.GaugeValue, value, pool, priorityClass, queue)
return prometheus.MustNewConstMetric(MinJobRunDurationDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, queue)
}

func NewMaxJobRunDuration(value float64, pool string, priorityClass string, queue string) prometheus.Metric {
return prometheus.MustNewConstMetric(MaxJobRunDurationDesc, prometheus.GaugeValue, value, pool, priorityClass, queue)
return prometheus.MustNewConstMetric(MaxJobRunDurationDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, queue)
}

func NewMedianJobRunDuration(value float64, pool string, priorityClass string, queue string) prometheus.Metric {
return prometheus.MustNewConstMetric(MedianJobRunDurationDesc, prometheus.GaugeValue, value, pool, priorityClass, queue)
return prometheus.MustNewConstMetric(MedianJobRunDurationDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, queue)
}

func NewJobRunRunDuration(count uint64, sum float64, buckets map[float64]uint64, pool string, priorityClass string, queue string) prometheus.Metric {
return prometheus.MustNewConstHistogram(JobRunDurationDesc, count, sum, buckets, pool, priorityClass, queue)
return prometheus.MustNewConstHistogram(JobRunDurationDesc, count, sum, buckets, pool, priorityClass, queue, queue)
}

func NewMinQueueAllocated(value float64, pool string, priorityClass string, queue string, resource string) prometheus.Metric {
return prometheus.MustNewConstMetric(MinQueueAllocatedDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, resource)
return prometheus.MustNewConstMetric(MinQueueAllocatedDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, queue, resource)
}

func NewMaxQueueAllocated(value float64, pool string, priorityClass string, queue string, resource string) prometheus.Metric {
return prometheus.MustNewConstMetric(MaxQueueAllocatedDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, resource)
return prometheus.MustNewConstMetric(MaxQueueAllocatedDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, queue, resource)
}

func NewMedianQueueAllocated(value float64, pool string, priorityClass string, queue string, resource string) prometheus.Metric {
return prometheus.MustNewConstMetric(MedianQueueAllocatedDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, resource)
return prometheus.MustNewConstMetric(MedianQueueAllocatedDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, queue, resource)
}

func NewQueueLeasedPodCount(value float64, cluster string, pool string, queue string, phase string, nodeType string) prometheus.Metric {
return prometheus.MustNewConstMetric(QueueLeasedPodCountDesc, prometheus.GaugeValue, value, cluster, pool, queue, phase, nodeType)
return prometheus.MustNewConstMetric(QueueLeasedPodCountDesc, prometheus.GaugeValue, value, cluster, pool, queue, queue, phase, nodeType)
}

func NewClusterAvailableCapacity(value float64, cluster string, pool string, resource string, nodeType string) prometheus.Metric {
Expand All @@ -352,9 +356,13 @@ func NewClusterTotalCapacity(value float64, cluster string, pool string, resourc
}

func NewQueueAllocated(value float64, queue string, cluster string, pool string, priorityClass string, resource string, nodeType string) prometheus.Metric {
return prometheus.MustNewConstMetric(QueueAllocatedDesc, prometheus.GaugeValue, value, cluster, pool, priorityClass, queue, resource, nodeType)
return prometheus.MustNewConstMetric(QueueAllocatedDesc, prometheus.GaugeValue, value, cluster, pool, priorityClass, queue, queue, resource, nodeType)
}

func NewQueueUsed(value float64, queue string, cluster string, pool string, resource string, nodeType string) prometheus.Metric {
return prometheus.MustNewConstMetric(QueueUsedDesc, prometheus.GaugeValue, value, cluster, pool, queue, resource, nodeType)
return prometheus.MustNewConstMetric(QueueUsedDesc, prometheus.GaugeValue, value, cluster, pool, queue, queue, resource, nodeType)
}

func NewQueuePriorityMetric(value float64, queue string) prometheus.Metric {
return prometheus.MustNewConstMetric(QueuePriorityDesc, prometheus.GaugeValue, value, queue, queue)
}
8 changes: 8 additions & 0 deletions internal/scheduler/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,20 @@ import (
type queueState struct {
queuedJobRecorder *commonmetrics.JobMetricsRecorder
runningJobRecorder *commonmetrics.JobMetricsRecorder
priority float64
}

// metricProvider is a simple implementation of QueueMetricProvider
type metricProvider struct {
queueStates map[string]*queueState
}

func (m metricProvider) GetQueuePriorites() map[string]float64 {
return armadamaps.MapValues(m.queueStates, func(v *queueState) float64 {
return v.priority
})
}

func (m metricProvider) GetQueuedJobMetrics(queueName string) []*commonmetrics.QueueMetrics {
state, ok := m.queueStates[queueName]
if ok {
Expand Down Expand Up @@ -143,6 +150,7 @@ func (c *MetricsCollector) updateQueueMetrics(ctx *armadacontext.Context) ([]pro
provider.queueStates[queue.Name] = &queueState{
queuedJobRecorder: commonmetrics.NewJobMetricsRecorder(),
runningJobRecorder: commonmetrics.NewJobMetricsRecorder(),
priority: queue.PriorityFactor,
}
queuedJobsCount[queue.Name] = 0
schedulingKeysByQueue[queue.Name] = map[schedulerobjects.SchedulingKey]bool{}
Expand Down