Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Martin <chris@cmartinit.co.uk>
  • Loading branch information
d80tb7 committed Sep 20, 2024
1 parent acbdfe5 commit c1e17c9
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 14 deletions.
4 changes: 1 addition & 3 deletions internal/scheduler/jobiteration/adapter.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package jobiteration

import (
"iter"

"github.com/armadaproject/armada/internal/common/xiter"
"github.com/armadaproject/armada/internal/scheduler/context"
"github.com/armadaproject/armada/internal/scheduler/jobdb"
Expand All @@ -12,7 +10,7 @@ type JobContextRepositoryAdapter struct {
JobRepository
}

func (j *JobContextRepositoryAdapter) GetJobContextsForQueue(queue string) iter.Seq[*context.JobSchedulingContext] {
func (j *JobContextRepositoryAdapter) GetJobContextsForQueue(queue string) JobContextIterator {
return xiter.Map(func(j *jobdb.Job) *context.JobSchedulingContext {
return context.JobSchedulingContextFromJob(j)
}, j.JobRepository.GetJobsForQueue(queue))
Expand Down
9 changes: 7 additions & 2 deletions internal/scheduler/jobiteration/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,18 @@ import (
"github.com/armadaproject/armada/internal/scheduler/jobdb"
)

type (
JobIterator = iter.Seq[*jobdb.Job]
JobContextIterator = iter.Seq[*context.JobSchedulingContext]
)

// JobRepository is a source of jobs
type JobRepository interface {
GetById(id string) *jobdb.Job
GetJobsForQueue(queue string) iter.Seq[*jobdb.Job]
GetJobsForQueue(queue string) JobIterator
}

// JobContextRepository is a source of job contexts
type JobContextRepository interface {
GetJobContextsForQueue(queue string) iter.Seq[*context.JobSchedulingContext]
GetJobContextsForQueue(queue string) JobContextIterator
}
3 changes: 1 addition & 2 deletions internal/scheduler/jobiteration/job_context_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package jobiteration
import (
"github.com/armadaproject/armada/internal/common/xiter"
"github.com/armadaproject/armada/internal/scheduler/context"
"iter"
"slices"
)

Expand Down Expand Up @@ -36,7 +35,7 @@ func NewInMemoryJobContextRepository(jobContexts []*context.JobSchedulingContext
}
}

func (repo *InMemoryJobContextRepository) GetJobContextsForQueue(queueName string) iter.Seq[*context.JobSchedulingContext] {
func (repo *InMemoryJobContextRepository) GetJobContextsForQueue(queueName string) JobContextIterator {
queue, ok := repo.jobContextsByQueue[queueName]
if ok {
return slices.Values(queue)
Expand Down
5 changes: 2 additions & 3 deletions internal/scheduler/preempting_queue_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"github.com/armadaproject/armada/internal/common/xiter"
"github.com/armadaproject/armada/internal/scheduler/jobiteration"
"iter"
"math"
"time"

Expand Down Expand Up @@ -526,9 +525,9 @@ func addEvictedJobsToNodeDb(_ *armadacontext.Context, sctx *schedulercontext.Sch
}

func (sch *PreemptingQueueScheduler) schedule(ctx *armadacontext.Context, jobRepos ...jobiteration.JobContextRepository) (*schedulerresult.SchedulerResult, error) {
jobIteratorByQueue := make(map[string]iter.Seq[*schedulercontext.JobSchedulingContext])
jobIteratorByQueue := make(map[string]jobiteration.JobContextIterator)
for _, qctx := range sch.schedulingContext.QueueSchedulingContexts {
iters := make([]iter.Seq[*schedulercontext.JobSchedulingContext], len(jobRepos))
iters := make([]jobiteration.JobContextIterator, len(jobRepos))
for i, repo := range jobRepos {
iters[i] = repo.GetJobContextsForQueue(qctx.Queue)
}
Expand Down
8 changes: 4 additions & 4 deletions internal/scheduler/queue_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package scheduler
import (
"container/heap"
"fmt"
"iter"
"github.com/armadaproject/armada/internal/scheduler/jobiteration"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -32,7 +32,7 @@ func NewQueueScheduler(
constraints schedulerconstraints.SchedulingConstraints,
floatingResourceTypes *floatingresources.FloatingResourceTypes,
nodeDb *nodedb.NodeDb,
jobIteratorByQueue map[string]iter.Seq[*schedulercontext.JobSchedulingContext],
jobIteratorByQueue map[string]jobiteration.JobContextIterator,
) (*QueueScheduler, error) {
for queue := range jobIteratorByQueue {
if _, ok := sctx.QueueSchedulingContexts[queue]; !ok {
Expand Down Expand Up @@ -219,7 +219,7 @@ func (sch *QueueScheduler) Schedule(ctx *armadacontext.Context) (*schedulerresul
// Jobs without gangIdAnnotation are considered gangs of cardinality 1.
type QueuedGangIterator struct {
schedulingContext *schedulercontext.SchedulingContext
queuedJobsIterator iter.Seq[*schedulercontext.JobSchedulingContext]
queuedJobsIterator jobiteration.JobContextIterator
// Groups jctxs by the gang they belong to.
jctxsByGangId map[string][]*schedulercontext.JobSchedulingContext
// Maximum number of jobs to look at before giving up.
Expand All @@ -231,7 +231,7 @@ type QueuedGangIterator struct {
next *schedulercontext.GangSchedulingContext
}

func NewQueuedGangIterator(sctx *schedulercontext.SchedulingContext, it iter.Seq[*schedulercontext.JobSchedulingContext], maxLookback uint, skipKnownUnschedulableJobs bool) *QueuedGangIterator {
func NewQueuedGangIterator(sctx *schedulercontext.SchedulingContext, it jobiteration.JobContextIterator, maxLookback uint, skipKnownUnschedulableJobs bool) *QueuedGangIterator {
return &QueuedGangIterator{
schedulingContext: sctx,
queuedJobsIterator: it,
Expand Down

0 comments on commit c1e17c9

Please sign in to comment.