Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements to Job Iteration #3957

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions internal/common/iter/xiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package iter

import "iter"

// Map returns an iterator over f applied to seq.
func Map[In, Out any](f func(In) Out, seq iter.Seq[In]) iter.Seq[Out] {
return func(yield func(Out) bool) {
for in := range seq {

Check failure on line 8 in internal/common/iter/xiter.go

View workflow job for this annotation

GitHub Actions / lint / Lint Go

cannot range over seq (variable of type "iter".Seq[In]) (typecheck)
if !yield(f(in)) {
return
}
}
}
}

// Concat returns an iterator over the concatenation of the sequences.
func Concat[V any](seqs ...iter.Seq[V]) iter.Seq[V] {
return func(yield func(V) bool) {
for _, seq := range seqs {
for e := range seq {

Check failure on line 20 in internal/common/iter/xiter.go

View workflow job for this annotation

GitHub Actions / lint / Lint Go

cannot range over seq (variable of type "iter".Seq[V]) (typecheck)
if !yield(e) {
return
}
}
}
}
}

func Empty[T any]() iter.Seq[T] {
return func(yield func(T) bool) {}
}
22 changes: 16 additions & 6 deletions internal/scheduler/jobdb/jobdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/armadaproject/armada/internal/scheduler/adapters"
"github.com/armadaproject/armada/internal/scheduler/floatingresources"
"github.com/armadaproject/armada/internal/scheduler/internaltypes"
"github.com/armadaproject/armada/internal/scheduler/iter"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)

Expand Down Expand Up @@ -495,13 +496,22 @@ func (txn *Txn) HasQueuedJobs(queue string) bool {
return queuedJobs.Len() > 0
}

