Remove Eviction Probability Config Values #3651

Merged · 6 commits · Jun 6, 2024
2 changes: 0 additions & 2 deletions config/scheduler/config.yaml
@@ -85,8 +85,6 @@ scheduling:
resolution: "1"
disableScheduling: false
enableAssertions: false
nodeEvictionProbability: 1.0
nodeOversubscriptionEvictionProbability: 1.0
protectedFractionOfFairShare: 1.0
nodeIdLabel: "kubernetes.io/hostname"
priorityClasses:
9 changes: 0 additions & 9 deletions internal/scheduler/configuration/configuration.go
@@ -151,15 +151,6 @@ type SchedulingConfig struct {
DisableScheduling bool
// Set to true to enable scheduler assertions. This results in some performance loss.
EnableAssertions bool
// If using PreemptToFairShare,
// the probability of evicting jobs on a node to balance resource usage.
// TODO(albin): Remove.
NodeEvictionProbability float64
// If using PreemptToFairShare,
// the probability of evicting jobs on oversubscribed nodes, i.e.,
// nodes on which the total resource requests are greater than the available resources.
// TODO(albin): Remove.
NodeOversubscriptionEvictionProbability float64
// Only queues allocated more than this fraction of their fair share are considered for preemption.
ProtectedFractionOfFairShare float64 `validate:"gte=0"`
// Armada adds a node selector term to every scheduled pod using this label with the node name as value.
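For reference, the migration for any code that built a SchedulingConfig with these fields is simply to delete the two assignments. Below is a minimal, self-contained sketch using a trimmed stand-in struct that mirrors only the fields visible in this hunk; it is illustrative, not the real type from internal/scheduler/configuration.

```go
package main

import "fmt"

// Trimmed stand-in for SchedulingConfig, limited to the fields shown in this hunk.
// NodeEvictionProbability and NodeOversubscriptionEvictionProbability no longer
// exist after this PR, so setting them would now be a compile error.
type SchedulingConfig struct {
	DisableScheduling            bool
	EnableAssertions             bool
	ProtectedFractionOfFairShare float64
}

func main() {
	cfg := SchedulingConfig{
		DisableScheduling:            false,
		EnableAssertions:             false,
		ProtectedFractionOfFairShare: 1.0,
	}
	fmt.Printf("%+v\n", cfg)
}
```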
58 changes: 15 additions & 43 deletions internal/scheduler/preempting_queue_scheduler.go
@@ -1,7 +1,6 @@
package scheduler

import (
"math/rand"
"reflect"
"time"

@@ -27,13 +26,11 @@ import (
// PreemptingQueueScheduler is a scheduler that makes unified decisions on which jobs to preempt and schedule.
// Uses QueueScheduler as a building block.
type PreemptingQueueScheduler struct {
schedulingContext *schedulercontext.SchedulingContext
constraints schedulerconstraints.SchedulingConstraints
nodeEvictionProbability float64
nodeOversubscriptionEvictionProbability float64
protectedFractionOfFairShare float64
jobRepo JobRepository
nodeDb *nodedb.NodeDb
schedulingContext *schedulercontext.SchedulingContext
constraints schedulerconstraints.SchedulingConstraints
protectedFractionOfFairShare float64
jobRepo JobRepository
nodeDb *nodedb.NodeDb
// Maps job ids to the id of the node the job is associated with.
// For scheduled or running jobs, that is the node the job is assigned to.
// For preempted jobs, that is the node the job was preempted from.
@@ -51,8 +48,6 @@ type PreemptingQueueScheduler struct {
func NewPreemptingQueueScheduler(
sctx *schedulercontext.SchedulingContext,
constraints schedulerconstraints.SchedulingConstraints,
nodeEvictionProbability float64,
nodeOversubscriptionEvictionProbability float64,
protectedFractionOfFairShare float64,
jobRepo JobRepository,
nodeDb *nodedb.NodeDb,
@@ -74,16 +69,14 @@ func NewPreemptingQueueScheduler(
initialJobIdsByGangId[gangId] = maps.Clone(jobIds)
}
return &PreemptingQueueScheduler{
schedulingContext: sctx,
constraints: constraints,
nodeEvictionProbability: nodeEvictionProbability,
nodeOversubscriptionEvictionProbability: nodeOversubscriptionEvictionProbability,
protectedFractionOfFairShare: protectedFractionOfFairShare,
jobRepo: jobRepo,
nodeDb: nodeDb,
nodeIdByJobId: maps.Clone(initialNodeIdByJobId),
jobIdsByGangId: initialJobIdsByGangId,
gangIdByJobId: maps.Clone(initialGangIdByJobId),
schedulingContext: sctx,
constraints: constraints,
protectedFractionOfFairShare: protectedFractionOfFairShare,
jobRepo: jobRepo,
nodeDb: nodeDb,
nodeIdByJobId: maps.Clone(initialNodeIdByJobId),
jobIdsByGangId: initialJobIdsByGangId,
gangIdByJobId: maps.Clone(initialGangIdByJobId),
}
}
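Call sites shrink from seven arguments to five; only the two probability values disappear and the remaining argument order is unchanged. The sketch below models the post-PR shape with lowercase stand-in types so it compiles on its own; it is not an excerpt of the real constructor.

```go
package main

import "fmt"

// Lowercase stand-ins for the real scheduler types; only the parameter shape matters here.
type (
	schedulingContext     struct{}
	schedulingConstraints struct{}
	jobRepository         struct{}
	nodeDb                struct{}
)

type preemptingQueueScheduler struct {
	protectedFractionOfFairShare float64
}

// Post-PR parameter list: nodeEvictionProbability and
// nodeOversubscriptionEvictionProbability are gone.
func newPreemptingQueueScheduler(
	sctx *schedulingContext,
	constraints schedulingConstraints,
	protectedFractionOfFairShare float64,
	jobRepo jobRepository,
	db *nodeDb,
) *preemptingQueueScheduler {
	return &preemptingQueueScheduler{protectedFractionOfFairShare: protectedFractionOfFairShare}
}

func main() {
	sch := newPreemptingQueueScheduler(
		&schedulingContext{},
		schedulingConstraints{},
		// 1.0, // nodeEvictionProbability: no longer passed
		// 1.0, // nodeOversubscriptionEvictionProbability: no longer passed
		1.0, // protectedFractionOfFairShare
		jobRepository{},
		&nodeDb{},
	)
	fmt.Printf("%+v\n", sch)
}
```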

@@ -120,7 +113,6 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*Sche
sch.jobRepo,
sch.nodeDb,
sch.schedulingContext.PriorityClasses,
sch.nodeEvictionProbability,
func(ctx *armadacontext.Context, job *jobdb.Job) bool {
priorityClass := job.PriorityClass()
if !priorityClass.Preemptible {
@@ -144,7 +136,6 @@ }
}
return true
},
nil,
),
)
if err != nil {
@@ -185,8 +176,6 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*Sche
sch.jobRepo,
sch.nodeDb,
sch.schedulingContext.PriorityClasses,
sch.nodeOversubscriptionEvictionProbability,
nil,
),
)
if err != nil {
Expand Down Expand Up @@ -709,22 +698,14 @@ func NewNodeEvictor(
jobRepo JobRepository,
nodeDb *nodedb.NodeDb,
priorityClasses map[string]types.PriorityClass,
perNodeEvictionProbability float64,
jobFilter func(*armadacontext.Context, *jobdb.Job) bool,
random *rand.Rand,
) *Evictor {
if perNodeEvictionProbability <= 0 {
return nil
}
if random == nil {
random = rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
}
return &Evictor{
jobRepo: jobRepo,
nodeDb: nodeDb,
priorityClasses: priorityClasses,
nodeFilter: func(_ *armadacontext.Context, node *internaltypes.Node) bool {
return len(node.AllocatedByJobId) > 0 && random.Float64() < perNodeEvictionProbability
return len(node.AllocatedByJobId) > 0
},
jobFilter: jobFilter,
}
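The only behavioural question is whether dropping the probability changes eviction decisions. With the shipped default of 1.0 for both probabilities (config.yaml above and the test fixtures below), the clause random.Float64() < perNodeEvictionProbability was always true, because rand.Float64 returns a value in [0, 1), so the new deterministic filter keeps exactly the same nodes. A small self-contained sketch of the two filter forms, using a stand-in Node type with just the field the filter reads:

```go
package main

import (
	"fmt"
	"math/rand"
)

// Stand-in for the scheduler's internal node type; only the field the filter reads.
type Node struct {
	AllocatedByJobId map[string]bool
}

func main() {
	random := rand.New(rand.NewSource(42))
	node := &Node{AllocatedByJobId: map[string]bool{"job-1": true}}

	// Old form: stochastic, but with the default probability of 1.0 the second
	// clause can never be false, since Float64 returns a value in [0, 1).
	oldFilter := func(node *Node) bool {
		return len(node.AllocatedByJobId) > 0 && random.Float64() < 1.0
	}

	// New form: deterministic; any node with allocated jobs is considered.
	newFilter := func(node *Node) bool {
		return len(node.AllocatedByJobId) > 0
	}

	fmt.Println(oldFilter(node), newFilter(node)) // true true
}
```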
@@ -759,20 +740,11 @@ func NewFilteredEvictor(

// NewOversubscribedEvictor returns a new evictor that
// for each node evicts all preemptible jobs of a priority class for which at least one job could not be scheduled
// with probability perNodeEvictionProbability.
func NewOversubscribedEvictor(
jobRepo JobRepository,
nodeDb *nodedb.NodeDb,
priorityClasses map[string]types.PriorityClass,
perNodeEvictionProbability float64,
random *rand.Rand,
) *Evictor {
if perNodeEvictionProbability <= 0 {
return nil
}
if random == nil {
random = rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
}
// Populating overSubscribedPriorities relies on
// - nodeFilter being called once before all calls to jobFilter and
// - jobFilter being called for all jobs on that node before moving on to another node.
@@ -792,7 +764,7 @@ func NewOversubscribedEvictor(
overSubscribedPriorities[p] = true
}
}
return len(overSubscribedPriorities) > 0 && random.Float64() < perNodeEvictionProbability
return len(overSubscribedPriorities) > 0
},
jobFilter: func(ctx *armadacontext.Context, job *jobdb.Job) bool {
priorityClass := job.PriorityClass()
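The comment above about populating overSubscribedPriorities encodes an ordering contract: nodeFilter must run once per node, and jobFilter must then run for every job on that node before the evictor moves on, because both closures share the same overSubscribedPriorities variable. The sketch below illustrates that contract with stand-in types; it is not the real Evictor.Evict loop.

```go
package main

import "fmt"

// Stand-ins: a job has a priority; a node lists its jobs and the priorities
// that are over-subscribed on it.
type job struct{ priority int32 }
type node struct {
	jobs                     []job
	overSubscribedPriorities []int32
}

func main() {
	// Shared between the two filters, like the closure variable
	// overSubscribedPriorities in NewOversubscribedEvictor.
	var overSubscribed map[int32]bool

	nodeFilter := func(n *node) bool {
		overSubscribed = map[int32]bool{}
		for _, p := range n.overSubscribedPriorities {
			overSubscribed[p] = true
		}
		return len(overSubscribed) > 0
	}
	jobFilter := func(j job) bool {
		return overSubscribed[j.priority]
	}

	nodes := []*node{
		{jobs: []job{{priority: 0}, {priority: 1}}, overSubscribedPriorities: []int32{1}},
		{jobs: []job{{priority: 0}}}, // nothing over-subscribed; skipped entirely
	}

	// The contract: nodeFilter once per node, then jobFilter for all of that
	// node's jobs, before moving on. Interleaving nodes would leave the shared
	// map describing the wrong node.
	for _, n := range nodes {
		if !nodeFilter(n) {
			continue
		}
		for _, j := range n.jobs {
			fmt.Println("priority", j.priority, "evict:", jobFilter(j))
		}
	}
}
```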
67 changes: 6 additions & 61 deletions internal/scheduler/preempting_queue_scheduler_test.go
@@ -59,10 +59,7 @@ func TestEvictOversubscribed(t *testing.T) {
evictor := NewOversubscribedEvictor(
NewSchedulerJobRepositoryAdapter(jobDbTxn),
nodeDb,
config.PriorityClasses,
1,
nil,
)
config.PriorityClasses)
result, err := evictor.Evict(armadacontext.Background(), nodeDbTxn)
require.NoError(t, err)

@@ -572,53 +569,10 @@ func TestPreemptingQueueScheduler(t *testing.T) {
"C": 1,
},
},
"gang preemption with NodeEvictionProbability 0": {
SchedulingConfig: testfixtures.WithNodeEvictionProbabilityConfig(
0.0, // To test the gang evictor, we need to disable stochastic eviction.
testfixtures.TestSchedulingConfig(),
),
Nodes: testfixtures.N32CpuNodes(2, testfixtures.TestPriorities),
Rounds: []SchedulingRound{
{
// Schedule a gang filling all of node 1 and part of node 2.
// Make the jobs of node 1 priority 1,
// to avoid them being urgency-preempted in the next round.
JobsByQueue: map[string][]*jobdb.Job{
"A": testfixtures.WithGangAnnotationsJobs(
append(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass1, 32), testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1)...),
),
},
ExpectedScheduledIndices: map[string][]int{
"A": testfixtures.IntRange(0, 32),
},
},
{
// Schedule a job that requires preempting one job in the gang,
// and assert that all jobs in the gang are preempted.
JobsByQueue: map[string][]*jobdb.Job{
"B": testfixtures.N32Cpu256GiJobs("B", testfixtures.PriorityClass1, 1),
},
ExpectedScheduledIndices: map[string][]int{
"B": testfixtures.IntRange(0, 0),
},
ExpectedPreemptedIndices: map[string]map[int][]int{
"A": {
0: testfixtures.IntRange(0, 32),
},
},
},
},
PriorityFactorByQueue: map[string]float64{
"A": 1,
"B": 1,
},
},

"gang preemption avoid cascading preemption": {
SchedulingConfig: testfixtures.WithNodeEvictionProbabilityConfig(
0.0, // To test the gang evictor, we need to disable stochastic eviction.
testfixtures.TestSchedulingConfig(),
),
Nodes: testfixtures.N32CpuNodes(3, testfixtures.TestPriorities),
SchedulingConfig: testfixtures.TestSchedulingConfig(),
Nodes: testfixtures.N32CpuNodes(3, testfixtures.TestPriorities),
Rounds: []SchedulingRound{
{
// Schedule a gang spanning nodes 1 and 2.
@@ -1139,11 +1093,8 @@ func TestPreemptingQueueScheduler(t *testing.T) {
},
},
"Oversubscribed eviction does not evict non-preemptible": {
SchedulingConfig: testfixtures.WithNodeEvictionProbabilityConfig(
0.0,
testfixtures.TestSchedulingConfig(),
),
Nodes: testfixtures.N32CpuNodes(2, testfixtures.TestPriorities),
SchedulingConfig: testfixtures.TestSchedulingConfig(),
Nodes: testfixtures.N32CpuNodes(2, testfixtures.TestPriorities),
Rounds: []SchedulingRound{
{
JobsByQueue: map[string][]*jobdb.Job{
@@ -1837,8 +1788,6 @@ func TestPreemptingQueueScheduler(t *testing.T) {
sch := NewPreemptingQueueScheduler(
sctx,
constraints,
tc.SchedulingConfig.NodeEvictionProbability,
tc.SchedulingConfig.NodeOversubscriptionEvictionProbability,
tc.SchedulingConfig.ProtectedFractionOfFairShare,
NewSchedulerJobRepositoryAdapter(jobDbTxn),
nodeDb,
@@ -2194,8 +2143,6 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) {
sch := NewPreemptingQueueScheduler(
sctx,
constraints,
tc.SchedulingConfig.NodeEvictionProbability,
tc.SchedulingConfig.NodeOversubscriptionEvictionProbability,
tc.SchedulingConfig.ProtectedFractionOfFairShare,
NewSchedulerJobRepositoryAdapter(jobDbTxn),
nodeDb,
@@ -2256,8 +2203,6 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) {
sch := NewPreemptingQueueScheduler(
sctx,
constraints,
tc.SchedulingConfig.NodeEvictionProbability,
tc.SchedulingConfig.NodeOversubscriptionEvictionProbability,
tc.SchedulingConfig.ProtectedFractionOfFairShare,
NewSchedulerJobRepositoryAdapter(jobDbTxn),
nodeDb,
2 changes: 0 additions & 2 deletions internal/scheduler/scheduling_algo.go
@@ -488,8 +488,6 @@ func (l *FairSchedulingAlgo) scheduleOnExecutors(
scheduler := NewPreemptingQueueScheduler(
sctx,
constraints,
l.schedulingConfig.NodeEvictionProbability,
l.schedulingConfig.NodeOversubscriptionEvictionProbability,
l.schedulingConfig.ProtectedFractionOfFairShare,
NewSchedulerJobRepositoryAdapter(fsctx.txn),
nodeDb,
2 changes: 0 additions & 2 deletions internal/scheduler/simulator/simulator.go
@@ -483,8 +483,6 @@ func (s *Simulator) handleScheduleEvent(ctx *armadacontext.Context) error {
sch := scheduler.NewPreemptingQueueScheduler(
sctx,
constraints,
s.schedulingConfig.NodeEvictionProbability,
s.schedulingConfig.NodeOversubscriptionEvictionProbability,
s.schedulingConfig.ProtectedFractionOfFairShare,
scheduler.NewSchedulerJobRepositoryAdapter(txn),
nodeDb,
1 change: 0 additions & 1 deletion internal/scheduler/simulator/test_utils.go
@@ -45,7 +45,6 @@ func GetOneQueue10JobWorkload() *WorkloadSpec {

func GetBasicSchedulingConfig() configuration.SchedulingConfig {
return configuration.SchedulingConfig{
NodeEvictionProbability: 1.0,
PriorityClasses: map[string]types.PriorityClass{
"armada-default": {
Priority: 30000,
12 changes: 0 additions & 12 deletions internal/scheduler/testfixtures/testfixtures.go
@@ -161,8 +161,6 @@ func TestSchedulingConfig() schedulerconfiguration.SchedulingConfig {
return schedulerconfiguration.SchedulingConfig{
PriorityClasses: maps.Clone(TestPriorityClasses),
DefaultPriorityClassName: TestDefaultPriorityClass,
NodeEvictionProbability: 1.0,
NodeOversubscriptionEvictionProbability: 1.0,
MaximumSchedulingRate: math.Inf(1),
MaximumSchedulingBurst: math.MaxInt,
MaximumPerQueueSchedulingRate: math.Inf(1),
@@ -188,16 +186,6 @@ func WithProtectedFractionOfFairShareConfig(v float64, config schedulerconfigura
return config
}

func WithNodeEvictionProbabilityConfig(p float64, config schedulerconfiguration.SchedulingConfig) schedulerconfiguration.SchedulingConfig {
config.NodeEvictionProbability = p
return config
}

func WithNodeOversubscriptionEvictionProbabilityConfig(p float64, config schedulerconfiguration.SchedulingConfig) schedulerconfiguration.SchedulingConfig {
config.NodeOversubscriptionEvictionProbability = p
return config
}

func WithRoundLimitsConfig(limits map[string]float64, config schedulerconfiguration.SchedulingConfig) schedulerconfiguration.SchedulingConfig {
config.MaximumResourceFractionToSchedule = limits
return config
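The two deleted helpers followed the same copy-and-modify shape as the fixtures that remain (take the config by value, set one field, return it); they are gone only because the fields they set no longer exist. A minimal stand-in sketch of that shape, for tests that still tweak surviving fields such as ProtectedFractionOfFairShare:

```go
package main

import "fmt"

// Stand-in for the fixture's SchedulingConfig, showing one surviving field.
type SchedulingConfig struct {
	ProtectedFractionOfFairShare float64
}

// Same shape as the surviving With...Config helpers: copy in, modify, return.
func WithProtectedFractionOfFairShareConfig(v float64, config SchedulingConfig) SchedulingConfig {
	config.ProtectedFractionOfFairShare = v
	return config
}

func main() {
	base := SchedulingConfig{ProtectedFractionOfFairShare: 1.0}
	modified := WithProtectedFractionOfFairShareConfig(0.5, base)
	fmt.Println(base.ProtectedFractionOfFairShare, modified.ProtectedFractionOfFairShare) // 1 0.5
}
```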