Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove the config to disable the scheduling key optimisation #3960

Merged
merged 4 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion config/scheduler/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,5 @@ scheduling:
resolution: "1Gi"
executorTimeout: "10m"
maxUnacknowledgedJobsPerExecutor: 2500
alwaysAttemptScheduling: false
executorUpdateFrequency: "60s"

2 changes: 0 additions & 2 deletions internal/scheduler/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,6 @@ type SchedulingConfig struct {
// Maximum number of jobs that can be assigned to a executor but not yet acknowledged, before
// the scheduler is excluded from consideration by the scheduler.
MaxUnacknowledgedJobsPerExecutor uint
// If true, do not during scheduling skip jobs with requirements known to be impossible to meet.
AlwaysAttemptScheduling bool
// The frequency at which the scheduler updates the cluster state.
ExecutorUpdateFrequency time.Duration
// Defines the order in which pools will be scheduled. Higher priority pools will be scheduled first
Expand Down
14 changes: 6 additions & 8 deletions internal/scheduler/gang_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,17 @@ func NewGangScheduler(
constraints schedulerconstraints.SchedulingConstraints,
floatingResourceTypes *floatingresources.FloatingResourceTypes,
nodeDb *nodedb.NodeDb,
skipUnsuccessfulSchedulingKeyCheck bool,
) (*GangScheduler, error) {
return &GangScheduler{
constraints: constraints,
floatingResourceTypes: floatingResourceTypes,
schedulingContext: sctx,
nodeDb: nodeDb,
constraints: constraints,
floatingResourceTypes: floatingResourceTypes,
schedulingContext: sctx,
nodeDb: nodeDb,
skipUnsuccessfulSchedulingKeyCheck: skipUnsuccessfulSchedulingKeyCheck,
}, nil
}

func (sch *GangScheduler) SkipUnsuccessfulSchedulingKeyCheck() {
sch.skipUnsuccessfulSchedulingKeyCheck = true
}

func (sch *GangScheduler) updateGangSchedulingContextOnSuccess(gctx *schedulercontext.GangSchedulingContext, gangAddedToSchedulingContext bool) error {
if !gangAddedToSchedulingContext {
// Nothing to do.
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/gang_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ func TestGangScheduler(t *testing.T) {
constraints := schedulerconstraints.NewSchedulingConstraints("pool", tc.TotalResources, tc.SchedulingConfig, nil, map[string]bool{})
floatingResourceTypes, err := floatingresources.NewFloatingResourceTypes(tc.SchedulingConfig.ExperimentalFloatingResources)
require.NoError(t, err)
sch, err := NewGangScheduler(sctx, constraints, floatingResourceTypes, nodeDb)
sch, err := NewGangScheduler(sctx, constraints, floatingResourceTypes, nodeDb, false)
require.NoError(t, err)

var actualScheduledIndices []int
Expand Down
16 changes: 4 additions & 12 deletions internal/scheduler/preempting_queue_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ type PreemptingQueueScheduler struct {
jobIdsByGangId map[string]map[string]bool
// Maps job ids of gang jobs to the id of that gang.
gangIdByJobId map[string]string
// If true, the unsuccessfulSchedulingKeys check of gangScheduler is omitted.
skipUnsuccessfulSchedulingKeyCheck bool
// If true, asserts that the nodeDb state is consistent with expected changes.
enableAssertions bool
}
Expand Down Expand Up @@ -89,10 +87,6 @@ func (sch *PreemptingQueueScheduler) EnableAssertions() {
sch.enableAssertions = true
}

func (sch *PreemptingQueueScheduler) SkipUnsuccessfulSchedulingKeyCheck() {
sch.skipUnsuccessfulSchedulingKeyCheck = true
}

// Schedule
// - preempts jobs belonging to queues with total allocation above their fair share and
// - schedules new jobs belonging to queues with total allocation less than their fair share.
Expand Down Expand Up @@ -155,6 +149,7 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*sche
armadacontext.WithLogField(ctx, "stage", "re-schedule after balancing eviction"),
inMemoryJobRepo,
sch.jobRepo,
false,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -201,14 +196,13 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*sche
// Re-schedule evicted jobs/schedule new jobs.
// Only necessary if a non-zero number of jobs were evicted.
if len(evictorResult.EvictedJctxsByJobId) > 0 {
// Since no new jobs are considered in this round, the scheduling key check brings no benefit.
sch.SkipUnsuccessfulSchedulingKeyCheck()
ctx.WithField("stage", "scheduling-algo").Info("Performing second scheduling ")
schedulerResult, err = sch.schedule(
armadacontext.WithLogField(ctx, "stage", "schedule after oversubscribed eviction"),
inMemoryJobRepo,
// Only evicted jobs should be scheduled in this round.
nil,
true, // Since no new jobs are considered in this round, the scheduling key check brings no benefit.
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -524,7 +518,7 @@ func addEvictedJobsToNodeDb(_ *armadacontext.Context, sctx *schedulercontext.Sch
return nil
}

func (sch *PreemptingQueueScheduler) schedule(ctx *armadacontext.Context, inMemoryJobRepo *InMemoryJobRepository, jobRepo JobRepository) (*schedulerresult.SchedulerResult, error) {
func (sch *PreemptingQueueScheduler) schedule(ctx *armadacontext.Context, inMemoryJobRepo *InMemoryJobRepository, jobRepo JobRepository, skipUnsuccessfulSchedulingKeyCheck bool) (*schedulerresult.SchedulerResult, error) {
jobIteratorByQueue := make(map[string]JobIterator)
for _, qctx := range sch.schedulingContext.QueueSchedulingContexts {
evictedIt := inMemoryJobRepo.GetJobIterator(qctx.Queue)
Expand All @@ -545,13 +539,11 @@ func (sch *PreemptingQueueScheduler) schedule(ctx *armadacontext.Context, inMemo
sch.floatingResourceTypes,
sch.nodeDb,
jobIteratorByQueue,
skipUnsuccessfulSchedulingKeyCheck,
)
if err != nil {
return nil, err
}
if sch.skipUnsuccessfulSchedulingKeyCheck {
sched.SkipUnsuccessfulSchedulingKeyCheck()
}
result, err := sched.Schedule(ctx)
if err != nil {
return nil, err
Expand Down
7 changes: 2 additions & 5 deletions internal/scheduler/queue_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@ func NewQueueScheduler(
floatingResourceTypes *floatingresources.FloatingResourceTypes,
nodeDb *nodedb.NodeDb,
jobIteratorByQueue map[string]JobIterator,
skipUnsuccessfulSchedulingKeyCheck bool,
) (*QueueScheduler, error) {
for queue := range jobIteratorByQueue {
if _, ok := sctx.QueueSchedulingContexts[queue]; !ok {
return nil, errors.Errorf("no scheduling context for queue %s", queue)
}
}
gangScheduler, err := NewGangScheduler(sctx, constraints, floatingResourceTypes, nodeDb)
gangScheduler, err := NewGangScheduler(sctx, constraints, floatingResourceTypes, nodeDb, skipUnsuccessfulSchedulingKeyCheck)
if err != nil {
return nil, err
}
Expand All @@ -58,10 +59,6 @@ func NewQueueScheduler(
}, nil
}

func (sch *QueueScheduler) SkipUnsuccessfulSchedulingKeyCheck() {
sch.gangScheduler.SkipUnsuccessfulSchedulingKeyCheck()
}

func (sch *QueueScheduler) Schedule(ctx *armadacontext.Context) (*schedulerresult.SchedulerResult, error) {
var scheduledJobs []*schedulercontext.JobSchedulingContext

Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/queue_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ func TestQueueScheduler(t *testing.T) {
it := jobRepo.GetJobIterator(q.Name)
jobIteratorByQueue[q.Name] = it
}
sch, err := NewQueueScheduler(sctx, constraints, testfixtures.TestEmptyFloatingResources, nodeDb, jobIteratorByQueue)
sch, err := NewQueueScheduler(sctx, constraints, testfixtures.TestEmptyFloatingResources, nodeDb, jobIteratorByQueue, false)
require.NoError(t, err)

result, err := sch.Schedule(armadacontext.Background())
Expand Down
3 changes: 0 additions & 3 deletions internal/scheduler/scheduling_algo.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,9 +484,6 @@ func (l *FairSchedulingAlgo) schedulePool(
fsctx.jobIdsByGangId,
fsctx.gangIdByJobId,
)
if l.schedulingConfig.AlwaysAttemptScheduling {
scheduler.SkipUnsuccessfulSchedulingKeyCheck()
}
if l.schedulingConfig.EnableAssertions {
scheduler.EnableAssertions()
}
Expand Down