Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better Interface To JobRepository #3958

Merged
merged 3 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion internal/scheduler/jobdb/jobdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ import (
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)

type JobIterator interface {
Next() (*Job, bool)
Done() bool
}

var emptyList = immutable.NewSortedSet[*Job](JobPriorityComparer{})

type JobDb struct {
Expand Down Expand Up @@ -496,7 +501,7 @@ func (txn *Txn) HasQueuedJobs(queue string) bool {
}

// QueuedJobs returns true if the queue has any jobs in the running state or false otherwise
func (txn *Txn) QueuedJobs(queue string) *immutable.SortedSetIterator[*Job] {
func (txn *Txn) QueuedJobs(queue string) JobIterator {
jobQueue, ok := txn.jobsByQueue[queue]
if ok {
return jobQueue.Iterator()
Expand Down
30 changes: 13 additions & 17 deletions internal/scheduler/jobiteration.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ import (
"github.com/armadaproject/armada/internal/scheduler/jobdb"
)

type JobIterator interface {
type JobContextIterator interface {
Next() (*schedulercontext.JobSchedulingContext, error)
}

type JobRepository interface {
GetQueueJobIds(queueName string) []string
GetExistingJobsByIds(ids []string) []*jobdb.Job
QueuedJobs(queueName string) jobdb.JobIterator
GetById(id string) *jobdb.Job
}

type InMemoryJobIterator struct {
Expand Down Expand Up @@ -97,25 +97,22 @@ func (repo *InMemoryJobRepository) GetExistingJobsByIds(jobIds []string) []*jobd
return rv
}

func (repo *InMemoryJobRepository) GetJobIterator(queue string) JobIterator {
func (repo *InMemoryJobRepository) GetJobIterator(queue string) JobContextIterator {
repo.mu.Lock()
defer repo.mu.Unlock()
return NewInMemoryJobIterator(slices.Clone(repo.jctxsByQueue[queue]))
}

// QueuedJobsIterator is an iterator over all jobs in a queue.
type QueuedJobsIterator struct {
repo JobRepository
jobIds []string
idx int
ctx *armadacontext.Context
jobIter jobdb.JobIterator
ctx *armadacontext.Context
}

func NewQueuedJobsIterator(ctx *armadacontext.Context, queue string, repo JobRepository) *QueuedJobsIterator {
return &QueuedJobsIterator{
jobIds: repo.GetQueueJobIds(queue),
repo: repo,
ctx: ctx,
jobIter: repo.QueuedJobs(queue),
ctx: ctx,
}
}

Expand All @@ -124,22 +121,21 @@ func (it *QueuedJobsIterator) Next() (*schedulercontext.JobSchedulingContext, er
case <-it.ctx.Done():
return nil, it.ctx.Err()
default:
if it.idx >= len(it.jobIds) {
job, _ := it.jobIter.Next()
if job == nil {
return nil, nil
}
job := it.repo.GetExistingJobsByIds([]string{it.jobIds[it.idx]})
it.idx++
return schedulercontext.JobSchedulingContextFromJob(job[0]), nil
return schedulercontext.JobSchedulingContextFromJob(job), nil
}
}

// MultiJobsIterator chains several JobIterators together in the order provided.
type MultiJobsIterator struct {
i int
its []JobIterator
its []JobContextIterator
}

func NewMultiJobsIterator(its ...JobIterator) *MultiJobsIterator {
func NewMultiJobsIterator(its ...JobContextIterator) *MultiJobsIterator {
return &MultiJobsIterator{
its: its,
}
Expand Down
62 changes: 30 additions & 32 deletions internal/scheduler/jobiteration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestMultiJobsIterator_TwoQueues(t *testing.T) {
}

ctx := armadacontext.Background()
its := make([]JobIterator, 3)
its := make([]JobContextIterator, 3)
for i, queue := range []string{"A", "B", "C"} {
it := NewQueuedJobsIterator(ctx, queue, repo)
its[i] = it
Expand Down Expand Up @@ -214,20 +214,43 @@ func TestCreateQueuedJobsIterator_NilOnEmpty(t *testing.T) {
assert.NoError(t, err)
}

// TODO: Deprecate in favour of InMemoryRepo.
type mockJobIterator struct {
jobs []*jobdb.Job
i int
}

func (iter *mockJobIterator) Done() bool {
return iter.i >= len(iter.jobs)
}

func (iter *mockJobIterator) Next() (*jobdb.Job, bool) {
if iter.Done() {
return nil, false
}
job := iter.jobs[iter.i]
iter.i++
return job, true
}

type mockJobRepository struct {
jobsByQueue map[string][]*jobdb.Job
jobsById map[string]*jobdb.Job
// Ids of all jobs hat were leased to an executor.
leasedJobs map[string]bool
getQueueJobIdsDelay time.Duration
}

func (repo *mockJobRepository) QueuedJobs(queueName string) jobdb.JobIterator {
q := repo.jobsByQueue[queueName]
return &mockJobIterator{jobs: q}
}

func (repo *mockJobRepository) GetById(id string) *jobdb.Job {
j, _ := repo.jobsById[id]
return j
}

func newMockJobRepository() *mockJobRepository {
return &mockJobRepository{
jobsByQueue: make(map[string][]*jobdb.Job),
jobsById: make(map[string]*jobdb.Job),
leasedJobs: make(map[string]bool),
}
}

Expand All @@ -242,35 +265,10 @@ func (repo *mockJobRepository) Enqueue(job *jobdb.Job) {
repo.jobsById[job.Id()] = job
}

func (repo *mockJobRepository) GetJobIterator(ctx *armadacontext.Context, queue string) JobIterator {
func (repo *mockJobRepository) GetJobIterator(ctx *armadacontext.Context, queue string) JobContextIterator {
return NewQueuedJobsIterator(ctx, queue, repo)
}

func (repo *mockJobRepository) GetQueueJobIds(queue string) []string {
time.Sleep(repo.getQueueJobIdsDelay)
if jobs, ok := repo.jobsByQueue[queue]; ok {
rv := make([]string, 0, len(jobs))
for _, job := range jobs {
if !repo.leasedJobs[job.Id()] {
rv = append(rv, job.Id())
}
}
return rv
} else {
return make([]string, 0)
}
}

func (repo *mockJobRepository) GetExistingJobsByIds(jobIds []string) []*jobdb.Job {
rv := make([]*jobdb.Job, len(jobIds))
for i, jobId := range jobIds {
if job, ok := repo.jobsById[jobId]; ok {
rv[i] = job
}
}
return rv
}

func jobFromPodSpec(queue string, req *schedulerobjects.PodRequirements) *jobdb.Job {
return testfixtures.TestJob(queue, util.ULID(), "armada-default", req)
}
11 changes: 7 additions & 4 deletions internal/scheduler/preempting_queue_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ func addEvictedJobsToNodeDb(_ *armadacontext.Context, sctx *schedulercontext.Sch
}

func (sch *PreemptingQueueScheduler) schedule(ctx *armadacontext.Context, inMemoryJobRepo *InMemoryJobRepository, jobRepo JobRepository) (*schedulerresult.SchedulerResult, error) {
jobIteratorByQueue := make(map[string]JobIterator)
jobIteratorByQueue := make(map[string]JobContextIterator)
for _, qctx := range sch.schedulingContext.QueueSchedulingContexts {
evictedIt := inMemoryJobRepo.GetJobIterator(qctx.Queue)
if jobRepo == nil || reflect.ValueOf(jobRepo).IsNil() {
Expand Down Expand Up @@ -821,13 +821,16 @@ func (evi *Evictor) Evict(ctx *armadacontext.Context, nodeDbTxn *memdb.Txn) (*Ev
if evi.nodeFilter != nil && !evi.nodeFilter(ctx, node) {
continue
}
jobIds := make([]string, 0, len(node.AllocatedByJobId))
jobs := make([]*jobdb.Job, 0, len(node.AllocatedByJobId))
for jobId := range node.AllocatedByJobId {
if _, ok := node.EvictedJobRunIds[jobId]; !ok {
jobIds = append(jobIds, jobId)
job := evi.jobRepo.GetById(jobId)
if job != nil {
jobs = append(jobs, job)
}

}
}
jobs := evi.jobRepo.GetExistingJobsByIds(jobIds)
evictedJobs, node, err := evi.nodeDb.EvictJobsFromNode(jobFilter, jobs, node)
if err != nil {
return nil, err
Expand Down
8 changes: 4 additions & 4 deletions internal/scheduler/preempting_queue_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestEvictOversubscribed(t *testing.T) {
require.NoError(t, err)

evictor := NewOversubscribedEvictor(
NewSchedulerJobRepositoryAdapter(jobDbTxn),
jobDbTxn,
nodeDb)
result, err := evictor.Evict(armadacontext.Background(), nodeDbTxn)
require.NoError(t, err)
Expand Down Expand Up @@ -1862,7 +1862,7 @@ func TestPreemptingQueueScheduler(t *testing.T) {
constraints,
testfixtures.TestEmptyFloatingResources,
tc.SchedulingConfig.ProtectedFractionOfFairShare,
NewSchedulerJobRepositoryAdapter(jobDbTxn),
jobDbTxn,
nodeDb,
nodeIdByJobId,
jobIdsByGangId,
Expand Down Expand Up @@ -2209,7 +2209,7 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) {
constraints,
testfixtures.TestEmptyFloatingResources,
tc.SchedulingConfig.ProtectedFractionOfFairShare,
NewSchedulerJobRepositoryAdapter(jobDbTxn),
jobDbTxn,
nodeDb,
nil,
nil,
Expand Down Expand Up @@ -2268,7 +2268,7 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) {
constraints,
testfixtures.TestEmptyFloatingResources,
tc.SchedulingConfig.ProtectedFractionOfFairShare,
NewSchedulerJobRepositoryAdapter(jobDbTxn),
jobDbTxn,
nodeDb,
nil,
nil,
Expand Down
6 changes: 3 additions & 3 deletions internal/scheduler/queue_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func NewQueueScheduler(
constraints schedulerconstraints.SchedulingConstraints,
floatingResourceTypes *floatingresources.FloatingResourceTypes,
nodeDb *nodedb.NodeDb,
jobIteratorByQueue map[string]JobIterator,
jobIteratorByQueue map[string]JobContextIterator,
) (*QueueScheduler, error) {
for queue := range jobIteratorByQueue {
if _, ok := sctx.QueueSchedulingContexts[queue]; !ok {
Expand Down Expand Up @@ -219,7 +219,7 @@ func (sch *QueueScheduler) Schedule(ctx *armadacontext.Context) (*schedulerresul
// Jobs without gangIdAnnotation are considered gangs of cardinality 1.
type QueuedGangIterator struct {
schedulingContext *schedulercontext.SchedulingContext
queuedJobsIterator JobIterator
queuedJobsIterator JobContextIterator
// Groups jctxs by the gang they belong to.
jctxsByGangId map[string][]*schedulercontext.JobSchedulingContext
// Maximum number of jobs to look at before giving up.
Expand All @@ -231,7 +231,7 @@ type QueuedGangIterator struct {
next *schedulercontext.GangSchedulingContext
}

func NewQueuedGangIterator(sctx *schedulercontext.SchedulingContext, it JobIterator, maxLookback uint, skipKnownUnschedulableJobs bool) *QueuedGangIterator {
func NewQueuedGangIterator(sctx *schedulercontext.SchedulingContext, it JobContextIterator, maxLookback uint, skipKnownUnschedulableJobs bool) *QueuedGangIterator {
return &QueuedGangIterator{
schedulingContext: sctx,
queuedJobsIterator: it,
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/queue_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ func TestQueueScheduler(t *testing.T) {
require.NoError(t, err)
}
constraints := schedulerconstraints.NewSchedulingConstraints("pool", tc.TotalResources, tc.SchedulingConfig, tc.Queues, map[string]bool{})
jobIteratorByQueue := make(map[string]JobIterator)
jobIteratorByQueue := make(map[string]JobContextIterator)
for _, q := range tc.Queues {
it := jobRepo.GetJobIterator(q.Name)
jobIteratorByQueue[q.Name] = it
Expand Down
37 changes: 1 addition & 36 deletions internal/scheduler/scheduling_algo.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ func (l *FairSchedulingAlgo) schedulePool(
constraints,
l.floatingResourceTypes,
l.schedulingConfig.ProtectedFractionOfFairShare,
NewSchedulerJobRepositoryAdapter(fsctx.txn),
fsctx.txn,
nodeDb,
fsctx.nodeIdByJobId,
fsctx.jobIdsByGangId,
Expand Down Expand Up @@ -528,41 +528,6 @@ func (l *FairSchedulingAlgo) schedulePool(
return result, sctx, nil
}

// SchedulerJobRepositoryAdapter allows jobDb implement the JobRepository interface.
// TODO: Pass JobDb into the scheduler instead of using this shim to convert to a JobRepo.
type SchedulerJobRepositoryAdapter struct {
txn *jobdb.Txn
}

func NewSchedulerJobRepositoryAdapter(txn *jobdb.Txn) *SchedulerJobRepositoryAdapter {
return &SchedulerJobRepositoryAdapter{
txn: txn,
}
}

// GetQueueJobIds is necessary to implement the JobRepository interface, which we need while transitioning from the old
// to new scheduler.
func (repo *SchedulerJobRepositoryAdapter) GetQueueJobIds(queue string) []string {
rv := make([]string, 0)
it := repo.txn.QueuedJobs(queue)
for v, _ := it.Next(); v != nil; v, _ = it.Next() {
rv = append(rv, v.Id())
}
return rv
}

// GetExistingJobsByIds is necessary to implement the JobRepository interface which we need while transitioning from the
// old to new scheduler.
func (repo *SchedulerJobRepositoryAdapter) GetExistingJobsByIds(ids []string) []*jobdb.Job {
rv := make([]*jobdb.Job, 0, len(ids))
for _, id := range ids {
if job := repo.txn.GetById(id); job != nil {
rv = append(rv, job)
}
}
return rv
}

// populateNodeDb adds all the nodes and jobs associated with a particular pool to the nodeDb.
func (l *FairSchedulingAlgo) populateNodeDb(nodeDb *nodedb.NodeDb, jobs []*jobdb.Job, nodes []*schedulerobjects.Node) error {
txn := nodeDb.Txn(true)
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/simulator/simulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ func (s *Simulator) handleScheduleEvent(ctx *armadacontext.Context) error {
constraints,
nloatingResourceTypes,
s.schedulingConfig.ProtectedFractionOfFairShare,
scheduler.NewSchedulerJobRepositoryAdapter(txn),
txn,
nodeDb,
// TODO: Necessary to support partial eviction.
nil,
Expand Down