From be65675f67bcf20b1651918dc038f02f54c0ee76 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Thu, 19 Sep 2024 12:01:58 +0100 Subject: [PATCH 1/7] wip Signed-off-by: Chris Martin --- internal/scheduler/jobiteration.go | 80 ++++++++---------------------- 1 file changed, 21 insertions(+), 59 deletions(-) diff --git a/internal/scheduler/jobiteration.go b/internal/scheduler/jobiteration.go index b814e9144a9..5197e698123 100644 --- a/internal/scheduler/jobiteration.go +++ b/internal/scheduler/jobiteration.go @@ -1,8 +1,6 @@ package scheduler import ( - "sync" - "golang.org/x/exp/slices" "github.com/armadaproject/armada/internal/common/armadacontext" @@ -12,68 +10,39 @@ import ( ) type JobIterator interface { - Next() (*schedulercontext.JobSchedulingContext, error) + Next() (*jobdb.Job, error) } type JobRepository interface { - GetQueueJobIds(queueName string) []string - GetExistingJobsByIds(ids []string) []*jobdb.Job -} - -type InMemoryJobIterator struct { - i int - jctxs []*schedulercontext.JobSchedulingContext -} - -func NewInMemoryJobIterator(jctxs []*schedulercontext.JobSchedulingContext) *InMemoryJobIterator { - return &InMemoryJobIterator{ - jctxs: jctxs, - } -} - -func (it *InMemoryJobIterator) Next() (*schedulercontext.JobSchedulingContext, error) { - if it.i >= len(it.jctxs) { - return nil, nil - } - v := it.jctxs[it.i] - it.i++ - return v, nil + GetById(id string) *jobdb.Job + JobsForQueue(queue string) JobIterator } type InMemoryJobRepository struct { jctxsByQueue map[string][]*schedulercontext.JobSchedulingContext jctxsById map[string]*schedulercontext.JobSchedulingContext - // Protects the above fields. - mu sync.Mutex } -func NewInMemoryJobRepository() *InMemoryJobRepository { - return &InMemoryJobRepository{ - jctxsByQueue: make(map[string][]*schedulercontext.JobSchedulingContext), - jctxsById: make(map[string]*schedulercontext.JobSchedulingContext), - } -} +func NewInMemoryJobRepository(jctxs []*schedulercontext.JobSchedulingContext) *InMemoryJobRepository { + + jctxsByQueue := make(map[string][]*schedulercontext.JobSchedulingContext) + jctxsById := make(map[string]*schedulercontext.JobSchedulingContext, len(jctxs)) -func (repo *InMemoryJobRepository) EnqueueMany(jctxs []*schedulercontext.JobSchedulingContext) { - repo.mu.Lock() - defer repo.mu.Unlock() - updatedQueues := make(map[string]bool) for _, jctx := range jctxs { queue := jctx.Job.Queue() - repo.jctxsByQueue[queue] = append(repo.jctxsByQueue[queue], jctx) - repo.jctxsById[jctx.Job.Id()] = jctx - updatedQueues[queue] = true + jctxsByQueue[queue] = append(jctxsByQueue[queue], jctx) + jctxsById[jctx.Job.Id()] = jctx } - for queue := range updatedQueues { - repo.sortQueue(queue) + for _, jobCtxs := range jctxsByQueue { + slices.SortFunc(jobCtxs, func(a, b *schedulercontext.JobSchedulingContext) int { + return a.Job.SchedulingOrderCompare(b.Job) + }) } -} -// sortQueue sorts jobs in a specified queue by the order in which they should be scheduled. -func (repo *InMemoryJobRepository) sortQueue(queue string) { - slices.SortFunc(repo.jctxsByQueue[queue], func(a, b *schedulercontext.JobSchedulingContext) int { - return a.Job.SchedulingOrderCompare(b.Job) - }) + return &InMemoryJobRepository{ + jctxsByQueue: jctxsByQueue, + jctxsById: jctxsById, + } } func (repo *InMemoryJobRepository) GetQueueJobIds(queue string) []string { @@ -85,21 +54,14 @@ func (repo *InMemoryJobRepository) GetQueueJobIds(queue string) []string { ) } -func (repo *InMemoryJobRepository) GetExistingJobsByIds(jobIds []string) []*jobdb.Job { - repo.mu.Lock() - defer repo.mu.Unlock() - rv := make([]*jobdb.Job, 0, len(jobIds)) - for _, jobId := range jobIds { - if jctx, ok := repo.jctxsById[jobId]; ok { - rv = append(rv, jctx.Job) - } +func (repo *InMemoryJobRepository) GetById(jobId string) *jobdb.Job { + if jctx, ok := repo.jctxsById[jobId]; ok { + return jctx.Job } - return rv + return nil } func (repo *InMemoryJobRepository) GetJobIterator(queue string) JobIterator { - repo.mu.Lock() - defer repo.mu.Unlock() return NewInMemoryJobIterator(slices.Clone(repo.jctxsByQueue[queue])) } From bf0ec25647d6a50ee9a4d21ff8d80317c75854f8 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Thu, 19 Sep 2024 16:17:46 +0100 Subject: [PATCH 2/7] wip Signed-off-by: Chris Martin --- internal/scheduler/jobdb/jobdb.go | 6 +- internal/scheduler/jobiteration.go | 108 ++++++------------ .../scheduler/preempting_queue_scheduler.go | 28 ++--- internal/scheduler/queue_scheduler.go | 16 ++- internal/scheduler/scheduling_algo.go | 37 +----- 5 files changed, 62 insertions(+), 133 deletions(-) diff --git a/internal/scheduler/jobdb/jobdb.go b/internal/scheduler/jobdb/jobdb.go index 8a65691390a..1d014792035 100644 --- a/internal/scheduler/jobdb/jobdb.go +++ b/internal/scheduler/jobdb/jobdb.go @@ -20,6 +20,10 @@ import ( "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" ) +type JobIterator interface { + Next() (*Job, bool) +} + var emptyList = immutable.NewSortedSet[*Job](JobPriorityComparer{}) type JobDb struct { @@ -496,7 +500,7 @@ func (txn *Txn) HasQueuedJobs(queue string) bool { } // QueuedJobs returns true if the queue has any jobs in the running state or false otherwise -func (txn *Txn) QueuedJobs(queue string) *immutable.SortedSetIterator[*Job] { +func (txn *Txn) QueuedJobs(queue string) JobIterator { jobQueue, ok := txn.jobsByQueue[queue] if ok { return jobQueue.Iterator() diff --git a/internal/scheduler/jobiteration.go b/internal/scheduler/jobiteration.go index 5197e698123..7533a9de7eb 100644 --- a/internal/scheduler/jobiteration.go +++ b/internal/scheduler/jobiteration.go @@ -1,68 +1,54 @@ package scheduler import ( - "golang.org/x/exp/slices" - "github.com/armadaproject/armada/internal/common/armadacontext" - armadaslices "github.com/armadaproject/armada/internal/common/slices" - schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" "github.com/armadaproject/armada/internal/scheduler/jobdb" + "github.com/benbjohnson/immutable" ) -type JobIterator interface { - Next() (*jobdb.Job, error) -} - type JobRepository interface { GetById(id string) *jobdb.Job - JobsForQueue(queue string) JobIterator + QueuedJobs(queue string) jobdb.JobIterator } type InMemoryJobRepository struct { - jctxsByQueue map[string][]*schedulercontext.JobSchedulingContext - jctxsById map[string]*schedulercontext.JobSchedulingContext + jobsByQueue map[string]immutable.SortedSet[*jobdb.Job] + jobsById map[string]*jobdb.Job } -func NewInMemoryJobRepository(jctxs []*schedulercontext.JobSchedulingContext) *InMemoryJobRepository { +func NewInMemoryJobRepository(jobs []*jobdb.Job) *InMemoryJobRepository { - jctxsByQueue := make(map[string][]*schedulercontext.JobSchedulingContext) - jctxsById := make(map[string]*schedulercontext.JobSchedulingContext, len(jctxs)) + jobsByQueue := make(map[string]immutable.SortedSet[*jobdb.Job]) + jobsById := make(map[string]*jobdb.Job, len(jobs)) - for _, jctx := range jctxs { - queue := jctx.Job.Queue() - jctxsByQueue[queue] = append(jctxsByQueue[queue], jctx) - jctxsById[jctx.Job.Id()] = jctx - } - for _, jobCtxs := range jctxsByQueue { - slices.SortFunc(jobCtxs, func(a, b *schedulercontext.JobSchedulingContext) int { - return a.Job.SchedulingOrderCompare(b.Job) - }) + for _, job := range jobs { + queueName := job.Queue() + queue, ok := jobsByQueue[queueName] + if !ok { + queue = immutable.NewSortedSet[*jobdb.Job](jobdb.JobPriorityComparer{}) + } + jobsByQueue[queueName] = queue.Add(job) + jobsById[job.Id()] = job } - return &InMemoryJobRepository{ - jctxsByQueue: jctxsByQueue, - jctxsById: jctxsById, + jobsByQueue: jobsByQueue, + jobsById: jobsById, } } -func (repo *InMemoryJobRepository) GetQueueJobIds(queue string) []string { - return armadaslices.Map( - repo.jctxsByQueue[queue], - func(jctx *schedulercontext.JobSchedulingContext) string { - return jctx.Job.Id() - }, - ) -} - func (repo *InMemoryJobRepository) GetById(jobId string) *jobdb.Job { - if jctx, ok := repo.jctxsById[jobId]; ok { - return jctx.Job + if job, ok := repo.jobsById[jobId]; ok { + return job } return nil } -func (repo *InMemoryJobRepository) GetJobIterator(queue string) JobIterator { - return NewInMemoryJobIterator(slices.Clone(repo.jctxsByQueue[queue])) +func (repo *InMemoryJobRepository) QueuedJobs(queueName string) jobdb.JobIterator { + queue, ok := repo.jobsByQueue[queueName] + if ok { + return queue.Iterator() + } + return nil } // QueuedJobsIterator is an iterator over all jobs in a queue. @@ -73,52 +59,26 @@ type QueuedJobsIterator struct { ctx *armadacontext.Context } -func NewQueuedJobsIterator(ctx *armadacontext.Context, queue string, repo JobRepository) *QueuedJobsIterator { - return &QueuedJobsIterator{ - jobIds: repo.GetQueueJobIds(queue), - repo: repo, - ctx: ctx, - } -} - -func (it *QueuedJobsIterator) Next() (*schedulercontext.JobSchedulingContext, error) { - select { - case <-it.ctx.Done(): - return nil, it.ctx.Err() - default: - if it.idx >= len(it.jobIds) { - return nil, nil - } - job := it.repo.GetExistingJobsByIds([]string{it.jobIds[it.idx]}) - it.idx++ - return schedulercontext.JobSchedulingContextFromJob(job[0]), nil - } -} - // MultiJobsIterator chains several JobIterators together in the order provided. type MultiJobsIterator struct { i int - its []JobIterator + its []jobdb.JobIterator } -func NewMultiJobsIterator(its ...JobIterator) *MultiJobsIterator { +func NewMultiJobsIterator(its ...jobdb.JobIterator) *MultiJobsIterator { return &MultiJobsIterator{ its: its, } } -func (it *MultiJobsIterator) Next() (*schedulercontext.JobSchedulingContext, error) { +func (it *MultiJobsIterator) Next() (*jobdb.Job, bool) { if it.i >= len(it.its) { - return nil, nil - } - v, err := it.its[it.i].Next() - if err != nil { - return nil, err + return nil, false } - if v == nil { - it.i++ - return it.Next() - } else { - return v, err + v, ok := it.its[it.i].Next() + if ok { + return v, true } + it.i++ + return it.Next() } diff --git a/internal/scheduler/preempting_queue_scheduler.go b/internal/scheduler/preempting_queue_scheduler.go index 8b4fed9225e..01969351ff6 100644 --- a/internal/scheduler/preempting_queue_scheduler.go +++ b/internal/scheduler/preempting_queue_scheduler.go @@ -3,7 +3,6 @@ package scheduler import ( "fmt" "math" - "reflect" "time" "github.com/hashicorp/go-memdb" @@ -263,7 +262,7 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*sche func (sch *PreemptingQueueScheduler) evict(ctx *armadacontext.Context, evictor *Evictor) (*EvictorResult, *InMemoryJobRepository, error) { if evictor == nil { - return &EvictorResult{}, NewInMemoryJobRepository(), nil + return &EvictorResult{}, NewInMemoryJobRepository(nil), nil } txn := sch.nodeDb.Txn(true) defer txn.Abort() @@ -295,10 +294,12 @@ func (sch *PreemptingQueueScheduler) evict(ctx *armadacontext.Context, evictor * sch.setEvictedGangCardinality(result) evictedJctxs := maps.Values(result.EvictedJctxsByJobId) - for _, jctx := range evictedJctxs { + evictedJobs := make([]*jobdb.Job, len(evictedJctxs)) + for i, jctx := range evictedJctxs { if _, err := sch.schedulingContext.EvictJob(jctx.Job); err != nil { return nil, nil, err } + evictedJobs[i] = jctx.Job } // TODO: Move gang accounting into context. if err := sch.updateGangAccounting(evictedJctxs, nil); err != nil { @@ -307,8 +308,7 @@ func (sch *PreemptingQueueScheduler) evict(ctx *armadacontext.Context, evictor * if err := sch.evictionAssertions(result); err != nil { return nil, nil, err } - inMemoryJobRepo := NewInMemoryJobRepository() - inMemoryJobRepo.EnqueueMany(evictedJctxs) + inMemoryJobRepo := NewInMemoryJobRepository(evictedJobs) txn.Commit() if err := sch.nodeDb.Reset(); err != nil { @@ -488,7 +488,7 @@ func addEvictedJobsToNodeDb(_ *armadacontext.Context, sctx *schedulercontext.Sch for _, qctx := range sctx.QueueSchedulingContexts { gangItByQueue[qctx.Queue] = NewQueuedGangIterator( sctx, - inMemoryJobRepo.GetJobIterator(qctx.Queue), + inMemoryJobRepo.QueuedJobs(qctx.Queue), 0, false, ) @@ -525,13 +525,13 @@ func addEvictedJobsToNodeDb(_ *armadacontext.Context, sctx *schedulercontext.Sch } func (sch *PreemptingQueueScheduler) schedule(ctx *armadacontext.Context, inMemoryJobRepo *InMemoryJobRepository, jobRepo JobRepository) (*schedulerresult.SchedulerResult, error) { - jobIteratorByQueue := make(map[string]JobIterator) + jobIteratorByQueue := make(map[string]jobdb.JobIterator) for _, qctx := range sch.schedulingContext.QueueSchedulingContexts { - evictedIt := inMemoryJobRepo.GetJobIterator(qctx.Queue) - if jobRepo == nil || reflect.ValueOf(jobRepo).IsNil() { + evictedIt := inMemoryJobRepo.QueuedJobs(qctx.Queue) + if jobRepo == nil { jobIteratorByQueue[qctx.Queue] = evictedIt } else { - queueIt := NewQueuedJobsIterator(ctx, qctx.Queue, jobRepo) + queueIt := jobRepo.QueuedJobs(qctx.Queue) jobIteratorByQueue[qctx.Queue] = NewMultiJobsIterator(evictedIt, queueIt) } } @@ -821,13 +821,15 @@ func (evi *Evictor) Evict(ctx *armadacontext.Context, nodeDbTxn *memdb.Txn) (*Ev if evi.nodeFilter != nil && !evi.nodeFilter(ctx, node) { continue } - jobIds := make([]string, 0, len(node.AllocatedByJobId)) + jobs := make([]*jobdb.Job, 0, len(node.AllocatedByJobId)) for jobId := range node.AllocatedByJobId { if _, ok := node.EvictedJobRunIds[jobId]; !ok { - jobIds = append(jobIds, jobId) + job := evi.jobRepo.GetById(jobId) + if job != nil { + jobs = append(jobs, job) + } } } - jobs := evi.jobRepo.GetExistingJobsByIds(jobIds) evictedJobs, node, err := evi.nodeDb.EvictJobsFromNode(jobFilter, jobs, node) if err != nil { return nil, err diff --git a/internal/scheduler/queue_scheduler.go b/internal/scheduler/queue_scheduler.go index b2e9d2f8916..68052fbeb93 100644 --- a/internal/scheduler/queue_scheduler.go +++ b/internal/scheduler/queue_scheduler.go @@ -3,7 +3,7 @@ package scheduler import ( "container/heap" "fmt" - "reflect" + "github.com/armadaproject/armada/internal/scheduler/jobdb" "time" "github.com/pkg/errors" @@ -32,7 +32,7 @@ func NewQueueScheduler( constraints schedulerconstraints.SchedulingConstraints, floatingResourceTypes *floatingresources.FloatingResourceTypes, nodeDb *nodedb.NodeDb, - jobIteratorByQueue map[string]JobIterator, + jobIteratorByQueue map[string]jobdb.JobIterator, ) (*QueueScheduler, error) { for queue := range jobIteratorByQueue { if _, ok := sctx.QueueSchedulingContexts[queue]; !ok { @@ -219,7 +219,7 @@ func (sch *QueueScheduler) Schedule(ctx *armadacontext.Context) (*schedulerresul // Jobs without gangIdAnnotation are considered gangs of cardinality 1. type QueuedGangIterator struct { schedulingContext *schedulercontext.SchedulingContext - queuedJobsIterator JobIterator + queuedJobsIterator jobdb.JobIterator // Groups jctxs by the gang they belong to. jctxsByGangId map[string][]*schedulercontext.JobSchedulingContext // Maximum number of jobs to look at before giving up. @@ -231,7 +231,7 @@ type QueuedGangIterator struct { next *schedulercontext.GangSchedulingContext } -func NewQueuedGangIterator(sctx *schedulercontext.SchedulingContext, it JobIterator, maxLookback uint, skipKnownUnschedulableJobs bool) *QueuedGangIterator { +func NewQueuedGangIterator(sctx *schedulercontext.SchedulingContext, it jobdb.JobIterator, maxLookback uint, skipKnownUnschedulableJobs bool) *QueuedGangIterator { return &QueuedGangIterator{ schedulingContext: sctx, queuedJobsIterator: it, @@ -269,15 +269,13 @@ func (it *QueuedGangIterator) Peek() (*schedulercontext.GangSchedulingContext, e // 1. get a job that isn't part of a gang, in which case we yield it immediately, or // 2. get the final job in a gang, in which case we yield the entire gang. for { - jctx, err := it.queuedJobsIterator.Next() - if err != nil { - return nil, err - } else if jctx == nil || reflect.ValueOf(jctx).IsNil() { + job, _ := it.queuedJobsIterator.Next() + if job == nil { return nil, nil } // Queue lookback limits. Rescheduled jobs don't count towards the limit. - if !jctx.IsEvicted { + if !job.IsEvicted { it.jobsSeen++ } if it.hitLookbackLimit() { diff --git a/internal/scheduler/scheduling_algo.go b/internal/scheduler/scheduling_algo.go index a463b54a83e..3f4bc6d8ee0 100644 --- a/internal/scheduler/scheduling_algo.go +++ b/internal/scheduler/scheduling_algo.go @@ -488,7 +488,7 @@ func (l *FairSchedulingAlgo) schedulePool( constraints, l.floatingResourceTypes, l.schedulingConfig.ProtectedFractionOfFairShare, - NewSchedulerJobRepositoryAdapter(fsctx.txn), + fsctx.txn, nodeDb, fsctx.nodeIdByJobId, fsctx.jobIdsByGangId, @@ -537,41 +537,6 @@ func (l *FairSchedulingAlgo) schedulePool( return result, sctx, nil } -// SchedulerJobRepositoryAdapter allows jobDb implement the JobRepository interface. -// TODO: Pass JobDb into the scheduler instead of using this shim to convert to a JobRepo. -type SchedulerJobRepositoryAdapter struct { - txn *jobdb.Txn -} - -func NewSchedulerJobRepositoryAdapter(txn *jobdb.Txn) *SchedulerJobRepositoryAdapter { - return &SchedulerJobRepositoryAdapter{ - txn: txn, - } -} - -// GetQueueJobIds is necessary to implement the JobRepository interface, which we need while transitioning from the old -// to new scheduler. -func (repo *SchedulerJobRepositoryAdapter) GetQueueJobIds(queue string) []string { - rv := make([]string, 0) - it := repo.txn.QueuedJobs(queue) - for v, _ := it.Next(); v != nil; v, _ = it.Next() { - rv = append(rv, v.Id()) - } - return rv -} - -// GetExistingJobsByIds is necessary to implement the JobRepository interface which we need while transitioning from the -// old to new scheduler. -func (repo *SchedulerJobRepositoryAdapter) GetExistingJobsByIds(ids []string) []*jobdb.Job { - rv := make([]*jobdb.Job, 0, len(ids)) - for _, id := range ids { - if job := repo.txn.GetById(id); job != nil { - rv = append(rv, job) - } - } - return rv -} - // populateNodeDb adds all the nodes and jobs associated with a particular pool to the nodeDb. func (l *FairSchedulingAlgo) populateNodeDb(nodeDb *nodedb.NodeDb, jobs []*jobdb.Job, nodes []*schedulerobjects.Node) error { txn := nodeDb.Txn(true) From c6006b8fd582669f9804d8d45b190799e1f81a22 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Thu, 19 Sep 2024 17:18:42 +0100 Subject: [PATCH 3/7] wip Signed-off-by: Chris Martin --- .../scheduler/jobiteration/job_context.go | 22 ++++++++++++ .../job_repository.go} | 35 +------------------ .../scheduler/jobiteration/multi_iterator.go | 29 +++++++++++++++ 3 files changed, 52 insertions(+), 34 deletions(-) create mode 100644 internal/scheduler/jobiteration/job_context.go rename internal/scheduler/{jobiteration.go => jobiteration/job_repository.go} (62%) create mode 100644 internal/scheduler/jobiteration/multi_iterator.go diff --git a/internal/scheduler/jobiteration/job_context.go b/internal/scheduler/jobiteration/job_context.go new file mode 100644 index 00000000000..175bd3f95f5 --- /dev/null +++ b/internal/scheduler/jobiteration/job_context.go @@ -0,0 +1,22 @@ +package jobiteration + +import ( + "github.com/armadaproject/armada/internal/scheduler/context" + "github.com/armadaproject/armada/internal/scheduler/jobdb" +) + +type JobContextIterator interface { + Next() (*context.JobSchedulingContext, bool) +} + +type fish struct { + jobIterator jobdb.JobIterator +} + +func (f *fish) Next() (*context.JobSchedulingContext, bool) { + job, ok := f.jobIterator.Next() + if !ok { + return nil, false + } + return context.JobSchedulingContextFromJob(job), true +} diff --git a/internal/scheduler/jobiteration.go b/internal/scheduler/jobiteration/job_repository.go similarity index 62% rename from internal/scheduler/jobiteration.go rename to internal/scheduler/jobiteration/job_repository.go index 7533a9de7eb..8b03a65beaf 100644 --- a/internal/scheduler/jobiteration.go +++ b/internal/scheduler/jobiteration/job_repository.go @@ -1,7 +1,6 @@ -package scheduler +package jobiteration import ( - "github.com/armadaproject/armada/internal/common/armadacontext" "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/benbjohnson/immutable" ) @@ -50,35 +49,3 @@ func (repo *InMemoryJobRepository) QueuedJobs(queueName string) jobdb.JobIterato } return nil } - -// QueuedJobsIterator is an iterator over all jobs in a queue. -type QueuedJobsIterator struct { - repo JobRepository - jobIds []string - idx int - ctx *armadacontext.Context -} - -// MultiJobsIterator chains several JobIterators together in the order provided. -type MultiJobsIterator struct { - i int - its []jobdb.JobIterator -} - -func NewMultiJobsIterator(its ...jobdb.JobIterator) *MultiJobsIterator { - return &MultiJobsIterator{ - its: its, - } -} - -func (it *MultiJobsIterator) Next() (*jobdb.Job, bool) { - if it.i >= len(it.its) { - return nil, false - } - v, ok := it.its[it.i].Next() - if ok { - return v, true - } - it.i++ - return it.Next() -} diff --git a/internal/scheduler/jobiteration/multi_iterator.go b/internal/scheduler/jobiteration/multi_iterator.go new file mode 100644 index 00000000000..2e4a4037f20 --- /dev/null +++ b/internal/scheduler/jobiteration/multi_iterator.go @@ -0,0 +1,29 @@ +package jobiteration + +import ( + "github.com/armadaproject/armada/internal/scheduler/context" +) + +// MultiJobsIterator chains several JobIterators together in the order provided. +type MultiJobsIterator struct { + i int + its []JobContextIterator +} + +func NewMultiJobsIterator(its ...JobContextIterator) *MultiJobsIterator { + return &MultiJobsIterator{ + its: its, + } +} + +func (it *MultiJobsIterator) Next() (*context.JobSchedulingContext, bool) { + if it.i >= len(it.its) { + return nil, false + } + v, ok := it.its[it.i].Next() + if ok { + return v, true + } + it.i++ + return it.Next() +} From acbdfe56dbf48ab188b96691edbb0387ab96a270 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Fri, 20 Sep 2024 09:26:11 +0100 Subject: [PATCH 4/7] wip Signed-off-by: Chris Martin --- internal/common/xiter/xiter.go | 31 +++++++++++ internal/scheduler/jobiteration/adapter.go | 19 +++++++ internal/scheduler/jobiteration/interfaces.go | 19 +++++++ .../scheduler/jobiteration/job_context.go | 22 -------- .../jobiteration/job_context_repository.go | 45 ++++++++++++++++ .../scheduler/jobiteration/job_repository.go | 51 ------------------- .../scheduler/jobiteration/multi_iterator.go | 29 ----------- .../scheduler/preempting_queue_scheduler.go | 45 ++++++++-------- internal/scheduler/queue_scheduler.go | 17 +++---- 9 files changed, 143 insertions(+), 135 deletions(-) create mode 100644 internal/common/xiter/xiter.go create mode 100644 internal/scheduler/jobiteration/adapter.go create mode 100644 internal/scheduler/jobiteration/interfaces.go delete mode 100644 internal/scheduler/jobiteration/job_context.go create mode 100644 internal/scheduler/jobiteration/job_context_repository.go delete mode 100644 internal/scheduler/jobiteration/job_repository.go delete mode 100644 internal/scheduler/jobiteration/multi_iterator.go diff --git a/internal/common/xiter/xiter.go b/internal/common/xiter/xiter.go new file mode 100644 index 00000000000..8659e225e9f --- /dev/null +++ b/internal/common/xiter/xiter.go @@ -0,0 +1,31 @@ +package xiter + +import "iter" + +// Map returns an iterator over f applied to seq. +func Map[In, Out any](f func(In) Out, seq iter.Seq[In]) iter.Seq[Out] { + return func(yield func(Out) bool) { + for in := range seq { + if !yield(f(in)) { + return + } + } + } +} + +// Concat returns an iterator over the concatenation of the sequences. +func Concat[V any](seqs ...iter.Seq[V]) iter.Seq[V] { + return func(yield func(V) bool) { + for _, seq := range seqs { + for e := range seq { + if !yield(e) { + return + } + } + } + } +} + +func Empty[T any]() iter.Seq[T] { + return func(yield func(T) bool) {} +} diff --git a/internal/scheduler/jobiteration/adapter.go b/internal/scheduler/jobiteration/adapter.go new file mode 100644 index 00000000000..f6bb4555bb8 --- /dev/null +++ b/internal/scheduler/jobiteration/adapter.go @@ -0,0 +1,19 @@ +package jobiteration + +import ( + "iter" + + "github.com/armadaproject/armada/internal/common/xiter" + "github.com/armadaproject/armada/internal/scheduler/context" + "github.com/armadaproject/armada/internal/scheduler/jobdb" +) + +type JobContextRepositoryAdapter struct { + JobRepository +} + +func (j *JobContextRepositoryAdapter) GetJobContextsForQueue(queue string) iter.Seq[*context.JobSchedulingContext] { + return xiter.Map(func(j *jobdb.Job) *context.JobSchedulingContext { + return context.JobSchedulingContextFromJob(j) + }, j.JobRepository.GetJobsForQueue(queue)) +} diff --git a/internal/scheduler/jobiteration/interfaces.go b/internal/scheduler/jobiteration/interfaces.go new file mode 100644 index 00000000000..e6d562e6765 --- /dev/null +++ b/internal/scheduler/jobiteration/interfaces.go @@ -0,0 +1,19 @@ +package jobiteration + +import ( + "iter" + + "github.com/armadaproject/armada/internal/scheduler/context" + "github.com/armadaproject/armada/internal/scheduler/jobdb" +) + +// JobRepository is a source of jobs +type JobRepository interface { + GetById(id string) *jobdb.Job + GetJobsForQueue(queue string) iter.Seq[*jobdb.Job] +} + +// JobContextRepository is a source of job contexts +type JobContextRepository interface { + GetJobContextsForQueue(queue string) iter.Seq[*context.JobSchedulingContext] +} diff --git a/internal/scheduler/jobiteration/job_context.go b/internal/scheduler/jobiteration/job_context.go deleted file mode 100644 index 175bd3f95f5..00000000000 --- a/internal/scheduler/jobiteration/job_context.go +++ /dev/null @@ -1,22 +0,0 @@ -package jobiteration - -import ( - "github.com/armadaproject/armada/internal/scheduler/context" - "github.com/armadaproject/armada/internal/scheduler/jobdb" -) - -type JobContextIterator interface { - Next() (*context.JobSchedulingContext, bool) -} - -type fish struct { - jobIterator jobdb.JobIterator -} - -func (f *fish) Next() (*context.JobSchedulingContext, bool) { - job, ok := f.jobIterator.Next() - if !ok { - return nil, false - } - return context.JobSchedulingContextFromJob(job), true -} diff --git a/internal/scheduler/jobiteration/job_context_repository.go b/internal/scheduler/jobiteration/job_context_repository.go new file mode 100644 index 00000000000..9fb192924a3 --- /dev/null +++ b/internal/scheduler/jobiteration/job_context_repository.go @@ -0,0 +1,45 @@ +package jobiteration + +import ( + "github.com/armadaproject/armada/internal/common/xiter" + "github.com/armadaproject/armada/internal/scheduler/context" + "iter" + "slices" +) + +// InMemoryJobContextRepository is JobContextRepository that can be created from a slice of JobSchedulingContexts +type InMemoryJobContextRepository struct { + jobContextsByQueue map[string][]*context.JobSchedulingContext +} + +func NewInMemoryJobContextRepository(jobContexts []*context.JobSchedulingContext) *InMemoryJobContextRepository { + + jobContextsByQueue := make(map[string][]*context.JobSchedulingContext) + + for _, jobCtx := range jobContexts { + queue, ok := jobContextsByQueue[jobCtx.Job.Queue()] + if !ok { + queue = []*context.JobSchedulingContext{} + } + jobContextsByQueue[jobCtx.Job.Queue()] = append(queue, jobCtx) + } + + // Sort jobs by the order in which they should be scheduled. + for _, queue := range jobContextsByQueue { + slices.SortFunc(queue, func(a, b *context.JobSchedulingContext) int { + return a.Job.SchedulingOrderCompare(b.Job) + }) + } + + return &InMemoryJobContextRepository{ + jobContextsByQueue: jobContextsByQueue, + } +} + +func (repo *InMemoryJobContextRepository) GetJobContextsForQueue(queueName string) iter.Seq[*context.JobSchedulingContext] { + queue, ok := repo.jobContextsByQueue[queueName] + if ok { + return slices.Values(queue) + } + return xiter.Empty[*context.JobSchedulingContext]() +} diff --git a/internal/scheduler/jobiteration/job_repository.go b/internal/scheduler/jobiteration/job_repository.go deleted file mode 100644 index 8b03a65beaf..00000000000 --- a/internal/scheduler/jobiteration/job_repository.go +++ /dev/null @@ -1,51 +0,0 @@ -package jobiteration - -import ( - "github.com/armadaproject/armada/internal/scheduler/jobdb" - "github.com/benbjohnson/immutable" -) - -type JobRepository interface { - GetById(id string) *jobdb.Job - QueuedJobs(queue string) jobdb.JobIterator -} - -type InMemoryJobRepository struct { - jobsByQueue map[string]immutable.SortedSet[*jobdb.Job] - jobsById map[string]*jobdb.Job -} - -func NewInMemoryJobRepository(jobs []*jobdb.Job) *InMemoryJobRepository { - - jobsByQueue := make(map[string]immutable.SortedSet[*jobdb.Job]) - jobsById := make(map[string]*jobdb.Job, len(jobs)) - - for _, job := range jobs { - queueName := job.Queue() - queue, ok := jobsByQueue[queueName] - if !ok { - queue = immutable.NewSortedSet[*jobdb.Job](jobdb.JobPriorityComparer{}) - } - jobsByQueue[queueName] = queue.Add(job) - jobsById[job.Id()] = job - } - return &InMemoryJobRepository{ - jobsByQueue: jobsByQueue, - jobsById: jobsById, - } -} - -func (repo *InMemoryJobRepository) GetById(jobId string) *jobdb.Job { - if job, ok := repo.jobsById[jobId]; ok { - return job - } - return nil -} - -func (repo *InMemoryJobRepository) QueuedJobs(queueName string) jobdb.JobIterator { - queue, ok := repo.jobsByQueue[queueName] - if ok { - return queue.Iterator() - } - return nil -} diff --git a/internal/scheduler/jobiteration/multi_iterator.go b/internal/scheduler/jobiteration/multi_iterator.go deleted file mode 100644 index 2e4a4037f20..00000000000 --- a/internal/scheduler/jobiteration/multi_iterator.go +++ /dev/null @@ -1,29 +0,0 @@ -package jobiteration - -import ( - "github.com/armadaproject/armada/internal/scheduler/context" -) - -// MultiJobsIterator chains several JobIterators together in the order provided. -type MultiJobsIterator struct { - i int - its []JobContextIterator -} - -func NewMultiJobsIterator(its ...JobContextIterator) *MultiJobsIterator { - return &MultiJobsIterator{ - its: its, - } -} - -func (it *MultiJobsIterator) Next() (*context.JobSchedulingContext, bool) { - if it.i >= len(it.its) { - return nil, false - } - v, ok := it.its[it.i].Next() - if ok { - return v, true - } - it.i++ - return it.Next() -} diff --git a/internal/scheduler/preempting_queue_scheduler.go b/internal/scheduler/preempting_queue_scheduler.go index 01969351ff6..0d6a1d2bf46 100644 --- a/internal/scheduler/preempting_queue_scheduler.go +++ b/internal/scheduler/preempting_queue_scheduler.go @@ -2,6 +2,9 @@ package scheduler import ( "fmt" + "github.com/armadaproject/armada/internal/common/xiter" + "github.com/armadaproject/armada/internal/scheduler/jobiteration" + "iter" "math" "time" @@ -31,7 +34,7 @@ type PreemptingQueueScheduler struct { constraints schedulerconstraints.SchedulingConstraints floatingResourceTypes *floatingresources.FloatingResourceTypes protectedFractionOfFairShare float64 - jobRepo JobRepository + jobRepo jobiteration.JobRepository nodeDb *nodedb.NodeDb // Maps job ids to the id of the node the job is associated with. // For scheduled or running jobs, that is the node the job is assigned to. @@ -52,7 +55,7 @@ func NewPreemptingQueueScheduler( constraints schedulerconstraints.SchedulingConstraints, floatingResourceTypes *floatingresources.FloatingResourceTypes, protectedFractionOfFairShare float64, - jobRepo JobRepository, + jobRepo jobiteration.JobRepository, nodeDb *nodedb.NodeDb, initialNodeIdByJobId map[string]string, initialJobIdsByGangId map[string]map[string]bool, @@ -153,7 +156,7 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*sche schedulerResult, err := sch.schedule( armadacontext.WithLogField(ctx, "stage", "re-schedule after balancing eviction"), inMemoryJobRepo, - sch.jobRepo, + &jobiteration.JobContextRepositoryAdapter{JobRepository: sch.jobRepo}, ) if err != nil { return nil, err @@ -260,9 +263,9 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*sche }, nil } -func (sch *PreemptingQueueScheduler) evict(ctx *armadacontext.Context, evictor *Evictor) (*EvictorResult, *InMemoryJobRepository, error) { +func (sch *PreemptingQueueScheduler) evict(ctx *armadacontext.Context, evictor *Evictor) (*EvictorResult, *jobiteration.InMemoryJobContextRepository, error) { if evictor == nil { - return &EvictorResult{}, NewInMemoryJobRepository(nil), nil + return &EvictorResult{}, jobiteration.NewInMemoryJobContextRepository(nil), nil } txn := sch.nodeDb.Txn(true) defer txn.Abort() @@ -294,12 +297,10 @@ func (sch *PreemptingQueueScheduler) evict(ctx *armadacontext.Context, evictor * sch.setEvictedGangCardinality(result) evictedJctxs := maps.Values(result.EvictedJctxsByJobId) - evictedJobs := make([]*jobdb.Job, len(evictedJctxs)) - for i, jctx := range evictedJctxs { + for _, jctx := range evictedJctxs { if _, err := sch.schedulingContext.EvictJob(jctx.Job); err != nil { return nil, nil, err } - evictedJobs[i] = jctx.Job } // TODO: Move gang accounting into context. if err := sch.updateGangAccounting(evictedJctxs, nil); err != nil { @@ -308,7 +309,7 @@ func (sch *PreemptingQueueScheduler) evict(ctx *armadacontext.Context, evictor * if err := sch.evictionAssertions(result); err != nil { return nil, nil, err } - inMemoryJobRepo := NewInMemoryJobRepository(evictedJobs) + inMemoryJobRepo := jobiteration.NewInMemoryJobContextRepository(evictedJctxs) txn.Commit() if err := sch.nodeDb.Reset(); err != nil { @@ -483,12 +484,12 @@ func (q MinimalQueue) GetWeight() float64 { // addEvictedJobsToNodeDb adds evicted jobs to the NodeDb. // Needed to enable the nodeDb accounting for these when preempting. -func addEvictedJobsToNodeDb(_ *armadacontext.Context, sctx *schedulercontext.SchedulingContext, nodeDb *nodedb.NodeDb, inMemoryJobRepo *InMemoryJobRepository) error { +func addEvictedJobsToNodeDb(_ *armadacontext.Context, sctx *schedulercontext.SchedulingContext, nodeDb *nodedb.NodeDb, inMemoryJobRepo *jobiteration.InMemoryJobContextRepository) error { gangItByQueue := make(map[string]*QueuedGangIterator) for _, qctx := range sctx.QueueSchedulingContexts { gangItByQueue[qctx.Queue] = NewQueuedGangIterator( sctx, - inMemoryJobRepo.QueuedJobs(qctx.Queue), + inMemoryJobRepo.GetJobContextsForQueue(qctx.Queue), 0, false, ) @@ -524,16 +525,14 @@ func addEvictedJobsToNodeDb(_ *armadacontext.Context, sctx *schedulercontext.Sch return nil } -func (sch *PreemptingQueueScheduler) schedule(ctx *armadacontext.Context, inMemoryJobRepo *InMemoryJobRepository, jobRepo JobRepository) (*schedulerresult.SchedulerResult, error) { - jobIteratorByQueue := make(map[string]jobdb.JobIterator) +func (sch *PreemptingQueueScheduler) schedule(ctx *armadacontext.Context, jobRepos ...jobiteration.JobContextRepository) (*schedulerresult.SchedulerResult, error) { + jobIteratorByQueue := make(map[string]iter.Seq[*schedulercontext.JobSchedulingContext]) for _, qctx := range sch.schedulingContext.QueueSchedulingContexts { - evictedIt := inMemoryJobRepo.QueuedJobs(qctx.Queue) - if jobRepo == nil { - jobIteratorByQueue[qctx.Queue] = evictedIt - } else { - queueIt := jobRepo.QueuedJobs(qctx.Queue) - jobIteratorByQueue[qctx.Queue] = NewMultiJobsIterator(evictedIt, queueIt) + iters := make([]iter.Seq[*schedulercontext.JobSchedulingContext], len(jobRepos)) + for i, repo := range jobRepos { + iters[i] = repo.GetJobContextsForQueue(qctx.Queue) } + jobIteratorByQueue[qctx.Queue] = xiter.Concat(iters...) } // Reset the scheduling keys cache after evicting jobs. @@ -685,7 +684,7 @@ func (sch *PreemptingQueueScheduler) assertions( } type Evictor struct { - jobRepo JobRepository + jobRepo jobiteration.JobRepository nodeDb *nodedb.NodeDb nodeFilter func(*armadacontext.Context, *internaltypes.Node) bool jobFilter func(*armadacontext.Context, *jobdb.Job) bool @@ -719,7 +718,7 @@ func (er *EvictorResult) SummaryString() string { } func NewNodeEvictor( - jobRepo JobRepository, + jobRepo jobiteration.JobRepository, nodeDb *nodedb.NodeDb, jobFilter func(*armadacontext.Context, *jobdb.Job) bool, ) *Evictor { @@ -736,7 +735,7 @@ func NewNodeEvictor( // NewFilteredEvictor returns a new evictor that evicts all jobs for which jobIdsToEvict[jobId] is true // on nodes for which nodeIdsToEvict[nodeId] is true. func NewFilteredEvictor( - jobRepo JobRepository, + jobRepo jobiteration.JobRepository, nodeDb *nodedb.NodeDb, nodeIdsToEvict map[string]bool, jobIdsToEvict map[string]bool, @@ -761,7 +760,7 @@ func NewFilteredEvictor( // NewOversubscribedEvictor returns a new evictor that // for each node evicts all preemptible jobs of a priority class for which at least one job could not be scheduled func NewOversubscribedEvictor( - jobRepo JobRepository, + jobRepo jobiteration.JobRepository, nodeDb *nodedb.NodeDb, ) *Evictor { // Populating overSubscribedPriorities relies on diff --git a/internal/scheduler/queue_scheduler.go b/internal/scheduler/queue_scheduler.go index 68052fbeb93..a545dd1fcfa 100644 --- a/internal/scheduler/queue_scheduler.go +++ b/internal/scheduler/queue_scheduler.go @@ -3,7 +3,7 @@ package scheduler import ( "container/heap" "fmt" - "github.com/armadaproject/armada/internal/scheduler/jobdb" + "iter" "time" "github.com/pkg/errors" @@ -32,7 +32,7 @@ func NewQueueScheduler( constraints schedulerconstraints.SchedulingConstraints, floatingResourceTypes *floatingresources.FloatingResourceTypes, nodeDb *nodedb.NodeDb, - jobIteratorByQueue map[string]jobdb.JobIterator, + jobIteratorByQueue map[string]iter.Seq[*schedulercontext.JobSchedulingContext], ) (*QueueScheduler, error) { for queue := range jobIteratorByQueue { if _, ok := sctx.QueueSchedulingContexts[queue]; !ok { @@ -219,7 +219,7 @@ func (sch *QueueScheduler) Schedule(ctx *armadacontext.Context) (*schedulerresul // Jobs without gangIdAnnotation are considered gangs of cardinality 1. type QueuedGangIterator struct { schedulingContext *schedulercontext.SchedulingContext - queuedJobsIterator jobdb.JobIterator + queuedJobsIterator iter.Seq[*schedulercontext.JobSchedulingContext] // Groups jctxs by the gang they belong to. jctxsByGangId map[string][]*schedulercontext.JobSchedulingContext // Maximum number of jobs to look at before giving up. @@ -231,7 +231,7 @@ type QueuedGangIterator struct { next *schedulercontext.GangSchedulingContext } -func NewQueuedGangIterator(sctx *schedulercontext.SchedulingContext, it jobdb.JobIterator, maxLookback uint, skipKnownUnschedulableJobs bool) *QueuedGangIterator { +func NewQueuedGangIterator(sctx *schedulercontext.SchedulingContext, it iter.Seq[*schedulercontext.JobSchedulingContext], maxLookback uint, skipKnownUnschedulableJobs bool) *QueuedGangIterator { return &QueuedGangIterator{ schedulingContext: sctx, queuedJobsIterator: it, @@ -268,14 +268,10 @@ func (it *QueuedGangIterator) Peek() (*schedulercontext.GangSchedulingContext, e // Get one job at a time from the underlying iterator until we either // 1. get a job that isn't part of a gang, in which case we yield it immediately, or // 2. get the final job in a gang, in which case we yield the entire gang. - for { - job, _ := it.queuedJobsIterator.Next() - if job == nil { - return nil, nil - } + for jctx := range it.queuedJobsIterator { // Queue lookback limits. Rescheduled jobs don't count towards the limit. - if !job.IsEvicted { + if !jctx.IsEvicted { it.jobsSeen++ } if it.hitLookbackLimit() { @@ -315,6 +311,7 @@ func (it *QueuedGangIterator) Peek() (*schedulercontext.GangSchedulingContext, e return it.next, nil } } + return nil, nil } func (it *QueuedGangIterator) hitLookbackLimit() bool { From c1e17c9e52896f13634f7da1ff2c47f28d8e7010 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Fri, 20 Sep 2024 09:47:24 +0100 Subject: [PATCH 5/7] wip Signed-off-by: Chris Martin --- internal/scheduler/jobiteration/adapter.go | 4 +--- internal/scheduler/jobiteration/interfaces.go | 9 +++++++-- .../scheduler/jobiteration/job_context_repository.go | 3 +-- internal/scheduler/preempting_queue_scheduler.go | 5 ++--- internal/scheduler/queue_scheduler.go | 8 ++++---- 5 files changed, 15 insertions(+), 14 deletions(-) diff --git a/internal/scheduler/jobiteration/adapter.go b/internal/scheduler/jobiteration/adapter.go index f6bb4555bb8..50ffd198be7 100644 --- a/internal/scheduler/jobiteration/adapter.go +++ b/internal/scheduler/jobiteration/adapter.go @@ -1,8 +1,6 @@ package jobiteration import ( - "iter" - "github.com/armadaproject/armada/internal/common/xiter" "github.com/armadaproject/armada/internal/scheduler/context" "github.com/armadaproject/armada/internal/scheduler/jobdb" @@ -12,7 +10,7 @@ type JobContextRepositoryAdapter struct { JobRepository } -func (j *JobContextRepositoryAdapter) GetJobContextsForQueue(queue string) iter.Seq[*context.JobSchedulingContext] { +func (j *JobContextRepositoryAdapter) GetJobContextsForQueue(queue string) JobContextIterator { return xiter.Map(func(j *jobdb.Job) *context.JobSchedulingContext { return context.JobSchedulingContextFromJob(j) }, j.JobRepository.GetJobsForQueue(queue)) diff --git a/internal/scheduler/jobiteration/interfaces.go b/internal/scheduler/jobiteration/interfaces.go index e6d562e6765..8d12b034df9 100644 --- a/internal/scheduler/jobiteration/interfaces.go +++ b/internal/scheduler/jobiteration/interfaces.go @@ -7,13 +7,18 @@ import ( "github.com/armadaproject/armada/internal/scheduler/jobdb" ) +type ( + JobIterator = iter.Seq[*jobdb.Job] + JobContextIterator = iter.Seq[*context.JobSchedulingContext] +) + // JobRepository is a source of jobs type JobRepository interface { GetById(id string) *jobdb.Job - GetJobsForQueue(queue string) iter.Seq[*jobdb.Job] + GetJobsForQueue(queue string) JobIterator } // JobContextRepository is a source of job contexts type JobContextRepository interface { - GetJobContextsForQueue(queue string) iter.Seq[*context.JobSchedulingContext] + GetJobContextsForQueue(queue string) JobContextIterator } diff --git a/internal/scheduler/jobiteration/job_context_repository.go b/internal/scheduler/jobiteration/job_context_repository.go index 9fb192924a3..0c31a563a0e 100644 --- a/internal/scheduler/jobiteration/job_context_repository.go +++ b/internal/scheduler/jobiteration/job_context_repository.go @@ -3,7 +3,6 @@ package jobiteration import ( "github.com/armadaproject/armada/internal/common/xiter" "github.com/armadaproject/armada/internal/scheduler/context" - "iter" "slices" ) @@ -36,7 +35,7 @@ func NewInMemoryJobContextRepository(jobContexts []*context.JobSchedulingContext } } -func (repo *InMemoryJobContextRepository) GetJobContextsForQueue(queueName string) iter.Seq[*context.JobSchedulingContext] { +func (repo *InMemoryJobContextRepository) GetJobContextsForQueue(queueName string) JobContextIterator { queue, ok := repo.jobContextsByQueue[queueName] if ok { return slices.Values(queue) diff --git a/internal/scheduler/preempting_queue_scheduler.go b/internal/scheduler/preempting_queue_scheduler.go index 0d6a1d2bf46..5ca0d370324 100644 --- a/internal/scheduler/preempting_queue_scheduler.go +++ b/internal/scheduler/preempting_queue_scheduler.go @@ -4,7 +4,6 @@ import ( "fmt" "github.com/armadaproject/armada/internal/common/xiter" "github.com/armadaproject/armada/internal/scheduler/jobiteration" - "iter" "math" "time" @@ -526,9 +525,9 @@ func addEvictedJobsToNodeDb(_ *armadacontext.Context, sctx *schedulercontext.Sch } func (sch *PreemptingQueueScheduler) schedule(ctx *armadacontext.Context, jobRepos ...jobiteration.JobContextRepository) (*schedulerresult.SchedulerResult, error) { - jobIteratorByQueue := make(map[string]iter.Seq[*schedulercontext.JobSchedulingContext]) + jobIteratorByQueue := make(map[string]jobiteration.JobContextIterator) for _, qctx := range sch.schedulingContext.QueueSchedulingContexts { - iters := make([]iter.Seq[*schedulercontext.JobSchedulingContext], len(jobRepos)) + iters := make([]jobiteration.JobContextIterator, len(jobRepos)) for i, repo := range jobRepos { iters[i] = repo.GetJobContextsForQueue(qctx.Queue) } diff --git a/internal/scheduler/queue_scheduler.go b/internal/scheduler/queue_scheduler.go index a545dd1fcfa..b308985bc44 100644 --- a/internal/scheduler/queue_scheduler.go +++ b/internal/scheduler/queue_scheduler.go @@ -3,7 +3,7 @@ package scheduler import ( "container/heap" "fmt" - "iter" + "github.com/armadaproject/armada/internal/scheduler/jobiteration" "time" "github.com/pkg/errors" @@ -32,7 +32,7 @@ func NewQueueScheduler( constraints schedulerconstraints.SchedulingConstraints, floatingResourceTypes *floatingresources.FloatingResourceTypes, nodeDb *nodedb.NodeDb, - jobIteratorByQueue map[string]iter.Seq[*schedulercontext.JobSchedulingContext], + jobIteratorByQueue map[string]jobiteration.JobContextIterator, ) (*QueueScheduler, error) { for queue := range jobIteratorByQueue { if _, ok := sctx.QueueSchedulingContexts[queue]; !ok { @@ -219,7 +219,7 @@ func (sch *QueueScheduler) Schedule(ctx *armadacontext.Context) (*schedulerresul // Jobs without gangIdAnnotation are considered gangs of cardinality 1. type QueuedGangIterator struct { schedulingContext *schedulercontext.SchedulingContext - queuedJobsIterator iter.Seq[*schedulercontext.JobSchedulingContext] + queuedJobsIterator jobiteration.JobContextIterator // Groups jctxs by the gang they belong to. jctxsByGangId map[string][]*schedulercontext.JobSchedulingContext // Maximum number of jobs to look at before giving up. @@ -231,7 +231,7 @@ type QueuedGangIterator struct { next *schedulercontext.GangSchedulingContext } -func NewQueuedGangIterator(sctx *schedulercontext.SchedulingContext, it iter.Seq[*schedulercontext.JobSchedulingContext], maxLookback uint, skipKnownUnschedulableJobs bool) *QueuedGangIterator { +func NewQueuedGangIterator(sctx *schedulercontext.SchedulingContext, it jobiteration.JobContextIterator, maxLookback uint, skipKnownUnschedulableJobs bool) *QueuedGangIterator { return &QueuedGangIterator{ schedulingContext: sctx, queuedJobsIterator: it, From e5e4260220dc3384f3e1bde122fff661a48f7088 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Fri, 20 Sep 2024 10:22:00 +0100 Subject: [PATCH 6/7] wip Signed-off-by: Chris Martin --- internal/scheduler/jobdb/jobdb.go | 26 ++++++++++++------- ...{job_context_repository.go => inmemory.go} | 0 2 files changed, 16 insertions(+), 10 deletions(-) rename internal/scheduler/jobiteration/{job_context_repository.go => inmemory.go} (100%) diff --git a/internal/scheduler/jobdb/jobdb.go b/internal/scheduler/jobdb/jobdb.go index 1d014792035..7551292c9ba 100644 --- a/internal/scheduler/jobdb/jobdb.go +++ b/internal/scheduler/jobdb/jobdb.go @@ -2,6 +2,8 @@ package jobdb import ( "fmt" + "github.com/armadaproject/armada/internal/common/xiter" + "iter" "sync" "github.com/benbjohnson/immutable" @@ -20,10 +22,6 @@ import ( "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" ) -type JobIterator interface { - Next() (*Job, bool) -} - var emptyList = immutable.NewSortedSet[*Job](JobPriorityComparer{}) type JobDb struct { @@ -499,13 +497,21 @@ func (txn *Txn) HasQueuedJobs(queue string) bool { return queuedJobs.Len() > 0 } -// QueuedJobs returns true if the queue has any jobs in the running state or false otherwise -func (txn *Txn) QueuedJobs(queue string) JobIterator { +func (txn *Txn) GetJobsForQueue(queue string) iter.Seq[*Job] { jobQueue, ok := txn.jobsByQueue[queue] - if ok { - return jobQueue.Iterator() - } else { - return emptyList.Iterator() + + if !ok { + return xiter.Empty[*Job]() + } + + setIter := jobQueue.Iterator() + return func(yield func(*Job) bool) { + for !setIter.Done() { + val, _ := setIter.Next() + if !yield(val) { + return + } + } } } diff --git a/internal/scheduler/jobiteration/job_context_repository.go b/internal/scheduler/jobiteration/inmemory.go similarity index 100% rename from internal/scheduler/jobiteration/job_context_repository.go rename to internal/scheduler/jobiteration/inmemory.go From 8fbd001d617bef51661ff69fdc5c9e82ce6199cf Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Fri, 20 Sep 2024 14:32:57 +0100 Subject: [PATCH 7/7] wip Signed-off-by: Chris Martin --- internal/common/{xiter => iter}/xiter.go | 2 +- internal/scheduler/jobdb/jobdb.go | 8 +- internal/scheduler/jobdb/jobdb_test.go | 13 +- internal/scheduler/jobiteration/adapter.go | 4 +- internal/scheduler/jobiteration/inmemory.go | 4 +- internal/scheduler/jobiteration/interfaces.go | 7 +- internal/scheduler/jobiteration_test.go | 276 ------------------ .../scheduler/preempting_queue_scheduler.go | 4 +- .../preempting_queue_scheduler_test.go | 8 +- internal/scheduler/queue_scheduler_test.go | 11 +- internal/scheduler/simulator/simulator.go | 2 +- 11 files changed, 27 insertions(+), 312 deletions(-) rename internal/common/{xiter => iter}/xiter.go (97%) delete mode 100644 internal/scheduler/jobiteration_test.go diff --git a/internal/common/xiter/xiter.go b/internal/common/iter/xiter.go similarity index 97% rename from internal/common/xiter/xiter.go rename to internal/common/iter/xiter.go index 8659e225e9f..7528dadd9ef 100644 --- a/internal/common/xiter/xiter.go +++ b/internal/common/iter/xiter.go @@ -1,4 +1,4 @@ -package xiter +package iter import "iter" diff --git a/internal/scheduler/jobdb/jobdb.go b/internal/scheduler/jobdb/jobdb.go index 7551292c9ba..20c17bef2ac 100644 --- a/internal/scheduler/jobdb/jobdb.go +++ b/internal/scheduler/jobdb/jobdb.go @@ -2,8 +2,6 @@ package jobdb import ( "fmt" - "github.com/armadaproject/armada/internal/common/xiter" - "iter" "sync" "github.com/benbjohnson/immutable" @@ -19,6 +17,7 @@ import ( "github.com/armadaproject/armada/internal/scheduler/adapters" "github.com/armadaproject/armada/internal/scheduler/floatingresources" "github.com/armadaproject/armada/internal/scheduler/internaltypes" + "github.com/armadaproject/armada/internal/scheduler/iter" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" ) @@ -497,11 +496,12 @@ func (txn *Txn) HasQueuedJobs(queue string) bool { return queuedJobs.Len() > 0 } -func (txn *Txn) GetJobsForQueue(queue string) iter.Seq[*Job] { +func (txn *Txn) GetJobsForQueue(queue string) iter.Iterator[*Job] { + jobQueue, ok := txn.jobsByQueue[queue] if !ok { - return xiter.Empty[*Job]() + return iter.Empty[*Job]() } setIter := jobQueue.Iterator() diff --git a/internal/scheduler/jobdb/jobdb_test.go b/internal/scheduler/jobdb/jobdb_test.go index 7025691836d..803e73723d7 100644 --- a/internal/scheduler/jobdb/jobdb_test.go +++ b/internal/scheduler/jobdb/jobdb_test.go @@ -2,6 +2,7 @@ package jobdb import ( "math/rand" + "slices" "sort" "testing" @@ -9,7 +10,6 @@ import ( "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "golang.org/x/exp/slices" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -147,13 +147,8 @@ func TestJobDb_TestQueuedJobs(t *testing.T) { err := txn.Upsert(jobs) require.NoError(t, err) collect := func() []*Job { - retrieved := make([]*Job, 0) - iter := txn.QueuedJobs(jobs[0].Queue()) - for !iter.Done() { - j, _ := iter.Next() - retrieved = append(retrieved, j) - } - return retrieved + iter := txn.GetJobsForQueue(jobs[0].Queue()) + return slices.Collect(iter) } assert.Equal(t, jobs, collect()) @@ -183,7 +178,7 @@ func TestJobDb_TestQueuedJobs(t *testing.T) { // clear all jobs err = txn.BatchDelete([]string{updatedJob.id, job10.id, jobs[0].id, jobs[2].id, jobs[6].id, jobs[9].id}) require.NoError(t, err) - assert.Equal(t, []*Job{}, collect()) + assert.Equal(t, nil, collect()) } func TestJobDb_TestGetAll(t *testing.T) { diff --git a/internal/scheduler/jobiteration/adapter.go b/internal/scheduler/jobiteration/adapter.go index 50ffd198be7..0b3b0fae090 100644 --- a/internal/scheduler/jobiteration/adapter.go +++ b/internal/scheduler/jobiteration/adapter.go @@ -1,7 +1,7 @@ package jobiteration import ( - "github.com/armadaproject/armada/internal/common/xiter" + "github.com/armadaproject/armada/internal/common/iter" "github.com/armadaproject/armada/internal/scheduler/context" "github.com/armadaproject/armada/internal/scheduler/jobdb" ) @@ -11,7 +11,7 @@ type JobContextRepositoryAdapter struct { } func (j *JobContextRepositoryAdapter) GetJobContextsForQueue(queue string) JobContextIterator { - return xiter.Map(func(j *jobdb.Job) *context.JobSchedulingContext { + return iter.Map(func(j *jobdb.Job) *context.JobSchedulingContext { return context.JobSchedulingContextFromJob(j) }, j.JobRepository.GetJobsForQueue(queue)) } diff --git a/internal/scheduler/jobiteration/inmemory.go b/internal/scheduler/jobiteration/inmemory.go index 0c31a563a0e..805eebf1483 100644 --- a/internal/scheduler/jobiteration/inmemory.go +++ b/internal/scheduler/jobiteration/inmemory.go @@ -1,7 +1,7 @@ package jobiteration import ( - "github.com/armadaproject/armada/internal/common/xiter" + "github.com/armadaproject/armada/internal/common/iter" "github.com/armadaproject/armada/internal/scheduler/context" "slices" ) @@ -40,5 +40,5 @@ func (repo *InMemoryJobContextRepository) GetJobContextsForQueue(queueName strin if ok { return slices.Values(queue) } - return xiter.Empty[*context.JobSchedulingContext]() + return iter.Empty[*context.JobSchedulingContext]() } diff --git a/internal/scheduler/jobiteration/interfaces.go b/internal/scheduler/jobiteration/interfaces.go index 8d12b034df9..f473389f5d3 100644 --- a/internal/scheduler/jobiteration/interfaces.go +++ b/internal/scheduler/jobiteration/interfaces.go @@ -1,15 +1,14 @@ package jobiteration import ( - "iter" - "github.com/armadaproject/armada/internal/scheduler/context" + "github.com/armadaproject/armada/internal/scheduler/iter" "github.com/armadaproject/armada/internal/scheduler/jobdb" ) type ( - JobIterator = iter.Seq[*jobdb.Job] - JobContextIterator = iter.Seq[*context.JobSchedulingContext] + JobIterator = iter.Iterator[*jobdb.Job] + JobContextIterator = iter.Iterator[*context.JobSchedulingContext] ) // JobRepository is a source of jobs diff --git a/internal/scheduler/jobiteration_test.go b/internal/scheduler/jobiteration_test.go deleted file mode 100644 index c4f10a00bfa..00000000000 --- a/internal/scheduler/jobiteration_test.go +++ /dev/null @@ -1,276 +0,0 @@ -package scheduler - -import ( - "context" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/armadaproject/armada/internal/common/armadacontext" - "github.com/armadaproject/armada/internal/common/util" - schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" - "github.com/armadaproject/armada/internal/scheduler/jobdb" - "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" - "github.com/armadaproject/armada/internal/scheduler/testfixtures" -) - -func TestInMemoryJobRepository(t *testing.T) { - jobs := []*jobdb.Job{ - testfixtures.TestJob("A", util.ULID(), "armada-default", nil).WithCreated(3).WithPriority(1), - testfixtures.TestJob("A", util.ULID(), "armada-default", nil).WithCreated(1).WithPriority(1), - testfixtures.TestJob("A", util.ULID(), "armada-default", nil).WithCreated(2).WithPriority(1), - testfixtures.TestJob("A", util.ULID(), "armada-default", nil).WithCreated(0).WithPriority(3), - testfixtures.TestJob("A", util.ULID(), "armada-default", nil).WithCreated(0).WithPriority(0), - testfixtures.TestJob("A", util.ULID(), "armada-default", nil).WithCreated(0).WithPriority(2), - } - jctxs := make([]*schedulercontext.JobSchedulingContext, len(jobs)) - for i, job := range jobs { - jctxs[i] = &schedulercontext.JobSchedulingContext{Job: job, ResourceRequirements: job.EfficientResourceRequirements()} - } - repo := NewInMemoryJobRepository() - repo.EnqueueMany(jctxs) - expected := []*jobdb.Job{ - jobs[4], jobs[1], jobs[2], jobs[0], jobs[5], jobs[3], - } - actual := make([]*jobdb.Job, 0) - it := repo.GetJobIterator("A") - for { - jctx, err := it.Next() - require.NoError(t, err) - if jctx == nil { - break - } - actual = append(actual, jctx.Job) - } - assert.Equal(t, expected, actual) -} - -func TestMultiJobsIterator_TwoQueues(t *testing.T) { - repo := newMockJobRepository() - expected := make([]string, 0) - for _, req := range testfixtures.N1CpuPodReqs("A", 0, 5) { - job := jobFromPodSpec("A", req) - repo.Enqueue(job) - expected = append(expected, job.Id()) - } - for _, req := range testfixtures.N1CpuPodReqs("B", 0, 5) { - job := jobFromPodSpec("B", req) - repo.Enqueue(job) - expected = append(expected, job.Id()) - } - - ctx := armadacontext.Background() - its := make([]JobIterator, 3) - for i, queue := range []string{"A", "B", "C"} { - it := NewQueuedJobsIterator(ctx, queue, repo) - its[i] = it - } - it := NewMultiJobsIterator(its...) - - actual := make([]string, 0) - for { - jctx, err := it.Next() - require.NoError(t, err) - if jctx == nil { - break - } - actual = append(actual, jctx.Job.Id()) - } - assert.Equal(t, expected, actual) - v, err := it.Next() - require.NoError(t, err) - require.Nil(t, v) -} - -func TestQueuedJobsIterator_OneQueue(t *testing.T) { - repo := newMockJobRepository() - expected := make([]string, 0) - for _, req := range testfixtures.N1CpuPodReqs("A", 0, 10) { - job := jobFromPodSpec("A", req) - repo.Enqueue(job) - expected = append(expected, job.Id()) - } - ctx := armadacontext.Background() - it := NewQueuedJobsIterator(ctx, "A", repo) - actual := make([]string, 0) - for { - jctx, err := it.Next() - require.NoError(t, err) - if jctx == nil { - break - } - actual = append(actual, jctx.Job.Id()) - } - assert.Equal(t, expected, actual) -} - -func TestQueuedJobsIterator_ExceedsBufferSize(t *testing.T) { - repo := newMockJobRepository() - expected := make([]string, 0) - for _, req := range testfixtures.N1CpuPodReqs("A", 0, 17) { - job := jobFromPodSpec("A", req) - repo.Enqueue(job) - expected = append(expected, job.Id()) - } - ctx := armadacontext.Background() - it := NewQueuedJobsIterator(ctx, "A", repo) - actual := make([]string, 0) - for { - jctx, err := it.Next() - require.NoError(t, err) - if jctx == nil { - break - } - actual = append(actual, jctx.Job.Id()) - } - assert.Equal(t, expected, actual) -} - -func TestQueuedJobsIterator_ManyJobs(t *testing.T) { - repo := newMockJobRepository() - expected := make([]string, 0) - for _, req := range testfixtures.N1CpuPodReqs("A", 0, 113) { - job := jobFromPodSpec("A", req) - repo.Enqueue(job) - expected = append(expected, job.Id()) - } - ctx := armadacontext.Background() - it := NewQueuedJobsIterator(ctx, "A", repo) - actual := make([]string, 0) - for { - jctx, err := it.Next() - require.NoError(t, err) - if jctx == nil { - break - } - actual = append(actual, jctx.Job.Id()) - } - assert.Equal(t, expected, actual) -} - -func TestCreateQueuedJobsIterator_TwoQueues(t *testing.T) { - repo := newMockJobRepository() - expected := make([]string, 0) - for _, req := range testfixtures.N1CpuPodReqs("A", 0, 10) { - job := jobFromPodSpec("A", req) - repo.Enqueue(job) - expected = append(expected, job.Id()) - } - - for _, req := range testfixtures.N1CpuPodReqs("B", 0, 10) { - job := jobFromPodSpec("B", req) - repo.Enqueue(job) - } - ctx := armadacontext.Background() - it := NewQueuedJobsIterator(ctx, "A", repo) - actual := make([]string, 0) - for { - jctx, err := it.Next() - require.NoError(t, err) - if jctx == nil { - break - } - actual = append(actual, jctx.Job.Id()) - } - assert.Equal(t, expected, actual) -} - -func TestCreateQueuedJobsIterator_RespectsTimeout(t *testing.T) { - repo := newMockJobRepository() - for _, req := range testfixtures.N1CpuPodReqs("A", 0, 10) { - job := jobFromPodSpec("A", req) - repo.Enqueue(job) - } - - ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), time.Millisecond) - time.Sleep(20 * time.Millisecond) - defer cancel() - it := NewQueuedJobsIterator(ctx, "A", repo) - job, err := it.Next() - assert.Nil(t, job) - assert.ErrorIs(t, err, context.DeadlineExceeded) - - // Calling again should produce the same error. - job, err = it.Next() - assert.Nil(t, job) - assert.ErrorIs(t, err, context.DeadlineExceeded) -} - -func TestCreateQueuedJobsIterator_NilOnEmpty(t *testing.T) { - repo := newMockJobRepository() - for _, req := range testfixtures.N1CpuPodReqs("A", 0, 10) { - job := jobFromPodSpec("A", req) - repo.Enqueue(job) - } - ctx := armadacontext.Background() - it := NewQueuedJobsIterator(ctx, "A", repo) - for job, err := it.Next(); job != nil; job, err = it.Next() { - require.NoError(t, err) - } - job, err := it.Next() - assert.Nil(t, job) - assert.NoError(t, err) -} - -// TODO: Deprecate in favour of InMemoryRepo. -type mockJobRepository struct { - jobsByQueue map[string][]*jobdb.Job - jobsById map[string]*jobdb.Job - // Ids of all jobs hat were leased to an executor. - leasedJobs map[string]bool - getQueueJobIdsDelay time.Duration -} - -func newMockJobRepository() *mockJobRepository { - return &mockJobRepository{ - jobsByQueue: make(map[string][]*jobdb.Job), - jobsById: make(map[string]*jobdb.Job), - leasedJobs: make(map[string]bool), - } -} - -func (repo *mockJobRepository) EnqueueMany(jobs []*jobdb.Job) { - for _, job := range jobs { - repo.Enqueue(job) - } -} - -func (repo *mockJobRepository) Enqueue(job *jobdb.Job) { - repo.jobsByQueue[job.Queue()] = append(repo.jobsByQueue[job.Queue()], job) - repo.jobsById[job.Id()] = job -} - -func (repo *mockJobRepository) GetJobIterator(ctx *armadacontext.Context, queue string) JobIterator { - return NewQueuedJobsIterator(ctx, queue, repo) -} - -func (repo *mockJobRepository) GetQueueJobIds(queue string) []string { - time.Sleep(repo.getQueueJobIdsDelay) - if jobs, ok := repo.jobsByQueue[queue]; ok { - rv := make([]string, 0, len(jobs)) - for _, job := range jobs { - if !repo.leasedJobs[job.Id()] { - rv = append(rv, job.Id()) - } - } - return rv - } else { - return make([]string, 0) - } -} - -func (repo *mockJobRepository) GetExistingJobsByIds(jobIds []string) []*jobdb.Job { - rv := make([]*jobdb.Job, len(jobIds)) - for i, jobId := range jobIds { - if job, ok := repo.jobsById[jobId]; ok { - rv[i] = job - } - } - return rv -} - -func jobFromPodSpec(queue string, req *schedulerobjects.PodRequirements) *jobdb.Job { - return testfixtures.TestJob(queue, util.ULID(), "armada-default", req) -} diff --git a/internal/scheduler/preempting_queue_scheduler.go b/internal/scheduler/preempting_queue_scheduler.go index 5ca0d370324..6ef612875e8 100644 --- a/internal/scheduler/preempting_queue_scheduler.go +++ b/internal/scheduler/preempting_queue_scheduler.go @@ -2,7 +2,7 @@ package scheduler import ( "fmt" - "github.com/armadaproject/armada/internal/common/xiter" + "github.com/armadaproject/armada/internal/common/iter" "github.com/armadaproject/armada/internal/scheduler/jobiteration" "math" "time" @@ -531,7 +531,7 @@ func (sch *PreemptingQueueScheduler) schedule(ctx *armadacontext.Context, jobRep for i, repo := range jobRepos { iters[i] = repo.GetJobContextsForQueue(qctx.Queue) } - jobIteratorByQueue[qctx.Queue] = xiter.Concat(iters...) + jobIteratorByQueue[qctx.Queue] = iter.Concat(iters...) } // Reset the scheduling keys cache after evicting jobs. diff --git a/internal/scheduler/preempting_queue_scheduler_test.go b/internal/scheduler/preempting_queue_scheduler_test.go index 849b3262440..c738b0c1512 100644 --- a/internal/scheduler/preempting_queue_scheduler_test.go +++ b/internal/scheduler/preempting_queue_scheduler_test.go @@ -57,7 +57,7 @@ func TestEvictOversubscribed(t *testing.T) { require.NoError(t, err) evictor := NewOversubscribedEvictor( - NewSchedulerJobRepositoryAdapter(jobDbTxn), + jobDbTxn, nodeDb) result, err := evictor.Evict(armadacontext.Background(), nodeDbTxn) require.NoError(t, err) @@ -1862,7 +1862,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { constraints, testfixtures.TestEmptyFloatingResources, tc.SchedulingConfig.ProtectedFractionOfFairShare, - NewSchedulerJobRepositoryAdapter(jobDbTxn), + jobDbTxn, nodeDb, nodeIdByJobId, jobIdsByGangId, @@ -2209,7 +2209,7 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) { constraints, testfixtures.TestEmptyFloatingResources, tc.SchedulingConfig.ProtectedFractionOfFairShare, - NewSchedulerJobRepositoryAdapter(jobDbTxn), + jobDbTxn, nodeDb, nil, nil, @@ -2268,7 +2268,7 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) { constraints, testfixtures.TestEmptyFloatingResources, tc.SchedulingConfig.ProtectedFractionOfFairShare, - NewSchedulerJobRepositoryAdapter(jobDbTxn), + jobDbTxn, nodeDb, nil, nil, diff --git a/internal/scheduler/queue_scheduler_test.go b/internal/scheduler/queue_scheduler_test.go index 050fd2a97ee..94e643f7b85 100644 --- a/internal/scheduler/queue_scheduler_test.go +++ b/internal/scheduler/queue_scheduler_test.go @@ -2,6 +2,7 @@ package scheduler import ( "fmt" + "github.com/armadaproject/armada/internal/scheduler/jobiteration" "testing" "github.com/stretchr/testify/assert" @@ -501,11 +502,7 @@ func TestQueueScheduler(t *testing.T) { for i, job := range tc.Jobs { legacySchedulerJobs[i] = job } - jobRepo := NewInMemoryJobRepository() - jobRepo.EnqueueMany( - schedulercontext.JobSchedulingContextsFromJobs(legacySchedulerJobs), - ) - + jobRepo := jobiteration.NewInMemoryJobContextRepository(schedulercontext.JobSchedulingContextsFromJobs(legacySchedulerJobs)) fairnessCostProvider, err := fairness.NewDominantResourceFairness( tc.TotalResources, tc.SchedulingConfig, @@ -535,9 +532,9 @@ func TestQueueScheduler(t *testing.T) { require.NoError(t, err) } constraints := schedulerconstraints.NewSchedulingConstraints("pool", tc.TotalResources, tc.SchedulingConfig, tc.Queues, map[string]bool{}) - jobIteratorByQueue := make(map[string]JobIterator) + jobIteratorByQueue := make(map[string]jobiteration.JobContextIterator) for _, q := range tc.Queues { - it := jobRepo.GetJobIterator(q.Name) + it := jobRepo.GetJobContextsForQueue(q.Name) jobIteratorByQueue[q.Name] = it } sch, err := NewQueueScheduler(sctx, constraints, testfixtures.TestEmptyFloatingResources, nodeDb, jobIteratorByQueue) diff --git a/internal/scheduler/simulator/simulator.go b/internal/scheduler/simulator/simulator.go index 9f8c2e52584..93669e45186 100644 --- a/internal/scheduler/simulator/simulator.go +++ b/internal/scheduler/simulator/simulator.go @@ -513,7 +513,7 @@ func (s *Simulator) handleScheduleEvent(ctx *armadacontext.Context) error { constraints, nloatingResourceTypes, s.schedulingConfig.ProtectedFractionOfFairShare, - scheduler.NewSchedulerJobRepositoryAdapter(txn), + txn, nodeDb, // TODO: Necessary to support partial eviction. nil,