Skip to content

Commit

Permalink
update prometheus rules for server, scheduler and executor
Browse files Browse the repository at this point in the history
  • Loading branch information
dejanzele committed Sep 27, 2024
1 parent 2ecbcc7 commit 969f60a
Show file tree
Hide file tree
Showing 3 changed files with 217 additions and 37 deletions.
27 changes: 13 additions & 14 deletions internal/controller/install/armadaserver_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func generateArmadaServerInstallComponents(as *installv1alpha1.ArmadaServer, sch
return nil, err
}

pdb := createPodDisruptionBudget(as)
pdb := createServerPodDisruptionBudget(as)
if err := controllerutil.SetOwnerReference(as, pdb, scheme); err != nil {
return nil, err
}
Expand All @@ -225,7 +225,7 @@ func generateArmadaServerInstallComponents(as *installv1alpha1.ArmadaServer, sch
return nil, err
}

sm = createServiceMonitor(as)
sm = createServerServiceMonitor(as)
if err := controllerutil.SetOwnerReference(as, sm, scheme); err != nil {
return nil, err
}
Expand Down Expand Up @@ -642,15 +642,15 @@ func createIngressHttp(as *installv1alpha1.ArmadaServer) (*networkingv1.Ingress,
return restIngress, nil
}

func createPodDisruptionBudget(as *installv1alpha1.ArmadaServer) *policyv1.PodDisruptionBudget {
// createServerPodDisruptionBudget builds a PodDisruptionBudget that mirrors the
// ArmadaServer's name and namespace. Spec and Status are left at their zero
// values (no minAvailable/maxUnavailable is configured here).
func createServerPodDisruptionBudget(as *installv1alpha1.ArmadaServer) *policyv1.PodDisruptionBudget {
	pdb := policyv1.PodDisruptionBudget{
		ObjectMeta: metav1.ObjectMeta{Name: as.Name, Namespace: as.Namespace},
		Spec:       policyv1.PodDisruptionBudgetSpec{},
		Status:     policyv1.PodDisruptionBudgetStatus{},
	}
	return &pdb
}

func createServiceMonitor(as *installv1alpha1.ArmadaServer) *monitoringv1.ServiceMonitor {
func createServerServiceMonitor(as *installv1alpha1.ArmadaServer) *monitoringv1.ServiceMonitor {
var prometheusLabels map[string]string
if as.Spec.Prometheus != nil {
prometheusLabels = as.Spec.Prometheus.Labels
Expand Down Expand Up @@ -694,25 +694,24 @@ func (r *ArmadaServerReconciler) SetupWithManager(mgr ctrl.Manager) error {

// createServerPrometheusRule will provide a prometheus monitoring rule for the name and scrapeInterval
func createServerPrometheusRule(name, namespace string, scrapeInterval *metav1.Duration, labels ...map[string]string) *monitoringv1.PrometheusRule {
if scrapeInterval == nil {
scrapeInterval = &metav1.Duration{Duration: defaultPrometheusInterval}
}
queueSize := `avg(sum(armada_queue_size) by (queueName, pod)) by (queueName) > 0`
queueSize := `max(sum(armada_queue_size) by (queueName, pod)) by (queueName) > 0`
queuePriority := `avg(sum(armada_queue_priority) by (pool, queueName, pod)) by (pool, queueName)`
queueIdeal := `(sum(armada:queue:resource:queued{resourceType="cpu"} > bool 0) by (queueName, pool) * (1 / armada:queue:priority))
/ ignoring(queueName) group_left
sum(sum(armada:queue:resource:queued{resourceType="cpu"} > bool 0) by (queueName, pool) * (1 / armada:queue:priority)) by (pool)
* 100`
queueResourceQueued := `avg(armada_queue_resource_queued) by (pool, queueName, resourceType)`
queueResourceAllocated := `avg(armada_queue_resource_allocated) by (pool, cluster, queueName, resourceType, nodeType)`
queueResourceUsed := `avg(armada_queue_resource_used) by (pool, cluster, queueName, resourceType, nodeType)`
/ ignoring(queueName) group_left sum(sum(armada:queue:resource:queued{resourceType="cpu"} > bool 0) by (queueName, pool) * (1 / armada:queue:priority)) by (pool) * 100`

queueResourceQueued := `max(sum(armada_queue_resource_queued) by (pod, pool, queueName, resourceType)) by (pool, queueName, resourceType)`
queueResourceAllocated := `max(sum(armada_queue_resource_allocated) by (pod, pool, cluster, queueName, resourceType, nodeType)) by (pool, cluster, queueName, resourceType, nodeType)`
queueResourceUsed := `max(sum(armada_queue_resource_used) by (pod, pool, cluster, queueName, resourceType, nodeType)) by (pool, cluster, queueName, resourceType, nodeType)`
serverHist := `histogram_quantile(0.95, sum(rate(grpc_server_handling_seconds_bucket{grpc_type!="server_stream"}[2m])) by (grpc_method,grpc_service, le))`
serverRequestRate := `sum(rate(grpc_server_handled_total[2m])) by (grpc_method,grpc_service)`
logRate := `sum(rate(log_messages[2m])) by (level)`
availableCapacity := `avg(armada_cluster_available_capacity) by (pool, cluster, resourceType, nodeType)`
resourceCapacity := `avg(armada_cluster_capacity) by (pool, cluster, resourceType, nodeType)`
queuePodPhaseCount := `max(armada_queue_leased_pod_count) by (pool, cluster, queueName, phase, nodeType)`

if scrapeInterval == nil {
scrapeInterval = &metav1.Duration{Duration: defaultPrometheusInterval}
}
durationString := duration.ShortHumanDuration(scrapeInterval.Duration)
objectMetaName := "armada-" + name + "-metrics"
return &monitoringv1.PrometheusRule{
Expand Down
12 changes: 8 additions & 4 deletions internal/controller/install/executor_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,13 +459,17 @@ func createExecutorPrometheusRule(name, namespace string, scrapeInterval *metav1
if scrapeInterval == nil {
scrapeInterval = &metav1.Duration{Duration: defaultPrometheusInterval}
}

// Update the restRequestHistogram expression to align with Helm
restRequestHistogram := `histogram_quantile(0.95, ` +
`sum(rate(rest_client_request_duration_seconds_bucket{service="` + name + `"}[2m])) by (endpoint, verb, url, le))`
logRate := "sum(rate(log_messages[2m])) by (level)"

// Set the group name and duration string to match the Helm template
durationString := duration.ShortHumanDuration(scrapeInterval.Duration)
objectMetaName := "armada-" + name + "-metrics"
objectMetaName := "armada-executor-metrics"

return &monitoringv1.PrometheusRule{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
Expand All @@ -477,11 +481,11 @@ func createExecutorPrometheusRule(name, namespace string, scrapeInterval *metav1
Interval: ptr.To(monitoringv1.Duration(durationString)),
Rules: []monitoringv1.Rule{
{
Record: "armada:" + name + ":rest:request:histogram95",
Record: "armada:executor:rest:request:histogram95",
Expr: intstr.IntOrString{StrVal: restRequestHistogram},
},
{
Record: "armada:" + name + ":log:rate",
Record: "armada:executor:log:rate",
Expr: intstr.IntOrString{StrVal: logRate},
},
},
Expand Down
215 changes: 196 additions & 19 deletions internal/controller/install/scheduler_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (
"fmt"
"time"

"k8s.io/apimachinery/pkg/util/duration"
"k8s.io/apimachinery/pkg/util/intstr"

"github.com/pkg/errors"

"k8s.io/utils/ptr"
Expand Down Expand Up @@ -128,6 +131,10 @@ func (r *SchedulerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
return ctrl.Result{}, err
}

if err := upsertObjectIfNeeded(ctx, r.Client, components.PrometheusRule, scheduler.Kind, mutateFn, logger); err != nil {
return ctrl.Result{}, err
}

logger.Info("Successfully reconciled Scheduler object", "durationMillis", time.Since(started).Milliseconds())

return ctrl.Result{}, nil
Expand Down Expand Up @@ -170,11 +177,16 @@ func generateSchedulerInstallComponents(scheduler *installv1alpha1.Scheduler, sc
}

var serviceMonitor *monitoringv1.ServiceMonitor
var prometheusRule *monitoringv1.PrometheusRule
if scheduler.Spec.Prometheus != nil && scheduler.Spec.Prometheus.Enabled {
serviceMonitor = createSchedulerServiceMonitor(scheduler)
if err := controllerutil.SetOwnerReference(scheduler, serviceMonitor, scheme); err != nil {
return nil, err
}
prometheusRule = createSchedulerPrometheusRule(scheduler)
if err := controllerutil.SetOwnerReference(scheduler, prometheusRule, scheme); err != nil {
return nil, err
}
}

job, err := createSchedulerMigrationJob(scheduler, serviceAccountName)
Expand Down Expand Up @@ -214,29 +226,11 @@ func generateSchedulerInstallComponents(scheduler *installv1alpha1.Scheduler, sc
IngressGrpc: ingressGrpc,
Jobs: []*batchv1.Job{job},
ServiceMonitor: serviceMonitor,
PrometheusRule: prometheusRule,
CronJob: cronJob,
}, nil
}

// createSchedulerServiceMonitor returns a ServiceMonitor so Prometheus
// discovers and scrapes the scheduler's "metrics" port. The scrape interval is
// hard-coded to 15s in this version.
func createSchedulerServiceMonitor(scheduler *installv1alpha1.Scheduler) *monitoringv1.ServiceMonitor {
	return &monitoringv1.ServiceMonitor{
		TypeMeta: metav1.TypeMeta{
			Kind: "ServiceMonitor",
		},
		ObjectMeta: metav1.ObjectMeta{
			// Name/namespace track the Scheduler CR; labels merge the CR's
			// labels with the user-supplied Prometheus labels.
			// NOTE(review): Spec.Prometheus is dereferenced without a nil
			// check — the caller appears to gate this on Spec.Prometheus being
			// non-nil; confirm before reusing elsewhere.
			Name:      scheduler.Name,
			Namespace: scheduler.Namespace,
			Labels:    AllLabels(scheduler.Name, scheduler.Spec.Labels, scheduler.Spec.Prometheus.Labels),
		},
		Spec: monitoringv1.ServiceMonitorSpec{
			Endpoints: []monitoringv1.Endpoint{
				{Port: "metrics", Interval: "15s"},
			},
		},
	}
}

// Function to build the deployment object for Scheduler.
// This should be changing from CRD to CRD. Not sure if generalizing this helps much
func createSchedulerDeployment(scheduler *installv1alpha1.Scheduler, serviceAccountName string) (*appsv1.Deployment, error) {
Expand Down Expand Up @@ -628,6 +622,189 @@ func createSchedulerCronJob(scheduler *installv1alpha1.Scheduler) (*batchv1.Cron
return &job, nil
}

// createSchedulerServiceMonitor returns a ServiceMonitor so Prometheus
// discovers and scrapes the scheduler's "metrics" port. The scrape interval
// comes from Spec.Prometheus.ScrapeInterval, falling back to
// defaultPrometheusInterval when unset.
func createSchedulerServiceMonitor(scheduler *installv1alpha1.Scheduler) *monitoringv1.ServiceMonitor {
	// Bug fix: the previous code assigned the default in both the initializer
	// and the nil branch, so a user-configured ScrapeInterval was never used.
	scrapeInterval := &metav1.Duration{Duration: defaultPrometheusInterval}
	if interval := scheduler.Spec.Prometheus.ScrapeInterval; interval != nil {
		scrapeInterval = interval
	}
	durationString := duration.ShortHumanDuration(scrapeInterval.Duration)
	return &monitoringv1.ServiceMonitor{
		TypeMeta: metav1.TypeMeta{
			Kind: "ServiceMonitor",
		},
		ObjectMeta: metav1.ObjectMeta{
			// Name/namespace track the Scheduler CR; labels merge the CR's
			// labels with the user-supplied Prometheus labels. The caller
			// gates this on Spec.Prometheus != nil.
			Name:      scheduler.Name,
			Namespace: scheduler.Namespace,
			Labels:    AllLabels(scheduler.Name, scheduler.Spec.Labels, scheduler.Spec.Prometheus.Labels),
		},
		Spec: monitoringv1.ServiceMonitorSpec{
			Endpoints: []monitoringv1.Endpoint{
				{Port: "metrics", Interval: monitoringv1.Duration(durationString)},
			},
		},
	}
}

// createSchedulerPrometheusRule creates a PrometheusRule for monitoring Armada
// scheduler. It records failed/succeeded job counts by node, by
// cluster/category/subCategory, and by queue, their 1m/10m/1h increases, and
// the derived failure rates. The rule group is evaluated at the configured
// scrape interval, falling back to defaultPrometheusInterval when unset.
func createSchedulerPrometheusRule(scheduler *installv1alpha1.Scheduler) *monitoringv1.PrometheusRule {
	// NOTE(review): the three "node:...:increase{1m,10m,1h}" failed-job rules
	// reference node:armada_scheduler_job_state_counter_by_queue, which is not
	// recorded by any rule in this group (and "node:" + "by_queue" looks
	// inconsistent) — confirm against the Helm chart these rules mirror.
	rules := []monitoringv1.Rule{
		{
			Record: "node:armada_scheduler_failed_jobs",
			Expr:   intstr.IntOrString{StrVal: `sum by (node) (armada_scheduler_job_state_counter_by_node{state="failed"})`},
		},
		{
			Record: "cluster_category_subCategory:armada_scheduler_failed_jobs",
			Expr:   intstr.IntOrString{StrVal: `sum by (cluster, category, subCategory) (armada_scheduler_error_classification_by_node)`},
		},
		{
			Record: "queue_category_subCategory:armada_scheduler_failed_jobs",
			Expr:   intstr.IntOrString{StrVal: `sum by (queue, category, subCategory) (armada_scheduler_job_error_classification_by_queue)`},
		},
		{
			Record: "node:armada_scheduler_succeeded_jobs",
			Expr:   intstr.IntOrString{StrVal: `sum by (node) (armada_scheduler_job_state_counter_by_node{state="succeeded"})`},
		},
		{
			Record: "cluster_category_subCategory:armada_scheduler_succeeded_jobs",
			Expr:   intstr.IntOrString{StrVal: `sum by (cluster, category, subCategory) (armada_scheduler_job_state_counter_by_node{state="succeeded"})`},
		},
		{
			Record: "queue_category_subCategory:armada_scheduler_succeeded_jobs",
			Expr:   intstr.IntOrString{StrVal: `sum by (queue) (armada_scheduler_job_state_counter_by_queue{state="succeeded"})`},
		},
		{
			Record: "node:armada_scheduler_failed_jobs:increase1m",
			Expr:   intstr.IntOrString{StrVal: `increase(node:armada_scheduler_job_state_counter_by_queue{state="failed"}[1m:])`},
		},
		{
			Record: "node:armada_scheduler_failed_jobs:increase10m",
			Expr:   intstr.IntOrString{StrVal: `increase(node:armada_scheduler_job_state_counter_by_queue{state="failed"}[10m:])`},
		},
		{
			Record: "node:armada_scheduler_failed_jobs:increase1h",
			Expr:   intstr.IntOrString{StrVal: `increase(node:armada_scheduler_job_state_counter_by_queue{state="failed"}[1h:])`},
		},
		{
			Record: "cluster_category_subCategory:armada_scheduler_failed_jobs:increase1m",
			Expr:   intstr.IntOrString{StrVal: `increase(cluster_category_subCategory:armada_scheduler_failed_jobs[1m:])`},
		},
		{
			Record: "cluster_category_subCategory:armada_scheduler_failed_jobs:increase10m",
			Expr:   intstr.IntOrString{StrVal: `increase(cluster_category_subCategory:armada_scheduler_failed_jobs[10m:])`},
		},
		{
			Record: "cluster_category_subCategory:armada_scheduler_failed_jobs:increase1h",
			Expr:   intstr.IntOrString{StrVal: `increase(cluster_category_subCategory:armada_scheduler_failed_jobs[1h:])`},
		},
		{
			Record: "queue_category_subCategory:armada_scheduler_failed_jobs:increase1m",
			Expr:   intstr.IntOrString{StrVal: `increase(queue_category_subCategory:armada_scheduler_failed_jobs[1m:])`},
		},
		{
			Record: "queue_category_subCategory:armada_scheduler_failed_jobs:increase10m",
			Expr:   intstr.IntOrString{StrVal: `increase(queue_category_subCategory:armada_scheduler_failed_jobs[10m:])`},
		},
		{
			Record: "queue_category_subCategory:armada_scheduler_failed_jobs:increase1h",
			Expr:   intstr.IntOrString{StrVal: `increase(queue_category_subCategory:armada_scheduler_failed_jobs[1h:])`},
		},
		{
			Record: "node:armada_scheduler_succeeded_jobs:increase1m",
			Expr:   intstr.IntOrString{StrVal: `increase(node:armada_scheduler_succeeded_jobs[1m:])`},
		},
		{
			Record: "node:armada_scheduler_succeeded_jobs:increase10m",
			Expr:   intstr.IntOrString{StrVal: `increase(node:armada_scheduler_succeeded_jobs[10m:])`},
		},
		{
			Record: "node:armada_scheduler_succeeded_jobs:increase1h",
			Expr:   intstr.IntOrString{StrVal: `increase(node:armada_scheduler_succeeded_jobs[1h:])`},
		},
		{
			Record: "cluster_category_subCategory:armada_scheduler_succeeded_jobs:increase1m",
			Expr:   intstr.IntOrString{StrVal: `increase(cluster_category_subCategory:armada_scheduler_succeeded_jobs[1m:])`},
		},
		{
			Record: "cluster_category_subCategory:armada_scheduler_succeeded_jobs:increase10m",
			Expr:   intstr.IntOrString{StrVal: `increase(cluster_category_subCategory:armada_scheduler_succeeded_jobs[10m:])`},
		},
		{
			Record: "cluster_category_subCategory:armada_scheduler_succeeded_jobs:increase1h",
			Expr:   intstr.IntOrString{StrVal: `increase(cluster_category_subCategory:armada_scheduler_succeeded_jobs[1h:])`},
		},
		{
			Record: "queue_category_subCategory:armada_scheduler_succeeded_jobs:increase1m",
			Expr:   intstr.IntOrString{StrVal: `increase(queue_category_subCategory:armada_scheduler_succeeded_jobs[1m:])`},
		},
		{
			Record: "queue_category_subCategory:armada_scheduler_succeeded_jobs:increase10m",
			Expr:   intstr.IntOrString{StrVal: `increase(queue_category_subCategory:armada_scheduler_succeeded_jobs[10m:])`},
		},
		{
			Record: "queue_category_subCategory:armada_scheduler_succeeded_jobs:increase1h",
			Expr:   intstr.IntOrString{StrVal: `increase(queue_category_subCategory:armada_scheduler_succeeded_jobs[1h:])`},
		},
		{
			Record: "node:armada_scheduler_failed_rate_jobs:increase1m",
			Expr:   intstr.IntOrString{StrVal: `sum by(node) (node:armada_scheduler_failed_jobs:increase1m) / on(node) group_left() ((sum by(node) (node:armada_scheduler_failed_jobs:increase1m)) + (sum by(node) (node:armada_scheduler_succeeded_jobs:increase1m)))`},
		},
		{
			Record: "node:armada_scheduler_failed_rate_jobs:increase10m",
			Expr:   intstr.IntOrString{StrVal: `sum by(node) (node:armada_scheduler_failed_jobs:increase10m) / on(node) group_left() ((sum by(node) (node:armada_scheduler_failed_jobs:increase10m)) + (sum by(node) (node:armada_scheduler_succeeded_jobs:increase10m)))`},
		},
		{
			Record: "node:armada_scheduler_failed_rate_jobs:increase1h",
			Expr:   intstr.IntOrString{StrVal: `sum by(node) (node:armada_scheduler_failed_jobs:increase1h) / on(node) group_left() ((sum by(node) (node:armada_scheduler_failed_jobs:increase1h)) + (sum by(node) (node:armada_scheduler_succeeded_jobs:increase1h)))`},
		},
		{
			Record: "cluster_category_subCategory:armada_scheduler_failed_rate_jobs:increase1m",
			Expr:   intstr.IntOrString{StrVal: `sum by(cluster, category, subCategory) (cluster_category_subCategory:armada_scheduler_failed_jobs:increase1m) / on(cluster) group_left() ((sum by(cluster) (cluster_category_subCategory:armada_scheduler_failed_jobs:increase1m)) + (sum by(cluster) (cluster_category_subCategory:armada_scheduler_succeeded_jobs:increase1m)))`},
		},
		{
			Record: "cluster_category_subCategory:armada_scheduler_failed_rate_jobs:increase10m",
			Expr:   intstr.IntOrString{StrVal: `sum by(cluster, category, subCategory) (cluster_category_subCategory:armada_scheduler_failed_jobs:increase10m) / on(cluster) group_left() ((sum by(cluster) (cluster_category_subCategory:armada_scheduler_failed_jobs:increase10m)) + (sum by(cluster) (cluster_category_subCategory:armada_scheduler_succeeded_jobs:increase10m)))`},
		},
		{
			Record: "cluster_category_subCategory:armada_scheduler_failed_rate_jobs:increase1h",
			Expr:   intstr.IntOrString{StrVal: `sum by(cluster, category, subCategory) (cluster_category_subCategory:armada_scheduler_failed_jobs:increase1h) / on(cluster) group_left() ((sum by(cluster) (cluster_category_subCategory:armada_scheduler_failed_jobs:increase1h)) + (sum by(cluster) (cluster_category_subCategory:armada_scheduler_succeeded_jobs:increase1h)))`},
		},
		{
			Record: "queue_category_subCategory:armada_scheduler_failed_rate_jobs:increase1m",
			Expr:   intstr.IntOrString{StrVal: `sum by(queue, category, subCategory) (queue_category_subCategory:armada_scheduler_failed_jobs:increase1m) / on(queue) group_left() ((sum by(queue) (queue_category_subCategory:armada_scheduler_failed_jobs:increase1m)) + (sum by(queue) (queue_category_subCategory:armada_scheduler_succeeded_jobs:increase1m)))`},
		},
		{
			Record: "queue_category_subCategory:armada_scheduler_failed_rate_jobs:increase10m",
			Expr:   intstr.IntOrString{StrVal: `sum by(queue, category, subCategory) (queue_category_subCategory:armada_scheduler_failed_jobs:increase10m) / on(queue) group_left() ((sum by(queue) (queue_category_subCategory:armada_scheduler_failed_jobs:increase10m)) + (sum by(queue) (queue_category_subCategory:armada_scheduler_succeeded_jobs:increase10m)))`},
		},
		{
			Record: "queue_category_subCategory:armada_scheduler_failed_rate_jobs:increase1h",
			Expr:   intstr.IntOrString{StrVal: `sum by(queue, category, subCategory) (queue_category_subCategory:armada_scheduler_failed_jobs:increase1h) / on(queue) group_left() ((sum by(queue) (queue_category_subCategory:armada_scheduler_failed_jobs:increase1h)) + (sum by(queue) (queue_category_subCategory:armada_scheduler_succeeded_jobs:increase1h)))`},
		},
	}

	objectMetaName := "armada-" + scheduler.Name + "-metrics"
	// Bug fix: the previous code assigned the default in both the initializer
	// and the nil branch, so a user-configured ScrapeInterval was never used.
	scrapeInterval := &metav1.Duration{Duration: defaultPrometheusInterval}
	if interval := scheduler.Spec.Prometheus.ScrapeInterval; interval != nil {
		scrapeInterval = interval
	}
	durationString := duration.ShortHumanDuration(scrapeInterval.Duration)
	return &monitoringv1.PrometheusRule{
		ObjectMeta: metav1.ObjectMeta{
			Name:      objectMetaName,
			Namespace: scheduler.Namespace,
			Labels:    AllLabels(scheduler.Name, scheduler.Spec.Labels, scheduler.Spec.Prometheus.Labels),
		},
		Spec: monitoringv1.PrometheusRuleSpec{
			// A single rule group, named after the PrometheusRule object and
			// evaluated at the resolved scrape interval.
			Groups: []monitoringv1.RuleGroup{{
				Name:     objectMetaName,
				Interval: ptr.To(monitoringv1.Duration(durationString)),
				Rules:    rules,
			}},
		},
	}
}

// SetupWithManager sets up the controller with the Manager.
func (r *SchedulerReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
Expand Down

0 comments on commit 969f60a

Please sign in to comment.