From 72f71dc6e791bc29f4a6cdacf2e9886213e37a39 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Sat, 21 Sep 2024 16:51:35 +0100 Subject: [PATCH] Fix preempted metrics (#3951) * Fix preempted metrics Signed-off-by: Chris Martin * fix comment Signed-off-by: Chris Martin * fix test Signed-off-by: Chris Martin * lint Signed-off-by: Chris Martin --------- Signed-off-by: Chris Martin --- internal/scheduler/jobdb/job.go | 5 +++-- internal/scheduler/jobdb/job_run_test.go | 7 +++++++ internal/scheduler/jobdb/job_test.go | 4 +++- internal/scheduler/metrics/state_metrics.go | 10 +++++++++- internal/scheduler/scheduler.go | 3 +++ internal/scheduler/scheduling_algo.go | 3 ++- 6 files changed, 27 insertions(+), 5 deletions(-) diff --git a/internal/scheduler/jobdb/job.go b/internal/scheduler/jobdb/job.go index 063876a6b8b..d1682d775f0 100644 --- a/internal/scheduler/jobdb/job.go +++ b/internal/scheduler/jobdb/job.go @@ -651,10 +651,11 @@ func (job *Job) ValidateResourceRequests() error { // WithNewRun creates a copy of the job with a new run on the given executor. func (job *Job) WithNewRun(executor, nodeId, nodeName, pool string, scheduledAtPriority int32) *Job { + now := job.jobDb.clock.Now() return job.WithUpdatedRun(job.jobDb.CreateRun( job.jobDb.uuidProvider.New(), job.Id(), - job.jobDb.clock.Now().UnixNano(), + now.UnixNano(), executor, nodeId, nodeName, @@ -668,7 +669,7 @@ func (job *Job) WithNewRun(executor, nodeId, nodeName, pool string, scheduledAtP false, false, false, - nil, + &now, nil, nil, nil, diff --git a/internal/scheduler/jobdb/job_run_test.go b/internal/scheduler/jobdb/job_run_test.go index d03ab970275..e527d8dd697 100644 --- a/internal/scheduler/jobdb/job_run_test.go +++ b/internal/scheduler/jobdb/job_run_test.go @@ -2,9 +2,11 @@ package jobdb import ( "testing" + "time" "github.com/google/uuid" "github.com/stretchr/testify/assert" + clock "k8s.io/utils/clock/testing" "github.com/armadaproject/armada/internal/common/stringinterner" "github.com/armadaproject/armada/internal/common/types" @@ -30,6 +32,7 @@ var ( } TestDefaultPriorityClass = PriorityClass3 SchedulingKeyGenerator = schedulerobjects.NewSchedulingKeyGeneratorWithKey(make([]byte, 32)) + testClock = clock.NewFakeClock(time.Now()) jobDb = NewJobDbWithSchedulingKeyGenerator( TestPriorityClasses, TestDefaultPriorityClass, @@ -41,6 +44,10 @@ var ( scheduledAtPriority = int32(5) ) +func init() { + jobDb.clock = testClock +} + var baseJobRun = jobDb.CreateRun( uuid.New().String(), uuid.NewString(), diff --git a/internal/scheduler/jobdb/job_test.go b/internal/scheduler/jobdb/job_test.go index b66fdceadaa..546dccfa8ea 100644 --- a/internal/scheduler/jobdb/job_test.go +++ b/internal/scheduler/jobdb/job_test.go @@ -134,18 +134,20 @@ func TestJob_TestWithNewRun(t *testing.T) { jobWithRun := baseJob.WithNewRun("test-executor", "test-nodeId", "nodeId", "pool", scheduledAtPriority) assert.Equal(t, true, jobWithRun.HasRuns()) run := jobWithRun.LatestRun() + created := jobDb.clock.Now() assert.NotNil(t, run) assert.Equal( t, &JobRun{ id: run.id, jobId: "test-job", - created: run.created, + created: created.UnixNano(), executor: "test-executor", nodeId: "test-nodeId", nodeName: "nodeId", pool: "pool", scheduledAtPriority: &scheduledAtPriority, + leaseTime: &created, }, run, ) diff --git a/internal/scheduler/metrics/state_metrics.go b/internal/scheduler/metrics/state_metrics.go index 8d4b9ae0817..3e25deb892b 100644 --- a/internal/scheduler/metrics/state_metrics.go +++ b/internal/scheduler/metrics/state_metrics.go @@ -144,7 +144,7 @@ func (m *jobStateMetrics) collect(ch chan<- prometheus.Metric) { } } -// ReportJobLeased reports the job as being leasedJob. This has to be reported separately because the state transition +// ReportJobLeased reports the job as being leased. This has to be reported separately because the state transition // logic does work for job leased! func (m *jobStateMetrics) ReportJobLeased(job *jobdb.Job) { run := job.LatestRun() @@ -152,6 +152,14 @@ func (m *jobStateMetrics) ReportJobLeased(job *jobdb.Job) { m.updateStateDuration(job, leased, priorState, duration) } +// ReportJobPreempted reports the job as being preempted. This has to be reported separately because the state transition +// logic does work for job preempted! +func (m *jobStateMetrics) ReportJobPreempted(job *jobdb.Job) { + run := job.LatestRun() + duration, priorState := stateDuration(job, run, run.PreemptedTime()) + m.updateStateDuration(job, preempted, priorState, duration) +} + func (m *jobStateMetrics) ReportStateTransitions( jsts []jobdb.JobStateTransitions, jobRunErrorsByRunId map[string]*armadaevents.Error, diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index a552594c29e..1cb8a163b89 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -348,6 +348,9 @@ func (s *Scheduler) cycle(ctx *armadacontext.Context, updateAll bool, leaderToke for _, jctx := range overallSchedulerResult.ScheduledJobs { s.metrics.ReportJobLeased(jctx.Job) } + for _, jctx := range overallSchedulerResult.PreemptedJobs { + s.metrics.ReportJobPreempted(jctx.Job) + } } return overallSchedulerResult, nil diff --git a/internal/scheduler/scheduling_algo.go b/internal/scheduler/scheduling_algo.go index e3b2f9b11af..176b3ec733f 100644 --- a/internal/scheduler/scheduling_algo.go +++ b/internal/scheduler/scheduling_algo.go @@ -497,8 +497,9 @@ func (l *FairSchedulingAlgo) schedulePool( } for i, jctx := range result.PreemptedJobs { jobDbJob := jctx.Job + now := l.clock.Now() if run := jobDbJob.LatestRun(); run != nil { - jobDbJob = jobDbJob.WithUpdatedRun(run.WithFailed(true)) + jobDbJob = jobDbJob.WithUpdatedRun(run.WithFailed(true).WithPreemptedTime(&now)) } else { return nil, nil, errors.Errorf("attempting to preempt job %s with no associated runs", jobDbJob.Id()) }