diff --git a/internal/scheduler/jobdb/jobdb.go b/internal/scheduler/jobdb/jobdb.go index 8a65691390a..6b26902c64d 100644 --- a/internal/scheduler/jobdb/jobdb.go +++ b/internal/scheduler/jobdb/jobdb.go @@ -20,6 +20,11 @@ import ( "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" ) +type JobIterator interface { + Next() (*Job, bool) + Done() bool +} + var emptyList = immutable.NewSortedSet[*Job](JobPriorityComparer{}) type JobDb struct { @@ -496,7 +501,7 @@ func (txn *Txn) HasQueuedJobs(queue string) bool { } // QueuedJobs returns true if the queue has any jobs in the running state or false otherwise -func (txn *Txn) QueuedJobs(queue string) *immutable.SortedSetIterator[*Job] { +func (txn *Txn) QueuedJobs(queue string) JobIterator { jobQueue, ok := txn.jobsByQueue[queue] if ok { return jobQueue.Iterator() diff --git a/internal/scheduler/jobiteration.go b/internal/scheduler/jobiteration.go index b814e9144a9..ac21d2b6496 100644 --- a/internal/scheduler/jobiteration.go +++ b/internal/scheduler/jobiteration.go @@ -11,13 +11,13 @@ import ( "github.com/armadaproject/armada/internal/scheduler/jobdb" ) -type JobIterator interface { +type JobContextIterator interface { Next() (*schedulercontext.JobSchedulingContext, error) } type JobRepository interface { - GetQueueJobIds(queueName string) []string - GetExistingJobsByIds(ids []string) []*jobdb.Job + QueuedJobs(queueName string) jobdb.JobIterator + GetById(id string) *jobdb.Job } type InMemoryJobIterator struct { @@ -97,7 +97,7 @@ func (repo *InMemoryJobRepository) GetExistingJobsByIds(jobIds []string) []*jobd return rv } -func (repo *InMemoryJobRepository) GetJobIterator(queue string) JobIterator { +func (repo *InMemoryJobRepository) GetJobIterator(queue string) JobContextIterator { repo.mu.Lock() defer repo.mu.Unlock() return NewInMemoryJobIterator(slices.Clone(repo.jctxsByQueue[queue])) @@ -105,17 +105,14 @@ func (repo *InMemoryJobRepository) GetJobIterator(queue string) JobIterator { // QueuedJobsIterator is an iterator over all jobs in a queue. type QueuedJobsIterator struct { - repo JobRepository - jobIds []string - idx int - ctx *armadacontext.Context + jobIter jobdb.JobIterator + ctx *armadacontext.Context } func NewQueuedJobsIterator(ctx *armadacontext.Context, queue string, repo JobRepository) *QueuedJobsIterator { return &QueuedJobsIterator{ - jobIds: repo.GetQueueJobIds(queue), - repo: repo, - ctx: ctx, + jobIter: repo.QueuedJobs(queue), + ctx: ctx, } } @@ -124,22 +121,21 @@ func (it *QueuedJobsIterator) Next() (*schedulercontext.JobSchedulingContext, er case <-it.ctx.Done(): return nil, it.ctx.Err() default: - if it.idx >= len(it.jobIds) { + job, _ := it.jobIter.Next() + if job == nil { return nil, nil } - job := it.repo.GetExistingJobsByIds([]string{it.jobIds[it.idx]}) - it.idx++ - return schedulercontext.JobSchedulingContextFromJob(job[0]), nil + return schedulercontext.JobSchedulingContextFromJob(job), nil } } // MultiJobsIterator chains several JobIterators together in the order provided. type MultiJobsIterator struct { i int - its []JobIterator + its []JobContextIterator } -func NewMultiJobsIterator(its ...JobIterator) *MultiJobsIterator { +func NewMultiJobsIterator(its ...JobContextIterator) *MultiJobsIterator { return &MultiJobsIterator{ its: its, } diff --git a/internal/scheduler/jobiteration_test.go b/internal/scheduler/jobiteration_test.go index c4f10a00bfa..a10866535b1 100644 --- a/internal/scheduler/jobiteration_test.go +++ b/internal/scheduler/jobiteration_test.go @@ -62,7 +62,7 @@ func TestMultiJobsIterator_TwoQueues(t *testing.T) { } ctx := armadacontext.Background() - its := make([]JobIterator, 3) + its := make([]JobContextIterator, 3) for i, queue := range []string{"A", "B", "C"} { it := NewQueuedJobsIterator(ctx, queue, repo) its[i] = it @@ -214,20 +214,43 @@ func TestCreateQueuedJobsIterator_NilOnEmpty(t *testing.T) { assert.NoError(t, err) } -// TODO: Deprecate in favour of InMemoryRepo. +type mockJobIterator struct { + jobs []*jobdb.Job + i int +} + +func (iter *mockJobIterator) Done() bool { + return iter.i >= len(iter.jobs) +} + +func (iter *mockJobIterator) Next() (*jobdb.Job, bool) { + if iter.Done() { + return nil, false + } + job := iter.jobs[iter.i] + iter.i++ + return job, true +} + type mockJobRepository struct { jobsByQueue map[string][]*jobdb.Job jobsById map[string]*jobdb.Job - // Ids of all jobs hat were leased to an executor. - leasedJobs map[string]bool - getQueueJobIdsDelay time.Duration +} + +func (repo *mockJobRepository) QueuedJobs(queueName string) jobdb.JobIterator { + q := repo.jobsByQueue[queueName] + return &mockJobIterator{jobs: q} +} + +func (repo *mockJobRepository) GetById(id string) *jobdb.Job { + j, _ := repo.jobsById[id] + return j } func newMockJobRepository() *mockJobRepository { return &mockJobRepository{ jobsByQueue: make(map[string][]*jobdb.Job), jobsById: make(map[string]*jobdb.Job), - leasedJobs: make(map[string]bool), } } @@ -242,35 +265,10 @@ func (repo *mockJobRepository) Enqueue(job *jobdb.Job) { repo.jobsById[job.Id()] = job } -func (repo *mockJobRepository) GetJobIterator(ctx *armadacontext.Context, queue string) JobIterator { +func (repo *mockJobRepository) GetJobIterator(ctx *armadacontext.Context, queue string) JobContextIterator { return NewQueuedJobsIterator(ctx, queue, repo) } -func (repo *mockJobRepository) GetQueueJobIds(queue string) []string { - time.Sleep(repo.getQueueJobIdsDelay) - if jobs, ok := repo.jobsByQueue[queue]; ok { - rv := make([]string, 0, len(jobs)) - for _, job := range jobs { - if !repo.leasedJobs[job.Id()] { - rv = append(rv, job.Id()) - } - } - return rv - } else { - return make([]string, 0) - } -} - -func (repo *mockJobRepository) GetExistingJobsByIds(jobIds []string) []*jobdb.Job { - rv := make([]*jobdb.Job, len(jobIds)) - for i, jobId := range jobIds { - if job, ok := repo.jobsById[jobId]; ok { - rv[i] = job - } - } - return rv -} - func jobFromPodSpec(queue string, req *schedulerobjects.PodRequirements) *jobdb.Job { return testfixtures.TestJob(queue, util.ULID(), "armada-default", req) } diff --git a/internal/scheduler/preempting_queue_scheduler.go b/internal/scheduler/preempting_queue_scheduler.go index 8b4fed9225e..695e728957b 100644 --- a/internal/scheduler/preempting_queue_scheduler.go +++ b/internal/scheduler/preempting_queue_scheduler.go @@ -525,7 +525,7 @@ func addEvictedJobsToNodeDb(_ *armadacontext.Context, sctx *schedulercontext.Sch } func (sch *PreemptingQueueScheduler) schedule(ctx *armadacontext.Context, inMemoryJobRepo *InMemoryJobRepository, jobRepo JobRepository) (*schedulerresult.SchedulerResult, error) { - jobIteratorByQueue := make(map[string]JobIterator) + jobIteratorByQueue := make(map[string]JobContextIterator) for _, qctx := range sch.schedulingContext.QueueSchedulingContexts { evictedIt := inMemoryJobRepo.GetJobIterator(qctx.Queue) if jobRepo == nil || reflect.ValueOf(jobRepo).IsNil() { @@ -821,13 +821,16 @@ func (evi *Evictor) Evict(ctx *armadacontext.Context, nodeDbTxn *memdb.Txn) (*Ev if evi.nodeFilter != nil && !evi.nodeFilter(ctx, node) { continue } - jobIds := make([]string, 0, len(node.AllocatedByJobId)) + jobs := make([]*jobdb.Job, 0, len(node.AllocatedByJobId)) for jobId := range node.AllocatedByJobId { if _, ok := node.EvictedJobRunIds[jobId]; !ok { - jobIds = append(jobIds, jobId) + job := evi.jobRepo.GetById(jobId) + if job != nil { + jobs = append(jobs, job) + } + } } - jobs := evi.jobRepo.GetExistingJobsByIds(jobIds) evictedJobs, node, err := evi.nodeDb.EvictJobsFromNode(jobFilter, jobs, node) if err != nil { return nil, err diff --git a/internal/scheduler/preempting_queue_scheduler_test.go b/internal/scheduler/preempting_queue_scheduler_test.go index 849b3262440..c738b0c1512 100644 --- a/internal/scheduler/preempting_queue_scheduler_test.go +++ b/internal/scheduler/preempting_queue_scheduler_test.go @@ -57,7 +57,7 @@ func TestEvictOversubscribed(t *testing.T) { require.NoError(t, err) evictor := NewOversubscribedEvictor( - NewSchedulerJobRepositoryAdapter(jobDbTxn), + jobDbTxn, nodeDb) result, err := evictor.Evict(armadacontext.Background(), nodeDbTxn) require.NoError(t, err) @@ -1862,7 +1862,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { constraints, testfixtures.TestEmptyFloatingResources, tc.SchedulingConfig.ProtectedFractionOfFairShare, - NewSchedulerJobRepositoryAdapter(jobDbTxn), + jobDbTxn, nodeDb, nodeIdByJobId, jobIdsByGangId, @@ -2209,7 +2209,7 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) { constraints, testfixtures.TestEmptyFloatingResources, tc.SchedulingConfig.ProtectedFractionOfFairShare, - NewSchedulerJobRepositoryAdapter(jobDbTxn), + jobDbTxn, nodeDb, nil, nil, @@ -2268,7 +2268,7 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) { constraints, testfixtures.TestEmptyFloatingResources, tc.SchedulingConfig.ProtectedFractionOfFairShare, - NewSchedulerJobRepositoryAdapter(jobDbTxn), + jobDbTxn, nodeDb, nil, nil, diff --git a/internal/scheduler/queue_scheduler.go b/internal/scheduler/queue_scheduler.go index b2e9d2f8916..9cac41665bf 100644 --- a/internal/scheduler/queue_scheduler.go +++ b/internal/scheduler/queue_scheduler.go @@ -32,7 +32,7 @@ func NewQueueScheduler( constraints schedulerconstraints.SchedulingConstraints, floatingResourceTypes *floatingresources.FloatingResourceTypes, nodeDb *nodedb.NodeDb, - jobIteratorByQueue map[string]JobIterator, + jobIteratorByQueue map[string]JobContextIterator, ) (*QueueScheduler, error) { for queue := range jobIteratorByQueue { if _, ok := sctx.QueueSchedulingContexts[queue]; !ok { @@ -219,7 +219,7 @@ func (sch *QueueScheduler) Schedule(ctx *armadacontext.Context) (*schedulerresul // Jobs without gangIdAnnotation are considered gangs of cardinality 1. type QueuedGangIterator struct { schedulingContext *schedulercontext.SchedulingContext - queuedJobsIterator JobIterator + queuedJobsIterator JobContextIterator // Groups jctxs by the gang they belong to. jctxsByGangId map[string][]*schedulercontext.JobSchedulingContext // Maximum number of jobs to look at before giving up. @@ -231,7 +231,7 @@ type QueuedGangIterator struct { next *schedulercontext.GangSchedulingContext } -func NewQueuedGangIterator(sctx *schedulercontext.SchedulingContext, it JobIterator, maxLookback uint, skipKnownUnschedulableJobs bool) *QueuedGangIterator { +func NewQueuedGangIterator(sctx *schedulercontext.SchedulingContext, it JobContextIterator, maxLookback uint, skipKnownUnschedulableJobs bool) *QueuedGangIterator { return &QueuedGangIterator{ schedulingContext: sctx, queuedJobsIterator: it, diff --git a/internal/scheduler/queue_scheduler_test.go b/internal/scheduler/queue_scheduler_test.go index 050fd2a97ee..17189fd0900 100644 --- a/internal/scheduler/queue_scheduler_test.go +++ b/internal/scheduler/queue_scheduler_test.go @@ -535,7 +535,7 @@ func TestQueueScheduler(t *testing.T) { require.NoError(t, err) } constraints := schedulerconstraints.NewSchedulingConstraints("pool", tc.TotalResources, tc.SchedulingConfig, tc.Queues, map[string]bool{}) - jobIteratorByQueue := make(map[string]JobIterator) + jobIteratorByQueue := make(map[string]JobContextIterator) for _, q := range tc.Queues { it := jobRepo.GetJobIterator(q.Name) jobIteratorByQueue[q.Name] = it diff --git a/internal/scheduler/scheduling_algo.go b/internal/scheduler/scheduling_algo.go index e3b2f9b11af..b49810d952f 100644 --- a/internal/scheduler/scheduling_algo.go +++ b/internal/scheduler/scheduling_algo.go @@ -478,7 +478,7 @@ func (l *FairSchedulingAlgo) schedulePool( constraints, l.floatingResourceTypes, l.schedulingConfig.ProtectedFractionOfFairShare, - NewSchedulerJobRepositoryAdapter(fsctx.txn), + fsctx.txn, nodeDb, fsctx.nodeIdByJobId, fsctx.jobIdsByGangId, @@ -527,41 +527,6 @@ func (l *FairSchedulingAlgo) schedulePool( return result, sctx, nil } -// SchedulerJobRepositoryAdapter allows jobDb implement the JobRepository interface. -// TODO: Pass JobDb into the scheduler instead of using this shim to convert to a JobRepo. -type SchedulerJobRepositoryAdapter struct { - txn *jobdb.Txn -} - -func NewSchedulerJobRepositoryAdapter(txn *jobdb.Txn) *SchedulerJobRepositoryAdapter { - return &SchedulerJobRepositoryAdapter{ - txn: txn, - } -} - -// GetQueueJobIds is necessary to implement the JobRepository interface, which we need while transitioning from the old -// to new scheduler. -func (repo *SchedulerJobRepositoryAdapter) GetQueueJobIds(queue string) []string { - rv := make([]string, 0) - it := repo.txn.QueuedJobs(queue) - for v, _ := it.Next(); v != nil; v, _ = it.Next() { - rv = append(rv, v.Id()) - } - return rv -} - -// GetExistingJobsByIds is necessary to implement the JobRepository interface which we need while transitioning from the -// old to new scheduler. -func (repo *SchedulerJobRepositoryAdapter) GetExistingJobsByIds(ids []string) []*jobdb.Job { - rv := make([]*jobdb.Job, 0, len(ids)) - for _, id := range ids { - if job := repo.txn.GetById(id); job != nil { - rv = append(rv, job) - } - } - return rv -} - // populateNodeDb adds all the nodes and jobs associated with a particular pool to the nodeDb. func (l *FairSchedulingAlgo) populateNodeDb(nodeDb *nodedb.NodeDb, jobs []*jobdb.Job, nodes []*schedulerobjects.Node) error { txn := nodeDb.Txn(true)