schedule by pool
Signed-off-by: Chris Martin <chris@cmartinit.co.uk>
d80tb7 committed Aug 17, 2024
1 parent 7f00534 commit a53adbb
Showing 1 changed file with 137 additions and 148 deletions.
internal/scheduler/simulator/simulator.go: 137 additions & 148 deletions (285 changed lines)
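Before the diff itself: the heart of this commit is a data-structure change on Simulator, replacing the two executor-oriented nodeDb indexes with a single map keyed by pool name. The sketch below is illustrative only, using simplified stand-in types rather than the real nodedb.NodeDb; the field names are taken from the diff.

```go
package main

import "fmt"

// nodeDb stands in for *nodedb.NodeDb from internal/scheduler/nodedb.
type nodeDb struct{ label string }

// Old shape (removed by this commit):
//
//	nodeDbByPoolAndExecutorGroup map[string][]*nodedb.NodeDb
//	nodeDbByExecutorName         map[string]*nodedb.NodeDb
//
// New shape (added by this commit): a single nodeDb per pool.
type simulator struct {
	nodeDbByPool map[string]*nodeDb
}

func main() {
	s := simulator{nodeDbByPool: map[string]*nodeDb{"cpu": {label: "cpu nodeDb"}}}
	// Scheduling and unbinding both resolve the nodeDb by pool name now.
	if db, ok := s.nodeDbByPool["cpu"]; ok {
		fmt.Println("found:", db.label)
	}
}
```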
@@ -51,10 +51,8 @@ type Simulator struct {
jobDb *jobdb.JobDb
// Map from node id to the pool to which the node belongs.
poolByNodeId map[string]string
// Separate nodeDb per pool and executorGroup.
nodeDbByPoolAndExecutorGroup map[string][]*nodedb.NodeDb
// Map from executor name to the nodeDb to which it belongs.
nodeDbByExecutorName map[string]*nodedb.NodeDb
// Separate nodeDb per pool
nodeDbByPool map[string]*nodedb.NodeDb
// Allocation by pool for each queue and priority class.
// Stored across invocations of the scheduler.
allocationByPoolAndQueueAndPriorityClass map[string]map[string]schedulerobjects.QuantityByTAndResourceType[string]
@@ -139,9 +137,8 @@ func NewSimulator(clusterSpec *ClusterSpec, workloadSpec *WorkloadSpec, scheduli…
jobTemplatesByDependencyIds: make(map[string]map[string]*JobTemplate),
activeJobTemplatesById: make(map[string]*JobTemplate),
jobDb: jobDb,
nodeDbByPoolAndExecutorGroup: make(map[string][]*nodedb.NodeDb),
nodeDbByPool: make(map[string]*nodedb.NodeDb),
poolByNodeId: make(map[string]string),
nodeDbByExecutorName: make(map[string]*nodedb.NodeDb),
allocationByPoolAndQueueAndPriorityClass: make(map[string]map[string]schedulerobjects.QuantityByTAndResourceType[string]),
totalResourcesByPool: make(map[string]schedulerobjects.ResourceList),
limiter: rate.NewLimiter(
@@ -274,14 +271,14 @@ func (s *Simulator) setupClusters() error {
}
for executorIndex, executor := range executorGroup.Clusters {
executorName := fmt.Sprintf("%s-%d-%d", pool.Name, executorGroupIndex, executorIndex)
s.nodeDbByExecutorName[executorName] = nodeDb
for nodeTemplateIndex, nodeTemplate := range executor.NodeTemplates {
for i := 0; i < int(nodeTemplate.Number); i++ {
nodeId := fmt.Sprintf("%s-%d-%d-%d-%d", pool.Name, executorGroupIndex, executorIndex, nodeTemplateIndex, i)
node := &schedulerobjects.Node{
Id: nodeId,
Name: nodeId,
Executor: executorName,
Pool: pool.Name,
Taints: slices.Clone(nodeTemplate.Taints),
Labels: maps.Clone(nodeTemplate.Labels),
TotalResources: nodeTemplate.TotalResources.DeepCopy(),
@@ -300,7 +297,7 @@ func (s *Simulator) setupClusters() error {
}
}
}
s.nodeDbByPoolAndExecutorGroup[pool.Name] = append(s.nodeDbByPoolAndExecutorGroup[pool.Name], nodeDb)
s.nodeDbByPool[pool.Name] = nodeDb
totalResourcesForPool.Add(nodeDb.TotalResources())
}
s.totalResourcesByPool[pool.Name] = totalResourcesForPool
@@ -457,164 +454,156 @@ func (s *Simulator) handleScheduleEvent(ctx *armadacontext.Context) error {
defer txn.Abort()
demandByQueue := calculateDemandByQueue(txn.GetAll())
for _, pool := range s.ClusterSpec.Pools {
for i := range pool.ClusterGroups {
nodeDb := s.nodeDbByPoolAndExecutorGroup[pool.Name][i]
if err := nodeDb.Reset(); err != nil {
return err
}
totalResources := s.totalResourcesByPool[pool.Name]
fairnessCostProvider, err := fairness.NewDominantResourceFairness(
totalResources,
s.schedulingConfig,
)
if err != nil {
return err
nodeDb := s.nodeDbByPool[pool.Name]
if err := nodeDb.Reset(); err != nil {
return err
}
totalResources := s.totalResourcesByPool[pool.Name]
fairnessCostProvider, err := fairness.NewDominantResourceFairness(
totalResources,
s.schedulingConfig,
)
if err != nil {
return err
}
sctx := schedulercontext.NewSchedulingContext(
pool.Name,
fairnessCostProvider,
s.limiter,
totalResources,
)

sctx.Started = s.time
for _, queue := range s.WorkloadSpec.Queues {
demand, hasDemand := demandByQueue[queue.Name]
if !hasDemand {
// To ensure fair share is computed only from active queues, i.e., queues with jobs queued or running.
continue
}
sctx := schedulercontext.NewSchedulingContext(
pool.Name,
fairnessCostProvider,
s.limiter,
totalResources,
)

sctx.Started = s.time
for _, queue := range s.WorkloadSpec.Queues {
demand, hasDemand := demandByQueue[queue.Name]
if !hasDemand {
// To ensure fair share is computed only from active queues, i.e., queues with jobs queued or running.
continue
}
limiter, ok := s.limiterByQueue[queue.Name]
if !ok {
limiter = rate.NewLimiter(
rate.Limit(s.schedulingConfig.MaximumPerQueueSchedulingRate),
s.schedulingConfig.MaximumPerQueueSchedulingBurst,
)
limiter.SetBurstAt(s.time, s.schedulingConfig.MaximumPerQueueSchedulingBurst)
s.limiterByQueue[queue.Name] = limiter
}
err := sctx.AddQueueSchedulingContext(
queue.Name,
queue.Weight,
s.allocationByPoolAndQueueAndPriorityClass[pool.Name][queue.Name],
demand,
demand,
limiter,
limiter, ok := s.limiterByQueue[queue.Name]
if !ok {
limiter = rate.NewLimiter(
rate.Limit(s.schedulingConfig.MaximumPerQueueSchedulingRate),
s.schedulingConfig.MaximumPerQueueSchedulingBurst,
)
if err != nil {
return err
}
limiter.SetBurstAt(s.time, s.schedulingConfig.MaximumPerQueueSchedulingBurst)
s.limiterByQueue[queue.Name] = limiter
}
constraints := schedulerconstraints.NewSchedulingConstraints(pool.Name, totalResources, s.schedulingConfig, nil, map[string]bool{})

nloatingResourceTypes, err := floatingresources.NewFloatingResourceTypes(s.schedulingConfig.ExperimentalFloatingResources)
err := sctx.AddQueueSchedulingContext(
queue.Name,
queue.Weight,
s.allocationByPoolAndQueueAndPriorityClass[pool.Name][queue.Name],
demand,
demand,
limiter,
)
if err != nil {
return err
}
}
constraints := schedulerconstraints.NewSchedulingConstraints(pool.Name, totalResources, s.schedulingConfig, nil, map[string]bool{})

sch := scheduler.NewPreemptingQueueScheduler(
sctx,
constraints,
nloatingResourceTypes,
s.schedulingConfig.ProtectedFractionOfFairShare,
scheduler.NewSchedulerJobRepositoryAdapter(txn),
nodeDb,
// TODO: Necessary to support partial eviction.
nil,
nil,
nil,
)
nloatingResourceTypes, err := floatingresources.NewFloatingResourceTypes(s.schedulingConfig.ExperimentalFloatingResources)
if err != nil {
return err
}

schedulerCtx := ctx
if s.SuppressSchedulerLogs {
schedulerCtx = &armadacontext.Context{
Context: ctx.Context,
FieldLogger: logging.NullLogger,
}
}
result, err := sch.Schedule(schedulerCtx)
if err != nil {
return err
sch := scheduler.NewPreemptingQueueScheduler(
sctx,
constraints,
nloatingResourceTypes,
s.schedulingConfig.ProtectedFractionOfFairShare,
scheduler.NewSchedulerJobRepositoryAdapter(txn),
nodeDb,
// TODO: Necessary to support partial eviction.
nil,
nil,
nil,
)

schedulerCtx := ctx
if s.SuppressSchedulerLogs {
schedulerCtx = &armadacontext.Context{
Context: ctx.Context,
FieldLogger: logging.NullLogger,
}
}
result, err := sch.Schedule(schedulerCtx)
if err != nil {
return err
}

// Update jobDb to reflect the decisions by the scheduler.
// Sort jobs to ensure deterministic event ordering.
preemptedJobs := scheduler.PreemptedJobsFromSchedulerResult(result)
scheduledJobs := slices.Clone(result.ScheduledJobs)
lessJob := func(a, b *jobdb.Job) int {
if a.Queue() < b.Queue() {
return -1
} else if a.Queue() > b.Queue() {
return 1
}
if a.Id() < b.Id() {
return -1
} else if a.Id() > b.Id() {
return 1
}
return 0
// Update jobDb to reflect the decisions by the scheduler.
// Sort jobs to ensure deterministic event ordering.
preemptedJobs := scheduler.PreemptedJobsFromSchedulerResult(result)
scheduledJobs := slices.Clone(result.ScheduledJobs)
lessJob := func(a, b *jobdb.Job) int {
if a.Queue() < b.Queue() {
return -1
} else if a.Queue() > b.Queue() {
return 1
}
slices.SortFunc(preemptedJobs, lessJob)
slices.SortFunc(scheduledJobs, func(a, b *schedulercontext.JobSchedulingContext) int {
return lessJob(a.Job, b.Job)
})
for i, job := range preemptedJobs {
if run := job.LatestRun(); run != nil {
job = job.WithUpdatedRun(run.WithFailed(true))
} else {
return errors.Errorf("attempting to preempt job %s with no associated runs", job.Id())
}
preemptedJobs[i] = job.WithQueued(false).WithFailed(true)
if a.Id() < b.Id() {
return -1
} else if a.Id() > b.Id() {
return 1
}
for i, jctx := range scheduledJobs {
job := jctx.Job
nodeId := result.NodeIdByJobId[job.Id()]
if nodeId == "" {
return errors.Errorf("job %s not mapped to a node", job.Id())
}
if node, err := nodeDb.GetNode(nodeId); err != nil {
return err
} else {
priority, ok := nodeDb.GetScheduledAtPriority(job.Id())
if !ok {
return errors.Errorf("job %s not mapped to a priority", job.Id())
}
scheduledJobs[i].Job = job.WithQueued(false).WithNewRun(node.GetExecutor(), node.GetId(), node.GetName(), node.GetPool(), priority)
}
return 0
}
slices.SortFunc(preemptedJobs, lessJob)
slices.SortFunc(scheduledJobs, func(a, b *schedulercontext.JobSchedulingContext) int {
return lessJob(a.Job, b.Job)
})
for i, job := range preemptedJobs {
if run := job.LatestRun(); run != nil {
job = job.WithUpdatedRun(run.WithFailed(true))
} else {
return errors.Errorf("attempting to preempt job %s with no associated runs", job.Id())
}
if err := txn.Upsert(preemptedJobs); err != nil {
return err
preemptedJobs[i] = job.WithQueued(false).WithFailed(true)
}
for i, jctx := range scheduledJobs {
job := jctx.Job
nodeId := result.NodeIdByJobId[job.Id()]
if nodeId == "" {
return errors.Errorf("job %s not mapped to a node", job.Id())
}
if err := txn.Upsert(armadaslices.Map(scheduledJobs, func(jctx *schedulercontext.JobSchedulingContext) *jobdb.Job { return jctx.Job })); err != nil {
if node, err := nodeDb.GetNode(nodeId); err != nil {
return err
} else {
priority, ok := nodeDb.GetScheduledAtPriority(job.Id())
if !ok {
return errors.Errorf("job %s not mapped to a priority", job.Id())
}
scheduledJobs[i].Job = job.WithQueued(false).WithNewRun(node.GetExecutor(), node.GetId(), node.GetName(), node.GetPool(), priority)
}
}
if err := txn.Upsert(preemptedJobs); err != nil {
return err
}
if err := txn.Upsert(armadaslices.Map(scheduledJobs, func(jctx *schedulercontext.JobSchedulingContext) *jobdb.Job { return jctx.Job })); err != nil {
return err
}

// Update allocation.
s.allocationByPoolAndQueueAndPriorityClass[pool.Name] = sctx.AllocatedByQueueAndPriority()

// Generate eventSequences.
// TODO: Add time taken to run the scheduler to s.time.
eventSequences, err = scheduler.AppendEventSequencesFromPreemptedJobs(eventSequences, preemptedJobs, s.time)
if err != nil {
return err
}
eventSequences, err = scheduler.AppendEventSequencesFromScheduledJobs(eventSequences, scheduledJobs)
if err != nil {
return err
}
// Update allocation.
s.allocationByPoolAndQueueAndPriorityClass[pool.Name] = sctx.AllocatedByQueueAndPriority()

// Update event timestamps to be consistent with simulated time.
t := s.time
for _, eventSequence := range eventSequences {
for _, event := range eventSequence.Events {
event.Created = protoutil.ToTimestamp(t)
}
}
// Generate eventSequences.
// TODO: Add time taken to run the scheduler to s.time.
eventSequences, err = scheduler.AppendEventSequencesFromPreemptedJobs(eventSequences, preemptedJobs, s.time)
if err != nil {
return err
}
eventSequences, err = scheduler.AppendEventSequencesFromScheduledJobs(eventSequences, scheduledJobs)
if err != nil {
return err
}

// If nothing changed, we're in steady state and can safely skip scheduling until something external has changed.
// Do this only if a non-zero amount of time has passed.
if !s.time.Equal(time.Time{}) && len(result.ScheduledJobs) == 0 && len(result.PreemptedJobs) == 0 {
s.shouldSchedule = false
// Update event timestamps to be consistent with simulated time.
t := s.time
for _, eventSequence := range eventSequences {
for _, event := range eventSequence.Events {
event.Created = protoutil.ToTimestamp(t)
}
}
}
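Because the hunk above interleaves the old and new bodies of handleScheduleEvent, the shape of the new control flow is easier to see in isolation: the inner executor-group loop is gone, so each pool gets exactly one nodeDb reset and one scheduling round per simulated tick. The sketch below uses simplified stand-in types (not the real NodeDb, SchedulingContext or PreemptingQueueScheduler) and is only meant to show that structure.

```go
package main

import "fmt"

// Simplified stand-ins for the scheduler's NodeDb, SchedulingContext and result types;
// they exist only to show the control flow, not the real scheduling logic.
type nodeDB struct{ pool string }

// Reset stands in for NodeDb.Reset, which clears per-round state.
func (db *nodeDB) Reset() {}

type schedulingContext struct {
	pool   string
	queues []string
}

type result struct{ scheduled, preempted int }

// scheduleOnce stands in for a single PreemptingQueueScheduler.Schedule call.
func scheduleOnce(sctx *schedulingContext, db *nodeDB) result {
	return result{scheduled: len(sctx.queues)} // placeholder outcome
}

func main() {
	nodeDbByPool := map[string]*nodeDB{"cpu": {pool: "cpu"}, "gpu": {pool: "gpu"}}
	activeQueues := []string{"queue-a", "queue-b"}

	// New shape of handleScheduleEvent: one nodeDb and one scheduling round per pool,
	// with no inner executor-group loop. (Map iteration order is not deterministic here;
	// the real code iterates s.ClusterSpec.Pools in order.)
	for pool, db := range nodeDbByPool {
		db.Reset()
		sctx := &schedulingContext{pool: pool}
		for _, q := range activeQueues {
			// The real code only adds queues that currently have demand.
			sctx.queues = append(sctx.queues, q)
		}
		res := scheduleOnce(sctx, db)
		fmt.Printf("pool %s: scheduled %d, preempted %d\n", pool, res.scheduled, res.preempted)
	}
}
```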
Expand Down Expand Up @@ -840,7 +829,7 @@ func (s *Simulator) unbindRunningJob(job *jobdb.Job) error {
if run.NodeId() == "" {
return errors.Errorf("empty nodeId for run %s of job %s", run.Id(), job.Id())
}
nodeDb := s.nodeDbByExecutorName[run.Executor()]
nodeDb := s.nodeDbByPool[run.Pool()]
node, err := nodeDb.GetNode(run.NodeId())
if err != nil {
return err
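The final hunk changes unbindRunningJob to resolve the nodeDb from the run's pool rather than from its executor name. A minimal sketch of that lookup path, again with stand-in types instead of the real jobdb run and nodedb.NodeDb:

```go
package main

import (
	"errors"
	"fmt"
)

// jobRun and nodeDB stand in for the scheduler's run and NodeDb types.
type jobRun struct {
	pool   string
	nodeId string
}

type nodeDB struct{ nodes map[string]string }

func (db *nodeDB) GetNode(id string) (string, error) {
	if name, ok := db.nodes[id]; ok {
		return name, nil
	}
	return "", fmt.Errorf("node %s not found", id)
}

type simulator struct {
	nodeDbByPool map[string]*nodeDB
}

// unbindRunningJob now looks the nodeDb up by pool (previously by executor name).
func (s *simulator) unbindRunningJob(run jobRun) error {
	if run.nodeId == "" {
		return errors.New("empty nodeId for run")
	}
	db, ok := s.nodeDbByPool[run.pool]
	if !ok {
		return fmt.Errorf("no nodeDb for pool %q", run.pool)
	}
	node, err := db.GetNode(run.nodeId)
	if err != nil {
		return err
	}
	fmt.Println("unbinding job from node", node)
	return nil
}

func main() {
	s := &simulator{nodeDbByPool: map[string]*nodeDB{
		"cpu": {nodes: map[string]string{"cpu-0-0-0-0": "cpu-0-0-0-0"}},
	}}
	_ = s.unbindRunningJob(jobRun{pool: "cpu", nodeId: "cpu-0-0-0-0"})
}
```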
