Fixing broken tests, merging artifacts, linting
mustafai-gr committed Jul 25, 2024
1 parent a05e1d0 commit f3dabf3
Showing 11 changed files with 336 additions and 256 deletions.
1 change: 1 addition & 0 deletions cmd/armadactl/cmd/scheduling_control.go
@@ -2,6 +2,7 @@ package cmd

import (
"fmt"

"github.com/spf13/cobra"

"github.com/armadaproject/armada/internal/armadactl"
3 changes: 2 additions & 1 deletion internal/common/slices/slices.go
@@ -2,11 +2,12 @@ package slices

import (
"fmt"
goslices "golang.org/x/exp/slices"
"math"
"math/rand"
"sync"

goslices "golang.org/x/exp/slices"

"github.com/armadaproject/armada/internal/common/interfaces"
)

50 changes: 15 additions & 35 deletions internal/scheduler/constraints/constraints_test.go
@@ -148,6 +148,17 @@ func TestConstraints(t *testing.T) {
UnschedulableReasonMaximumResourcesExceeded,
"",
),
"scheduling-paused-on-queue-constraint": func() *constraintTest {
t := makeConstraintsTest(NewSchedulingConstraints(
"pool-1",
makeResourceList("1000", "1000Gi"),
configuration.SchedulingConfig{},
[]*api.Queue{{Name: "queue-1", SchedulingPaused: true}},
map[string]bool{"queue-1": false},
))
t.expectedCheckConstraintsReason = "scheduling paused on queue"
return t
}(),
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
@@ -171,24 +182,6 @@ func TestCapResources(t *testing.T) {
resources schedulerobjects.QuantityByTAndResourceType[string]
expectedResources schedulerobjects.QuantityByTAndResourceType[string]
}{
"no contraints": {
constraints: NewSchedulingConstraints(
"pool-1",
makeResourceList("1000", "1000Gi"),
makeSchedulingConfig(),
[]*api.Queue{},
map[string]bool{},
))
t.expectedCheckConstraintsReason = "job requests 1 cpu, but the minimum is 5"
return t
}(),
"above-maximum-resources-to-schedule": func() *constraintTest {
t := makeConstraintsTest(NewSchedulingConstraints(
),
queue: "queue-1",
resources: map[string]schedulerobjects.ResourceList{"priority-class-1": makeResourceList("1000", "1000Gi")},
expectedResources: map[string]schedulerobjects.ResourceList{"priority-class-1": makeResourceList("1000", "1000Gi")},
},
"unconstrained": {
constraints: NewSchedulingConstraints(
"pool-1",
@@ -203,6 +196,7 @@ func TestCapResources(t *testing.T) {
},
},
[]*api.Queue{},
map[string]bool{},
),
queue: "queue-1",
resources: map[string]schedulerobjects.ResourceList{"priority-class-1": makeResourceList("1", "1Gi")},
@@ -222,6 +216,7 @@ func TestCapResources(t *testing.T) {
},
},
[]*api.Queue{},
map[string]bool{},
),
queue: "queue-1",
resources: map[string]schedulerobjects.ResourceList{"priority-class-1": makeResourceList("1000", "1000Gi")},
@@ -250,6 +245,7 @@ func TestCapResources(t *testing.T) {
},
},
},
map[string]bool{},
),
queue: "queue-1",
resources: map[string]schedulerobjects.ResourceList{"priority-class-1": makeResourceList("1000", "1000Gi")},
@@ -281,6 +277,7 @@ func TestCapResources(t *testing.T) {
},
},
},
map[string]bool{},
),
queue: "queue-1",
resources: map[string]schedulerobjects.ResourceList{
@@ -292,23 +289,6 @@ func TestCapResources(t *testing.T) {
"priority-class-2": makeResourceList("900", "900Gi"),
},
},
map[string]bool{},
))
t.expectedCheckRoundConstraintsReason = "maximum resources scheduled"
return t
}(),
"scheduling-paused-on-queue-constraint": func() *constraintTest {
t := makeConstraintsTest(NewSchedulingConstraints(
"pool-1",
makeResourceList("1000", "1000Gi"),
makeResourceList("0", "0"),
configuration.SchedulingConfig{},
[]*api.Queue{{Name: "queue-1", SchedulingPaused: true}},
map[string]bool{"queue-1": false},
))
t.expectedCheckConstraintsReason = "scheduling paused on queue"
return t
}(),
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
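
Both test tables above now pass a per-queue scheduling-status map as the final argument to NewSchedulingConstraints. The annotated call below is illustrative only; the argument labels are inferred from this diff rather than taken from the package documentation.

// Illustrative call shape inferred from the updated tests; the labels are assumptions.
constraints := NewSchedulingConstraints(
	"pool-1",                           // pool name
	makeResourceList("1000", "1000Gi"), // total resources available in the pool
	configuration.SchedulingConfig{},   // scheduling configuration
	[]*api.Queue{{Name: "queue-1", SchedulingPaused: true}},ize // known queues
	map[string]bool{"queue-1": false},  // new argument: whether scheduling is enabled per queue
)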
14 changes: 7 additions & 7 deletions internal/scheduler/gang_scheduler.go
@@ -105,13 +105,6 @@ func (sch *GangScheduler) updateGangSchedulingContextOnFailure(gctx *schedulerco
}

func (sch *GangScheduler) Schedule(ctx *armadacontext.Context, gctx *schedulercontext.GangSchedulingContext) (ok bool, unschedulableReason string, err error) {
// Exit immediately if this is a new gang and we've hit any round limits.
if !gctx.AllJobsEvicted {
if ok, unschedulableReason, err = sch.constraints.CheckRoundConstraints(sch.schedulingContext); err != nil || !ok {
return
}
}

// This deferred function ensures unschedulable jobs are registered as such.
gangAddedToSchedulingContext := false
defer func() {
@@ -136,6 +129,13 @@ func (sch *GangScheduler) Schedule(ctx *armadacontext.Context, gctx *schedulerco
}
}()

// Exit immediately if this is a new gang and we've hit any round limits.
if !gctx.AllJobsEvicted {
if ok, unschedulableReason, err = sch.constraints.CheckRoundConstraints(sch.schedulingContext); err != nil || !ok {
return
}
}

if _, err = sch.schedulingContext.AddGangSchedulingContext(gctx); err != nil {
return
}
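
The hunk above moves the round-constraint check below the deferred bookkeeping, so a gang rejected by CheckRoundConstraints is still recorded as unschedulable before the early return. A minimal, self-contained sketch of that ordering, with hypothetical helper names standing in for the scheduler's internals:

package main

import "fmt"

// Hypothetical stand-ins for the scheduler's round-constraint check and bookkeeping.
func checkRoundConstraints() (ok bool, reason string, err error) {
	return false, "maximum resources scheduled", nil
}

func recordUnschedulable(reason string) {
	fmt.Println("gang marked unschedulable:", reason)
}

func schedule() (ok bool, reason string, err error) {
	// Register the failure handler first, as the commit now does, so the early
	// return below still records why the gang could not be scheduled.
	defer func() {
		if err == nil && !ok {
			recordUnschedulable(reason)
		}
	}()
	if ok, reason, err = checkRoundConstraints(); err != nil || !ok {
		return
	}
	// ... gang scheduling would continue here ...
	return true, "", nil
}

func main() {
	schedule()
}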
17 changes: 10 additions & 7 deletions internal/scheduler/queue_scheduler.go
@@ -127,7 +127,6 @@ func (sch *QueueScheduler) Schedule(ctx *armadacontext.Context) (*SchedulerResul
// instruct the underlying iterator to only yield evicted jobs from now on.
sch.schedulingContext.TerminationReason = unschedulableReason
sch.candidateGangIterator.OnlyYieldEvicted()
MarkGctxUnschedulable(sch.schedulingContext, gctx, unschedulableReason)
} else if schedulerconstraints.IsTerminalQueueUnschedulableReason(unschedulableReason) {
// If unschedulableReason indicates no more new jobs can be scheduled for this queue,
// instruct the underlying iterator to only yield evicted jobs for this queue from now on.
@@ -423,26 +422,30 @@ func (it *CandidateGangIterator) newPQItem(queue string, queueIt *QueuedGangIter
}
}

func MarkGctxUnschedulable(sctx *schedulercontext.SchedulingContext, gctx *schedulercontext.GangSchedulingContext, reason string) {
func MarkGctxUnschedulable(sctx *schedulercontext.SchedulingContext, gctx *schedulercontext.GangSchedulingContext, reason string) error {
if gctx != nil {
for _, jctx := range gctx.JobSchedulingContexts {
jctx.UnschedulableReason = reason
sctx.AddJobSchedulingContext(jctx)
jctx.Fail(reason)
}
if _, err := sctx.AddGangSchedulingContext(gctx); err != nil {
return err
}
}
return nil
}

func (it *CandidateGangIterator) markPQItemAsUnschedulableWithReason(item *QueueCandidateGangIteratorItem, reason string) error {
if item == nil {
return nil
}
var err error
gctx := item.gctx
gctx, err := item.it.Next()
for {
if err != nil {
return err
} else if gctx != nil {
MarkGctxUnschedulable(item.it.schedulingContext, gctx, reason)
if err = MarkGctxUnschedulable(item.it.schedulingContext, gctx, reason); err != nil {
return err
}
} else {
break
}
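
MarkGctxUnschedulable now fails each job context via jctx.Fail, adds the gang to the scheduling context, and returns any error instead of swallowing it; markPQItemAsUnschedulableWithReason drains the remaining gangs from the queue's iterator and propagates the first failure. The following is a small, self-contained sketch of that drain-and-propagate pattern, using hypothetical types in place of the scheduler's iterator and contexts:

package main

import "fmt"

// gangIter is a hypothetical stand-in for the queue's gang iterator.
type gangIter struct{ gangs []string }

// Next returns the next gang name, or "" once the iterator is exhausted.
func (it *gangIter) Next() (string, error) {
	if len(it.gangs) == 0 {
		return "", nil
	}
	next := it.gangs[0]
	it.gangs = it.gangs[1:]
	return next, nil
}

// markUnschedulable mirrors the new shape of MarkGctxUnschedulable: it reports
// errors from updating the scheduling context rather than ignoring them.
func markUnschedulable(gang, reason string) error {
	fmt.Printf("%s: %s\n", gang, reason)
	return nil
}

// drainAndMark mirrors markPQItemAsUnschedulableWithReason: every remaining gang
// is marked with the reason, and the first error stops the loop.
func drainAndMark(it *gangIter, reason string) error {
	for {
		gang, err := it.Next()
		if err != nil {
			return err
		}
		if gang == "" {
			return nil
		}
		if err := markUnschedulable(gang, reason); err != nil {
			return err
		}
	}
}

func main() {
	_ = drainAndMark(&gangIter{gangs: []string{"gang-a", "gang-b"}}, "scheduling paused on queue")
}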
19 changes: 9 additions & 10 deletions internal/scheduler/scheduling_algo.go
@@ -217,13 +217,11 @@ func (it *JobQueueIteratorAdapter) Next() (*jobdb.Job, error) {
}

type fairSchedulingAlgoContext struct {
queues []*api.Queue
priorityFactorByQueue map[string]float64
demandByPoolByQueue map[string]map[string]schedulerobjects.QuantityByTAndResourceType[string]
queues []*api.Queue
priorityFactorByQueue map[string]float64
demandByPoolByQueue map[string]map[string]schedulerobjects.QuantityByTAndResourceType[string]
// Determines whether a queue has scheduling enabled. Not the same as a queue being active.
schedulingStatusByQueue map[string]bool
// A queue is active if it has jobs in the states running or queued
isActiveByPoolByQueue map[string]map[string]bool
schedulingStatusByQueue map[string]bool
totalCapacityByPool schedulerobjects.QuantityByTAndResourceType[string]
nodesByPoolAndExecutor map[string]map[string][]*schedulerobjects.Node
jobsByPoolAndExecutor map[string]map[string][]*jobdb.Job
@@ -491,20 +489,21 @@ func (l *FairSchedulingAlgo) schedulePool(
l.limiterByQueue[queue] = queueLimiter
}

queueContextDemand := demand.AggregateByResource()

if !fsctx.schedulingStatusByQueue[queue] {
queueLimiter.SetLimitAt(now, rate.Limit(float64(0)))
queueLimiter.SetBurstAt(now, 0)
}


// Queued jobs should not be considered for paused queues, so demand := running
allocated := schedulerobjects.NewResourceListWithDefaultSize()
for _, rl := range allocatedByPriorityClass.DeepCopy() {
allocated.Add(rl)
}
demand = allocated
queueContextDemand = allocated
}
if err := sctx.AddQueueSchedulingContext(queue, weight, allocatedByPriorityClass, demand.AggregateByResource(), cappedDemand.AggregateByResource(), queueLimiter); err != nil {

if err := sctx.AddQueueSchedulingContext(queue, weight, allocatedByPriorityClass, queueContextDemand, cappedDemand.AggregateByResource(), queueLimiter); err != nil {
return nil, nil, err
}
}
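
For a queue whose scheduling is disabled, the hunk above zeroes the queue's rate limiter and reports only currently allocated (running) resources as that queue's demand. The snippet below is a self-contained sketch of the rate-limiter half, using the same golang.org/x/time/rate calls that appear in the diff:

package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

// pauseQueueLimiter silences a queue's limiter so no new jobs are admitted while
// scheduling is paused, mirroring the SetLimitAt/SetBurstAt calls above.
func pauseQueueLimiter(limiter *rate.Limiter, now time.Time) {
	limiter.SetLimitAt(now, rate.Limit(0)) // no new tokens accrue
	limiter.SetBurstAt(now, 0)             // and no burst capacity remains
}

func main() {
	limiter := rate.NewLimiter(rate.Limit(10), 5)
	pauseQueueLimiter(limiter, time.Now())
	fmt.Println(limiter.Allow()) // false: the paused queue admits nothing
}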
2 changes: 1 addition & 1 deletion internal/scheduler/scheduling_algo_test.go
@@ -69,7 +69,7 @@ func TestSchedule(t *testing.T) {
testfixtures.MakeTestExecutor("executor-2", "pool-1"),
},
queues: []*api.Queue{testfixtures.MakeTestQueue()},
queuedJobs: testfixtures.N16Cpu128GiJobs(testfixtures.TestQueue, testfixtures.PriorityClass3, 10),
queuedJobs: testfixtures.N16Cpu128GiJobs(testfixtures.TestQueue0, testfixtures.PriorityClass3, 10),
expectedScheduledIndices: []int{0, 1, 2, 3, 4, 5},
expectedScheduledByPool: map[string]int{"pool-1": 4, "pool-2": 2},
},