From c1e17c9e52896f13634f7da1ff2c47f28d8e7010 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Fri, 20 Sep 2024 09:47:24 +0100 Subject: [PATCH] wip Signed-off-by: Chris Martin --- internal/scheduler/jobiteration/adapter.go | 4 +--- internal/scheduler/jobiteration/interfaces.go | 9 +++++++-- .../scheduler/jobiteration/job_context_repository.go | 3 +-- internal/scheduler/preempting_queue_scheduler.go | 5 ++--- internal/scheduler/queue_scheduler.go | 8 ++++---- 5 files changed, 15 insertions(+), 14 deletions(-) diff --git a/internal/scheduler/jobiteration/adapter.go b/internal/scheduler/jobiteration/adapter.go index f6bb4555bb8..50ffd198be7 100644 --- a/internal/scheduler/jobiteration/adapter.go +++ b/internal/scheduler/jobiteration/adapter.go @@ -1,8 +1,6 @@ package jobiteration import ( - "iter" - "github.com/armadaproject/armada/internal/common/xiter" "github.com/armadaproject/armada/internal/scheduler/context" "github.com/armadaproject/armada/internal/scheduler/jobdb" @@ -12,7 +10,7 @@ type JobContextRepositoryAdapter struct { JobRepository } -func (j *JobContextRepositoryAdapter) GetJobContextsForQueue(queue string) iter.Seq[*context.JobSchedulingContext] { +func (j *JobContextRepositoryAdapter) GetJobContextsForQueue(queue string) JobContextIterator { return xiter.Map(func(j *jobdb.Job) *context.JobSchedulingContext { return context.JobSchedulingContextFromJob(j) }, j.JobRepository.GetJobsForQueue(queue)) diff --git a/internal/scheduler/jobiteration/interfaces.go b/internal/scheduler/jobiteration/interfaces.go index e6d562e6765..8d12b034df9 100644 --- a/internal/scheduler/jobiteration/interfaces.go +++ b/internal/scheduler/jobiteration/interfaces.go @@ -7,13 +7,18 @@ import ( "github.com/armadaproject/armada/internal/scheduler/jobdb" ) +type ( + JobIterator = iter.Seq[*jobdb.Job] + JobContextIterator = iter.Seq[*context.JobSchedulingContext] +) + // JobRepository is a source of jobs type JobRepository interface { GetById(id string) *jobdb.Job - GetJobsForQueue(queue string) iter.Seq[*jobdb.Job] + GetJobsForQueue(queue string) JobIterator } // JobContextRepository is a source of job contexts type JobContextRepository interface { - GetJobContextsForQueue(queue string) iter.Seq[*context.JobSchedulingContext] + GetJobContextsForQueue(queue string) JobContextIterator } diff --git a/internal/scheduler/jobiteration/job_context_repository.go b/internal/scheduler/jobiteration/job_context_repository.go index 9fb192924a3..0c31a563a0e 100644 --- a/internal/scheduler/jobiteration/job_context_repository.go +++ b/internal/scheduler/jobiteration/job_context_repository.go @@ -3,7 +3,6 @@ package jobiteration import ( "github.com/armadaproject/armada/internal/common/xiter" "github.com/armadaproject/armada/internal/scheduler/context" - "iter" "slices" ) @@ -36,7 +35,7 @@ func NewInMemoryJobContextRepository(jobContexts []*context.JobSchedulingContext } } -func (repo *InMemoryJobContextRepository) GetJobContextsForQueue(queueName string) iter.Seq[*context.JobSchedulingContext] { +func (repo *InMemoryJobContextRepository) GetJobContextsForQueue(queueName string) JobContextIterator { queue, ok := repo.jobContextsByQueue[queueName] if ok { return slices.Values(queue) diff --git a/internal/scheduler/preempting_queue_scheduler.go b/internal/scheduler/preempting_queue_scheduler.go index 0d6a1d2bf46..5ca0d370324 100644 --- a/internal/scheduler/preempting_queue_scheduler.go +++ b/internal/scheduler/preempting_queue_scheduler.go @@ -4,7 +4,6 @@ import ( "fmt" "github.com/armadaproject/armada/internal/common/xiter" "github.com/armadaproject/armada/internal/scheduler/jobiteration" - "iter" "math" "time" @@ -526,9 +525,9 @@ func addEvictedJobsToNodeDb(_ *armadacontext.Context, sctx *schedulercontext.Sch } func (sch *PreemptingQueueScheduler) schedule(ctx *armadacontext.Context, jobRepos ...jobiteration.JobContextRepository) (*schedulerresult.SchedulerResult, error) { - jobIteratorByQueue := make(map[string]iter.Seq[*schedulercontext.JobSchedulingContext]) + jobIteratorByQueue := make(map[string]jobiteration.JobContextIterator) for _, qctx := range sch.schedulingContext.QueueSchedulingContexts { - iters := make([]iter.Seq[*schedulercontext.JobSchedulingContext], len(jobRepos)) + iters := make([]jobiteration.JobContextIterator, len(jobRepos)) for i, repo := range jobRepos { iters[i] = repo.GetJobContextsForQueue(qctx.Queue) } diff --git a/internal/scheduler/queue_scheduler.go b/internal/scheduler/queue_scheduler.go index a545dd1fcfa..b308985bc44 100644 --- a/internal/scheduler/queue_scheduler.go +++ b/internal/scheduler/queue_scheduler.go @@ -3,7 +3,7 @@ package scheduler import ( "container/heap" "fmt" - "iter" + "github.com/armadaproject/armada/internal/scheduler/jobiteration" "time" "github.com/pkg/errors" @@ -32,7 +32,7 @@ func NewQueueScheduler( constraints schedulerconstraints.SchedulingConstraints, floatingResourceTypes *floatingresources.FloatingResourceTypes, nodeDb *nodedb.NodeDb, - jobIteratorByQueue map[string]iter.Seq[*schedulercontext.JobSchedulingContext], + jobIteratorByQueue map[string]jobiteration.JobContextIterator, ) (*QueueScheduler, error) { for queue := range jobIteratorByQueue { if _, ok := sctx.QueueSchedulingContexts[queue]; !ok { @@ -219,7 +219,7 @@ func (sch *QueueScheduler) Schedule(ctx *armadacontext.Context) (*schedulerresul // Jobs without gangIdAnnotation are considered gangs of cardinality 1. type QueuedGangIterator struct { schedulingContext *schedulercontext.SchedulingContext - queuedJobsIterator iter.Seq[*schedulercontext.JobSchedulingContext] + queuedJobsIterator jobiteration.JobContextIterator // Groups jctxs by the gang they belong to. jctxsByGangId map[string][]*schedulercontext.JobSchedulingContext // Maximum number of jobs to look at before giving up. @@ -231,7 +231,7 @@ type QueuedGangIterator struct { next *schedulercontext.GangSchedulingContext } -func NewQueuedGangIterator(sctx *schedulercontext.SchedulingContext, it iter.Seq[*schedulercontext.JobSchedulingContext], maxLookback uint, skipKnownUnschedulableJobs bool) *QueuedGangIterator { +func NewQueuedGangIterator(sctx *schedulercontext.SchedulingContext, it jobiteration.JobContextIterator, maxLookback uint, skipKnownUnschedulableJobs bool) *QueuedGangIterator { return &QueuedGangIterator{ schedulingContext: sctx, queuedJobsIterator: it,