// QueuedJobs returns an iterator over the jobs stored for the given queue, or an empty iterator if the queue has no entry
func (txn *Txn) QueuedJobs(queue string) *immutable.SortedSetIterator[*Job] {
// GetJobsForQueue returns an iterator over the jobs held in txn.jobsByQueue
// for the given queue. When the queue has no entry, iter.Empty is returned.
func (txn *Txn) GetJobsForQueue(queue string) iter.Iterator[*Job] {

jobQueue, ok := txn.jobsByQueue[queue]
if ok {
return jobQueue.Iterator()
} else {
return emptyList.Iterator()

if !ok {
return iter.Empty[*Job]()
}

// NOTE(review): setIter is created once, outside the returned function, so a
// second pass over the returned iterator continues from wherever the first
// pass stopped instead of starting again from the beginning — confirm that
// single-use semantics are intended.
setIter := jobQueue.Iterator()
return func(yield func(*Job) bool) {
for !setIter.Done() {
// The second result of Next is discarded; Done was checked just above.
val, _ := setIter.Next()
if !yield(val) {
return
}
}
}
}

Expand Down
13 changes: 4 additions & 9 deletions internal/scheduler/jobdb/jobdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package jobdb

import (
"math/rand"
"slices"
"sort"
"testing"

"github.com/gogo/protobuf/proto"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -147,13 +147,8 @@ func TestJobDb_TestQueuedJobs(t *testing.T) {
err := txn.Upsert(jobs)
require.NoError(t, err)
collect := func() []*Job {
retrieved := make([]*Job, 0)
iter := txn.QueuedJobs(jobs[0].Queue())
for !iter.Done() {
j, _ := iter.Next()
retrieved = append(retrieved, j)
}
return retrieved
iter := txn.GetJobsForQueue(jobs[0].Queue())
return slices.Collect(iter)
}

assert.Equal(t, jobs, collect())
Expand Down Expand Up @@ -183,7 +178,7 @@ func TestJobDb_TestQueuedJobs(t *testing.T) {
// clear all jobs
err = txn.BatchDelete([]string{updatedJob.id, job10.id, jobs[0].id, jobs[2].id, jobs[6].id, jobs[9].id})
require.NoError(t, err)
assert.Equal(t, []*Job{}, collect())
assert.Equal(t, nil, collect())
}

func TestJobDb_TestGetAll(t *testing.T) {
Expand Down
162 changes: 0 additions & 162 deletions internal/scheduler/jobiteration.go

This file was deleted.

17 changes: 17 additions & 0 deletions internal/scheduler/jobiteration/adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package jobiteration

import (
"github.com/armadaproject/armada/internal/common/iter"
"github.com/armadaproject/armada/internal/scheduler/context"
"github.com/armadaproject/armada/internal/scheduler/jobdb"
)

// JobContextRepositoryAdapter embeds a JobRepository and, through its
// GetJobContextsForQueue method, exposes the repository's jobs as
// *context.JobSchedulingContext values.
type JobContextRepositoryAdapter struct {
// JobRepository is the underlying source of jobs; its methods are promoted.
JobRepository
}

// GetJobContextsForQueue returns an iterator over scheduling contexts for the
// given queue. Each job produced by the embedded JobRepository's
// GetJobsForQueue is converted with context.JobSchedulingContextFromJob.
func (a *JobContextRepositoryAdapter) GetJobContextsForQueue(queue string) JobContextIterator {
	jobs := a.JobRepository.GetJobsForQueue(queue)
	toContext := func(job *jobdb.Job) *context.JobSchedulingContext {
		return context.JobSchedulingContextFromJob(job)
	}
	return iter.Map(toContext, jobs)
}
44 changes: 44 additions & 0 deletions internal/scheduler/jobiteration/inmemory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package jobiteration

import (
"github.com/armadaproject/armada/internal/common/iter"
"github.com/armadaproject/armada/internal/scheduler/context"
"slices"
)

// InMemoryJobContextRepository is a JobContextRepository that can be created
// from a slice of JobSchedulingContexts (see NewInMemoryJobContextRepository).
type InMemoryJobContextRepository struct {
// jobContextsByQueue maps a queue name to that queue's job contexts.
jobContextsByQueue map[string][]*context.JobSchedulingContext
}

// NewInMemoryJobContextRepository builds an InMemoryJobContextRepository from
// jobContexts. The contexts are grouped by the name returned by Job.Queue(),
// and each group is then sorted with Job.SchedulingOrderCompare. The input
// slice itself is not reordered; a nil or empty input yields a repository with
// no queues.
func NewInMemoryJobContextRepository(jobContexts []*context.JobSchedulingContext) *InMemoryJobContextRepository {
	jobContextsByQueue := make(map[string][]*context.JobSchedulingContext)

	for _, jobCtx := range jobContexts {
		// append works on the nil slice returned for a queue seen for the
		// first time, so no explicit initialisation is needed.
		queueName := jobCtx.Job.Queue()
		jobContextsByQueue[queueName] = append(jobContextsByQueue[queueName], jobCtx)
	}

	// Sort jobs by the order in which they should be scheduled. Sorting the
	// ranged slice value reorders the backing array shared with the map entry.
	for _, queue := range jobContextsByQueue {
		slices.SortFunc(queue, func(a, b *context.JobSchedulingContext) int {
			return a.Job.SchedulingOrderCompare(b.Job)
		})
	}

	return &InMemoryJobContextRepository{
		jobContextsByQueue: jobContextsByQueue,
	}
}

// GetJobContextsForQueue returns an iterator over the job contexts stored for
// queueName, in their stored order. A queue with no entry produces an empty
// iterator.
func (repo *InMemoryJobContextRepository) GetJobContextsForQueue(queueName string) JobContextIterator {
	jobCtxs, found := repo.jobContextsByQueue[queueName]
	if !found {
		return iter.Empty[*context.JobSchedulingContext]()
	}
	return slices.Values(jobCtxs)
}
23 changes: 23 additions & 0 deletions internal/scheduler/jobiteration/interfaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package jobiteration

import (
"github.com/armadaproject/armada/internal/scheduler/context"
"github.com/armadaproject/armada/internal/scheduler/iter"
"github.com/armadaproject/armada/internal/scheduler/jobdb"
)

// Iterator aliases shared by the repository interfaces in this package.
type (
// JobIterator is an alias for an iterator over *jobdb.Job values.
JobIterator = iter.Iterator[*jobdb.Job]
// JobContextIterator is an alias for an iterator over
// *context.JobSchedulingContext values.
JobContextIterator = iter.Iterator[*context.JobSchedulingContext]
)

// JobRepository is a source of jobs, addressable individually by id or
// iterated per queue.
type JobRepository interface {
// GetById looks up a single job by its id.
// NOTE(review): the result for an unknown id is not specified here — presumably nil; confirm with implementations.
GetById(id string) *jobdb.Job
// GetJobsForQueue returns an iterator over the jobs belonging to queue.
GetJobsForQueue(queue string) JobIterator
}

// JobContextRepository is a source of job scheduling contexts, iterated per
// queue.
type JobContextRepository interface {
// GetJobContextsForQueue returns an iterator over the job contexts
// belonging to queue.
GetJobContextsForQueue(queue string) JobContextIterator
}
Loading
Loading