From 5e18b42bd4820f4c4707d291c439c3451f8c032d Mon Sep 17 00:00:00 2001 From: Noah Held <41909795+zuqq@users.noreply.github.com> Date: Wed, 7 Jun 2023 10:51:32 +0100 Subject: [PATCH] Add preemptions to `SchedulingContextRepository` (#2544) --- internal/scheduler/context/context.go | 16 +++++- internal/scheduler/reports.go | 83 +++++++++++++++++++++++++-- internal/scheduler/reports_test.go | 62 ++++++++++++++++---- 3 files changed, 143 insertions(+), 18 deletions(-) diff --git a/internal/scheduler/context/context.go b/internal/scheduler/context/context.go index 22f1e8a4ed3..415699b69bd 100644 --- a/internal/scheduler/context/context.go +++ b/internal/scheduler/context/context.go @@ -279,7 +279,6 @@ type QueueSchedulingContext struct { const maxPrintedJobIdsByReason = 1 -// TODO: Update with preemptions. func (qctx *QueueSchedulingContext) String() string { var sb strings.Builder w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0) @@ -289,8 +288,11 @@ func (qctx *QueueSchedulingContext) String() string { fmt.Fprintf(w, "Total allocated resources after scheduling (by priority):\t%s\n", qctx.AllocatedByPriority.String()) fmt.Fprintf(w, "Scheduled resources:\t%s\n", qctx.ScheduledResourcesByPriority.AggregateByResource().CompactString()) fmt.Fprintf(w, "Scheduled resources (by priority):\t%s\n", qctx.ScheduledResourcesByPriority.String()) + fmt.Fprintf(w, "Preempted resources:\t%s\n", qctx.EvictedResourcesByPriority.AggregateByResource().CompactString()) + fmt.Fprintf(w, "Preempted resources (by priority):\t%s\n", qctx.EvictedResourcesByPriority.String()) fmt.Fprintf(w, "Number of jobs scheduled:\t%d\n", len(qctx.SuccessfulJobSchedulingContexts)) fmt.Fprintf(w, "Number of jobs that could not be scheduled:\t%d\n", len(qctx.UnsuccessfulJobSchedulingContexts)) + fmt.Fprintf(w, "Number of jobs preempted:\t%d\n", len(qctx.EvictedJobsById)) if len(qctx.SuccessfulJobSchedulingContexts) > 0 { jobIdsToPrint := maps.Keys(qctx.SuccessfulJobSchedulingContexts) if len(jobIdsToPrint) > 
maxPrintedJobIdsByReason { @@ -326,6 +328,18 @@ func (qctx *QueueSchedulingContext) String() string { } } } + if len(qctx.EvictedJobsById) > 0 { + jobIdsToPrint := maps.Keys(qctx.EvictedJobsById) + if len(jobIdsToPrint) > maxPrintedJobIdsByReason { + jobIdsToPrint = jobIdsToPrint[0:maxPrintedJobIdsByReason] + } + fmt.Fprintf(w, "Preempted jobs:\t%v", jobIdsToPrint) + if len(jobIdsToPrint) != len(qctx.EvictedJobsById) { + fmt.Fprintf(w, " (and %d others not shown)\n", len(qctx.EvictedJobsById)-len(jobIdsToPrint)) + } else { + fmt.Fprint(w, "\n") + } + } w.Flush() return sb.String() } diff --git a/internal/scheduler/reports.go b/internal/scheduler/reports.go index 9bef8dbdac3..5c98ad89ba9 100644 --- a/internal/scheduler/reports.go +++ b/internal/scheduler/reports.go @@ -31,19 +31,27 @@ type SchedulingContextRepository struct { mostRecentSchedulingContextByExecutorP atomic.Pointer[SchedulingContextByExecutor] // The most recent attempt where a non-zero amount of resources were scheduled. mostRecentSuccessfulSchedulingContextByExecutorP atomic.Pointer[SchedulingContextByExecutor] + // The most recent attempt that preempted at least one job. + mostRecentPreemptingSchedulingContextByExecutorP atomic.Pointer[SchedulingContextByExecutor] + // Maps queue name to QueueSchedulingContextByExecutor. // The most recent attempt. mostRecentQueueSchedulingContextByExecutorByQueueP atomic.Pointer[map[string]QueueSchedulingContextByExecutor] // The most recent attempt where a non-zero amount of resources were scheduled. mostRecentSuccessfulQueueSchedulingContextByExecutorByQueueP atomic.Pointer[map[string]QueueSchedulingContextByExecutor] + // Map from queue name to most recent attempt that preempted at least one job belonging to this queue. + mostRecentPreemptingQueueSchedulingContextByExecutorByQueueP atomic.Pointer[map[string]QueueSchedulingContextByExecutor] + // Maps job id to JobSchedulingContextByExecutor. 
// We limit the number of job contexts to store to control memory usage. mostRecentJobSchedulingContextByExecutorByJobId *lru.Cache + // Store all executor ids seen so far in a set. // Used to ensure all executors are included in reports. executorIds map[string]bool // All executors in sorted order. sortedExecutorIdsP atomic.Pointer[[]string] + // Protects the fields in this struct from concurrent and dirty writes. mu sync.Mutex } @@ -63,16 +71,27 @@ func NewSchedulingContextRepository(maxJobSchedulingContextsPerExecutor uint) (* mostRecentJobSchedulingContextByExecutorByJobId: jobSchedulingContextByExecutorByJobId, executorIds: make(map[string]bool), } + mostRecentSchedulingContextByExecutor := make(SchedulingContextByExecutor) mostRecentSuccessfulSchedulingContextByExecutor := make(SchedulingContextByExecutor) + mostRecentPreemptingSchedulingContextByExecutor := make(SchedulingContextByExecutor) + mostRecentQueueSchedulingContextByExecutorByQueue := make(map[string]QueueSchedulingContextByExecutor) mostRecentSuccessfulQueueSchedulingContextByExecutorByQueue := make(map[string]QueueSchedulingContextByExecutor) + mostRecentPreemptingQueueSchedulingContextByExecutorByQueue := make(map[string]QueueSchedulingContextByExecutor) + sortedExecutorIds := make([]string, 0) + rv.mostRecentSchedulingContextByExecutorP.Store(&mostRecentSchedulingContextByExecutor) rv.mostRecentSuccessfulSchedulingContextByExecutorP.Store(&mostRecentSuccessfulSchedulingContextByExecutor) + rv.mostRecentPreemptingSchedulingContextByExecutorP.Store(&mostRecentPreemptingSchedulingContextByExecutor) + rv.mostRecentQueueSchedulingContextByExecutorByQueueP.Store(&mostRecentQueueSchedulingContextByExecutorByQueue) rv.mostRecentSuccessfulQueueSchedulingContextByExecutorByQueueP.Store(&mostRecentSuccessfulQueueSchedulingContextByExecutorByQueue) + rv.mostRecentPreemptingQueueSchedulingContextByExecutorByQueueP.Store(&mostRecentPreemptingQueueSchedulingContextByExecutorByQueue) +
rv.sortedExecutorIdsP.Store(&sortedExecutorIds) + return rv, nil } @@ -121,23 +140,35 @@ func (repo *SchedulingContextRepository) addExecutorId(executorId string) error func (repo *SchedulingContextRepository) addSchedulingContext(sctx *schedulercontext.SchedulingContext) error { mostRecentSchedulingContextByExecutor := *repo.mostRecentSchedulingContextByExecutorP.Load() mostRecentSchedulingContextByExecutor = maps.Clone(mostRecentSchedulingContextByExecutor) + mostRecentSchedulingContextByExecutor[sctx.ExecutorId] = sctx + mostRecentSuccessfulSchedulingContextByExecutor := *repo.mostRecentSuccessfulSchedulingContextByExecutorP.Load() mostRecentSuccessfulSchedulingContextByExecutor = maps.Clone(mostRecentSuccessfulSchedulingContextByExecutor) - mostRecentSchedulingContextByExecutor[sctx.ExecutorId] = sctx if !sctx.ScheduledResourcesByPriority.IsZero() { mostRecentSuccessfulSchedulingContextByExecutor[sctx.ExecutorId] = sctx } + + mostRecentPreemptingContextByExecutor := *repo.mostRecentPreemptingSchedulingContextByExecutorP.Load() + mostRecentPreemptingContextByExecutor = maps.Clone(mostRecentPreemptingContextByExecutor) + if !sctx.EvictedResourcesByPriority.IsZero() { + mostRecentPreemptingContextByExecutor[sctx.ExecutorId] = sctx + } + repo.mostRecentSchedulingContextByExecutorP.Store(&mostRecentSchedulingContextByExecutor) repo.mostRecentSuccessfulSchedulingContextByExecutorP.Store(&mostRecentSuccessfulSchedulingContextByExecutor) + repo.mostRecentPreemptingSchedulingContextByExecutorP.Store(&mostRecentPreemptingContextByExecutor) + return nil } // Should only be called from AddSchedulingContext to avoid dirty writes. 
func (repo *SchedulingContextRepository) addQueueSchedulingContexts(qctxs []*schedulercontext.QueueSchedulingContext) error { - mostRecentQueueSchedulingContextByExecutorByQueue := *repo.mostRecentQueueSchedulingContextByExecutorByQueueP.Load() - mostRecentQueueSchedulingContextByExecutorByQueue = maps.Clone(mostRecentQueueSchedulingContextByExecutorByQueue) - mostRecentSuccessfulQueueSchedulingContextByExecutorByQueue := *repo.mostRecentSuccessfulQueueSchedulingContextByExecutorByQueueP.Load() - mostRecentSuccessfulQueueSchedulingContextByExecutorByQueue = maps.Clone(mostRecentSuccessfulQueueSchedulingContextByExecutorByQueue) + mostRecentQueueSchedulingContextByExecutorByQueue := maps.Clone(*repo.mostRecentQueueSchedulingContextByExecutorByQueueP.Load()) + + mostRecentSuccessfulQueueSchedulingContextByExecutorByQueue := maps.Clone(*repo.mostRecentSuccessfulQueueSchedulingContextByExecutorByQueueP.Load()) + + mostRecentPreemptingQueueSchedulingContextByExecutorByQueue := maps.Clone(*repo.mostRecentPreemptingQueueSchedulingContextByExecutorByQueueP.Load()) + for _, qctx := range qctxs { if qctx.ExecutorId == "" { return errors.WithStack(&armadaerrors.ErrInvalidArgument{ @@ -153,6 +184,7 @@ func (repo *SchedulingContextRepository) addQueueSchedulingContexts(qctxs []*sch Message: "received empty queue name", }) } + if previous := mostRecentQueueSchedulingContextByExecutorByQueue[qctx.Queue]; previous != nil { previous = maps.Clone(previous) previous[qctx.ExecutorId] = qctx @@ -162,6 +194,19 @@ func (repo *SchedulingContextRepository) addQueueSchedulingContexts(qctxs []*sch qctx.ExecutorId: qctx, } } + + if !qctx.EvictedResourcesByPriority.IsZero() { + if previous := mostRecentPreemptingQueueSchedulingContextByExecutorByQueue[qctx.Queue]; previous != nil { + previous = maps.Clone(previous) + previous[qctx.ExecutorId] = qctx + mostRecentPreemptingQueueSchedulingContextByExecutorByQueue[qctx.Queue] = previous + } else { + 
mostRecentPreemptingQueueSchedulingContextByExecutorByQueue[qctx.Queue] = QueueSchedulingContextByExecutor{ + qctx.ExecutorId: qctx, + } + } + } + if !qctx.ScheduledResourcesByPriority.IsZero() { if previous := mostRecentSuccessfulQueueSchedulingContextByExecutorByQueue[qctx.Queue]; previous != nil { previous = maps.Clone(previous) @@ -174,8 +219,11 @@ func (repo *SchedulingContextRepository) addQueueSchedulingContexts(qctxs []*sch } } } + repo.mostRecentQueueSchedulingContextByExecutorByQueueP.Store(&mostRecentQueueSchedulingContextByExecutorByQueue) repo.mostRecentSuccessfulQueueSchedulingContextByExecutorByQueueP.Store(&mostRecentSuccessfulQueueSchedulingContextByExecutorByQueue) + repo.mostRecentPreemptingQueueSchedulingContextByExecutorByQueueP.Store(&mostRecentPreemptingQueueSchedulingContextByExecutorByQueue) + return nil } @@ -236,6 +284,7 @@ func (repo *SchedulingContextRepository) getSchedulingReportString() string { sortedExecutorIds := repo.GetSortedExecutorIds() mostRecentSchedulingContextByExecutor := repo.GetMostRecentSchedulingContextByExecutor() mostRecentSuccessfulSchedulingContextByExecutor := repo.GetMostRecentSuccessfulSchedulingContextByExecutor() + mostRecentPreemptingSchedulingContextByExecutor := repo.GetMostRecentPreemptingSchedulingContextByExecutor() var sb strings.Builder w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0) for _, executorId := range sortedExecutorIds { @@ -254,6 +303,13 @@ func (repo *SchedulingContextRepository) getSchedulingReportString() string { } else { fmt.Fprint(w, indent.String("\t", "Most recent successful attempt: none\n")) } + sctx = mostRecentPreemptingSchedulingContextByExecutor[executorId] + if sctx != nil { + fmt.Fprint(w, indent.String("\t", "Most recent preempting attempt:\n")) + fmt.Fprint(w, indent.String("\t\t", sctx.String())) + } else { + fmt.Fprint(w, indent.String("\t", "Most recent preempting attempt: none\n")) + } } w.Flush() return sb.String() @@ -274,6 +330,7 @@ func (repo *SchedulingContextRepository) getQueueReportString(queue string) stri sortedExecutorIds := repo.GetSortedExecutorIds()
mostRecentQueueSchedulingContextByExecutor, _ := repo.GetMostRecentQueueSchedulingContextByExecutor(queue) mostRecentSuccessfulQueueSchedulingContextByExecutor, _ := repo.GetMostRecentSuccessfulQueueSchedulingContextByExecutor(queue) + mostRecentPreemptingQueueSchedulingContextByExecutor, _ := repo.GetMostRecentPreemptingQueueSchedulingContextByExecutor(queue) for _, executorId := range sortedExecutorIds { fmt.Fprintf(w, "%s:\n", executorId) qctx := mostRecentQueueSchedulingContextByExecutor[executorId] @@ -290,6 +347,13 @@ func (repo *SchedulingContextRepository) getQueueReportString(queue string) stri } else { fmt.Fprint(w, indent.String("\t", "Most recent successful attempt: none\n")) } + qctx = mostRecentPreemptingQueueSchedulingContextByExecutor[executorId] + if qctx != nil { + fmt.Fprint(w, indent.String("\t", "Most recent preempting attempt:\n")) + fmt.Fprint(w, indent.String("\t\t", qctx.String())) + } else { + fmt.Fprint(w, indent.String("\t", "Most recent preempting attempt: none\n")) + } } w.Flush() return sb.String() @@ -337,6 +401,10 @@ func (repo *SchedulingContextRepository) GetMostRecentSuccessfulSchedulingContex return *repo.mostRecentSuccessfulSchedulingContextByExecutorP.Load() } +func (repo *SchedulingContextRepository) GetMostRecentPreemptingSchedulingContextByExecutor() SchedulingContextByExecutor { + return *repo.mostRecentPreemptingSchedulingContextByExecutorP.Load() +} + func (repo *SchedulingContextRepository) GetMostRecentQueueSchedulingContextByExecutor(queue string) (QueueSchedulingContextByExecutor, bool) { mostRecentQueueSchedulingContextByExecutorByQueue := *repo.mostRecentQueueSchedulingContextByExecutorByQueueP.Load() mostRecentQueueSchedulingContextByExecutor, ok := mostRecentQueueSchedulingContextByExecutorByQueue[queue] @@ -349,6 +417,12 @@ func (repo *SchedulingContextRepository) GetMostRecentSuccessfulQueueSchedulingC return mostRecentSuccessfulQueueSchedulingContextByExecutor, ok } +func (repo *SchedulingContextRepository)
GetMostRecentPreemptingQueueSchedulingContextByExecutor(queue string) (QueueSchedulingContextByExecutor, bool) { + mostRecentPreemptingQueueSchedulingContextByExecutorByQueue := *repo.mostRecentPreemptingQueueSchedulingContextByExecutorByQueueP.Load() + mostRecentPreemptingQueueSchedulingContextByExecutor, ok := mostRecentPreemptingQueueSchedulingContextByExecutorByQueue[queue] + return mostRecentPreemptingQueueSchedulingContextByExecutor, ok +} + func (repo *SchedulingContextRepository) GetMostRecentJobSchedulingContextByExecutor(jobId string) (JobSchedulingContextByExecutor, bool) { if v, ok := repo.mostRecentJobSchedulingContextByExecutorByJobId.Get(jobId); ok { jobSchedulingContextByExecutor := v.(JobSchedulingContextByExecutor) diff --git a/internal/scheduler/reports_test.go b/internal/scheduler/reports_test.go index 1107185f79f..db73443c362 100644 --- a/internal/scheduler/reports_test.go +++ b/internal/scheduler/reports_test.go @@ -69,6 +69,11 @@ func TestAddGetSchedulingContext(t *testing.T) { err = repo.AddSchedulingContext(sctx) require.NoError(t, err) + sctx = testSchedulingContext("baz") + sctx = withPreemptingJobSchedulingContext(sctx, "C", "preempted") + err = repo.AddSchedulingContext(sctx) + require.NoError(t, err) + actualJobSchedulingContextByExecutor, ok := repo.GetMostRecentJobSchedulingContextByExecutor("doesNotExist") require.Nil(t, actualJobSchedulingContextByExecutor) require.False(t, ok) @@ -139,12 +144,23 @@ func TestAddGetSchedulingContext(t *testing.T) { actualQueueSchedulingContextByExecutor, ) + actualQueueSchedulingContextByExecutor, ok = repo.GetMostRecentQueueSchedulingContextByExecutor("C") + require.True(t, ok) + assert.Equal( + t, + QueueSchedulingContextByExecutor{ + "baz": withPreemptingJobSchedulingContext(testSchedulingContext("baz"), "C", "preempted").QueueSchedulingContexts["C"], + }, + actualQueueSchedulingContextByExecutor, + ) + actualSchedulingContextByExecutor := repo.GetMostRecentSchedulingContextByExecutor() 
assert.Equal( t, SchedulingContextByExecutor{ "foo": withUnsuccessfulJobSchedulingContext(testSchedulingContext("foo"), "A", "failureA"), "bar": withUnsuccessfulJobSchedulingContext(testSchedulingContext("bar"), "B", "failureB"), + "baz": withPreemptingJobSchedulingContext(testSchedulingContext("baz"), "C", "preempted"), }, actualSchedulingContextByExecutor, ) @@ -158,6 +174,15 @@ func TestAddGetSchedulingContext(t *testing.T) { }, actualSchedulingContextByExecutor, ) + + actualSchedulingContextByExecutor = repo.GetMostRecentPreemptingSchedulingContextByExecutor() + assert.Equal( + t, + SchedulingContextByExecutor{ + "baz": withPreemptingJobSchedulingContext(testSchedulingContext("baz"), "C", "preempted"), + }, + actualSchedulingContextByExecutor, + ) } // Concurrently write/read to/from the repo to test that there are no panics. @@ -177,7 +202,9 @@ func TestTestAddGetSchedulingContextConcurrency(t *testing.T) { sctx := testSchedulingContext(executorId) sctx = withUnsuccessfulJobSchedulingContext(sctx, "A", "failureA") sctx = withUnsuccessfulJobSchedulingContext(sctx, "B", "failureB") + sctx = withUnsuccessfulJobSchedulingContext(sctx, "C", "failureC") sctx = withSuccessfulJobSchedulingContext(sctx, "B", fmt.Sprintf("success%sB", executorId)) + sctx = withPreemptingJobSchedulingContext(sctx, "C", "preempted") err = repo.AddSchedulingContext(sctx) require.NoError(t, err) err = repo.AddSchedulingContext(sctx) @@ -217,18 +244,29 @@ func withSuccessfulJobSchedulingContext(sctx *schedulercontext.SchedulingContext ExecutorId: sctx.ExecutorId, JobId: jobId, } - qctx.ScheduledResourcesByPriority.AddResourceList( - 0, - schedulerobjects.ResourceList{ - Resources: map[string]resource.Quantity{"cpu": resource.MustParse("1")}, - }, - ) - sctx.ScheduledResourcesByPriority.AddResourceList( - 0, - schedulerobjects.ResourceList{ - Resources: map[string]resource.Quantity{"cpu": resource.MustParse("1")}, - }, - ) + rl := schedulerobjects.ResourceList{Resources: 
map[string]resource.Quantity{"cpu": resource.MustParse("1")}} + qctx.ScheduledResourcesByPriority.AddResourceList(0, rl) + sctx.ScheduledResourcesByPriority.AddResourceList(0, rl) + return sctx +} + +func withPreemptingJobSchedulingContext(sctx *schedulercontext.SchedulingContext, queue, jobId string) *schedulercontext.SchedulingContext { + if sctx.QueueSchedulingContexts == nil { + sctx.QueueSchedulingContexts = make(map[string]*schedulercontext.QueueSchedulingContext) + } + qctx := sctx.QueueSchedulingContexts[queue] + if qctx == nil { + if err := sctx.AddQueueSchedulingContext(queue, 1.0, make(schedulerobjects.QuantityByPriorityAndResourceType)); err != nil { + panic(err) + } + qctx = sctx.QueueSchedulingContexts[queue] + qctx.SchedulingContext = nil + qctx.Created = time.Time{} + } + qctx.EvictedJobsById[jobId] = true + rl := schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("1")}} + qctx.EvictedResourcesByPriority.AddResourceList(0, rl) + sctx.EvictedResourcesByPriority.AddResourceList(0, rl) return sctx }