Merge branch 'master' into sendToGitHub/separate-job-spec-table
eleanorpratt committed Sep 23, 2024
2 parents 8edb84e + ac1f002 commit 497c300
Showing 14 changed files with 93 additions and 104 deletions.
5 changes: 3 additions & 2 deletions internal/scheduler/jobdb/job.go
@@ -651,10 +651,11 @@ func (job *Job) ValidateResourceRequests() error {

// WithNewRun creates a copy of the job with a new run on the given executor.
func (job *Job) WithNewRun(executor, nodeId, nodeName, pool string, scheduledAtPriority int32) *Job {
now := job.jobDb.clock.Now()
return job.WithUpdatedRun(job.jobDb.CreateRun(
job.jobDb.uuidProvider.New(),
job.Id(),
job.jobDb.clock.Now().UnixNano(),
now.UnixNano(),
executor,
nodeId,
nodeName,
@@ -668,7 +669,7 @@ func (job *Job) WithNewRun(executor, nodeId, nodeName, pool string, scheduledAtP
false,
false,
false,
nil,
&now,
nil,
nil,
nil,
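For context, a hedged sketch (not part of the commit) of what capturing the timestamp enables: the run created by WithNewRun now records a lease time that can be read back later, which is what the state_metrics.go change further down relies on. The executor, node, pool and priority values below are made up.

// Sketch only: create a run and read back the lease time recorded above.
leased := job.WithNewRun("executor-1", "node-123", "node-name", "cpu-pool", 10)
run := leased.LatestRun()
if lt := run.LeaseTime(); lt != nil {
    _ = lt.UnixNano() // same instant stored as the run's created timestamp
}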
7 changes: 7 additions & 0 deletions internal/scheduler/jobdb/job_run_test.go
@@ -2,9 +2,11 @@ package jobdb

import (
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
clock "k8s.io/utils/clock/testing"

"github.com/armadaproject/armada/internal/common/stringinterner"
"github.com/armadaproject/armada/internal/common/types"
@@ -30,6 +32,7 @@
}
TestDefaultPriorityClass = PriorityClass3
SchedulingKeyGenerator = schedulerobjects.NewSchedulingKeyGeneratorWithKey(make([]byte, 32))
testClock = clock.NewFakeClock(time.Now())
jobDb = NewJobDbWithSchedulingKeyGenerator(
TestPriorityClasses,
TestDefaultPriorityClass,
@@ -41,6 +44,10 @@
scheduledAtPriority = int32(5)
)

func init() {
jobDb.clock = testClock
}

var baseJobRun = jobDb.CreateRun(
uuid.New().String(),
uuid.NewString(),
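A short sketch of why the test pins the jobDb clock: the fake clock from k8s.io/utils/clock/testing makes Now() deterministic and lets a test advance time explicitly. The Step call below is illustrative; the tests above only need Now() to be stable.

// Sketch, using the clock alias imported in the diff above.
fake := clock.NewFakeClock(time.Now())
t0 := fake.Now()              // identical on every call until the clock is moved
fake.Step(30 * time.Second)   // advance time explicitly
elapsed := fake.Now().Sub(t0) // exactly 30s, independent of wall-clock time
_ = elapsed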
4 changes: 3 additions & 1 deletion internal/scheduler/jobdb/job_test.go
@@ -134,18 +134,20 @@ func TestJob_TestWithNewRun(t *testing.T) {
jobWithRun := baseJob.WithNewRun("test-executor", "test-nodeId", "nodeId", "pool", scheduledAtPriority)
assert.Equal(t, true, jobWithRun.HasRuns())
run := jobWithRun.LatestRun()
created := jobDb.clock.Now()
assert.NotNil(t, run)
assert.Equal(
t,
&JobRun{
id: run.id,
jobId: "test-job",
created: run.created,
created: created.UnixNano(),
executor: "test-executor",
nodeId: "test-nodeId",
nodeName: "nodeId",
pool: "pool",
scheduledAtPriority: &scheduledAtPriority,
leaseTime: &created,
},
run,
)
7 changes: 6 additions & 1 deletion internal/scheduler/jobdb/jobdb.go
@@ -20,6 +20,11 @@ import (
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)

type JobIterator interface {
Next() (*Job, bool)
Done() bool
}

var emptyList = immutable.NewSortedSet[*Job](JobPriorityComparer{})

type JobDb struct {
@@ -496,7 +501,7 @@ func (txn *Txn) HasQueuedJobs(queue string) bool {
}

// QueuedJobs returns an iterator over the jobs currently queued for the given queue
func (txn *Txn) QueuedJobs(queue string) *immutable.SortedSetIterator[*Job] {
func (txn *Txn) QueuedJobs(queue string) JobIterator {
jobQueue, ok := txn.jobsByQueue[queue]
if ok {
return jobQueue.Iterator()
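A minimal sketch of consuming the JobIterator now returned by Txn.QueuedJobs; the transaction and queue name are assumed to exist and the loop body is illustrative.

// Sketch only: drain the iterator until Next reports exhaustion.
it := txn.QueuedJobs("queue-a")
for !it.Done() {
    job, ok := it.Next()
    if !ok {
        break
    }
    _ = job // schedule, count, etc.
}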
30 changes: 13 additions & 17 deletions internal/scheduler/jobiteration.go
@@ -11,13 +11,13 @@ import (
"github.com/armadaproject/armada/internal/scheduler/jobdb"
)

type JobIterator interface {
type JobContextIterator interface {
Next() (*schedulercontext.JobSchedulingContext, error)
}

type JobRepository interface {
GetQueueJobIds(queueName string) []string
GetExistingJobsByIds(ids []string) []*jobdb.Job
QueuedJobs(queueName string) jobdb.JobIterator
GetById(id string) *jobdb.Job
}

type InMemoryJobIterator struct {
@@ -97,25 +97,22 @@ func (repo *InMemoryJobRepository) GetExistingJobsByIds(jobIds []string) []*jobd
return rv
}

func (repo *InMemoryJobRepository) GetJobIterator(queue string) JobIterator {
func (repo *InMemoryJobRepository) GetJobIterator(queue string) JobContextIterator {
repo.mu.Lock()
defer repo.mu.Unlock()
return NewInMemoryJobIterator(slices.Clone(repo.jctxsByQueue[queue]))
}

// QueuedJobsIterator is an iterator over all jobs in a queue.
type QueuedJobsIterator struct {
repo JobRepository
jobIds []string
idx int
ctx *armadacontext.Context
jobIter jobdb.JobIterator
ctx *armadacontext.Context
}

func NewQueuedJobsIterator(ctx *armadacontext.Context, queue string, repo JobRepository) *QueuedJobsIterator {
return &QueuedJobsIterator{
jobIds: repo.GetQueueJobIds(queue),
repo: repo,
ctx: ctx,
jobIter: repo.QueuedJobs(queue),
ctx: ctx,
}
}

@@ -124,22 +121,21 @@ func (it *QueuedJobsIterator) Next() (*schedulercontext.JobSchedulingContext, er
case <-it.ctx.Done():
return nil, it.ctx.Err()
default:
if it.idx >= len(it.jobIds) {
job, _ := it.jobIter.Next()
if job == nil {
return nil, nil
}
job := it.repo.GetExistingJobsByIds([]string{it.jobIds[it.idx]})
it.idx++
return schedulercontext.JobSchedulingContextFromJob(job[0]), nil
return schedulercontext.JobSchedulingContextFromJob(job), nil
}
}

// MultiJobsIterator chains several JobIterators together in the order provided.
type MultiJobsIterator struct {
i int
its []JobIterator
its []JobContextIterator
}

func NewMultiJobsIterator(its ...JobIterator) *MultiJobsIterator {
func NewMultiJobsIterator(its ...JobContextIterator) *MultiJobsIterator {
return &MultiJobsIterator{
its: its,
}
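A hedged usage sketch of the reworked QueuedJobsIterator: it now streams jobs straight from the jobdb iterator instead of materialising a slice of job ids up front. The queue name and error handling are illustrative; per Next() above, a nil job scheduling context signals that the queue is exhausted.

// Sketch only; a jobdb transaction satisfies JobRepository after this change.
it := NewQueuedJobsIterator(ctx, "queue-a", jobDbTxn)
for {
    jctx, err := it.Next()
    if err != nil {
        return err // e.g. the context was cancelled
    }
    if jctx == nil {
        break
    }
    _ = jctx // feed into gang/queue scheduling
}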
62 changes: 30 additions & 32 deletions internal/scheduler/jobiteration_test.go
@@ -62,7 +62,7 @@ func TestMultiJobsIterator_TwoQueues(t *testing.T) {
}

ctx := armadacontext.Background()
its := make([]JobIterator, 3)
its := make([]JobContextIterator, 3)
for i, queue := range []string{"A", "B", "C"} {
it := NewQueuedJobsIterator(ctx, queue, repo)
its[i] = it
@@ -214,20 +214,43 @@ func TestCreateQueuedJobsIterator_NilOnEmpty(t *testing.T) {
assert.NoError(t, err)
}

// TODO: Deprecate in favour of InMemoryRepo.
type mockJobIterator struct {
jobs []*jobdb.Job
i int
}

func (iter *mockJobIterator) Done() bool {
return iter.i >= len(iter.jobs)
}

func (iter *mockJobIterator) Next() (*jobdb.Job, bool) {
if iter.Done() {
return nil, false
}
job := iter.jobs[iter.i]
iter.i++
return job, true
}

type mockJobRepository struct {
jobsByQueue map[string][]*jobdb.Job
jobsById map[string]*jobdb.Job
// Ids of all jobs that were leased to an executor.
leasedJobs map[string]bool
getQueueJobIdsDelay time.Duration
}

func (repo *mockJobRepository) QueuedJobs(queueName string) jobdb.JobIterator {
q := repo.jobsByQueue[queueName]
return &mockJobIterator{jobs: q}
}

func (repo *mockJobRepository) GetById(id string) *jobdb.Job {
j, _ := repo.jobsById[id]
return j
}

func newMockJobRepository() *mockJobRepository {
return &mockJobRepository{
jobsByQueue: make(map[string][]*jobdb.Job),
jobsById: make(map[string]*jobdb.Job),
leasedJobs: make(map[string]bool),
}
}

@@ -242,35 +265,10 @@ func (repo *mockJobRepository) Enqueue(job *jobdb.Job) {
repo.jobsById[job.Id()] = job
}

func (repo *mockJobRepository) GetJobIterator(ctx *armadacontext.Context, queue string) JobIterator {
func (repo *mockJobRepository) GetJobIterator(ctx *armadacontext.Context, queue string) JobContextIterator {
return NewQueuedJobsIterator(ctx, queue, repo)
}

func (repo *mockJobRepository) GetQueueJobIds(queue string) []string {
time.Sleep(repo.getQueueJobIdsDelay)
if jobs, ok := repo.jobsByQueue[queue]; ok {
rv := make([]string, 0, len(jobs))
for _, job := range jobs {
if !repo.leasedJobs[job.Id()] {
rv = append(rv, job.Id())
}
}
return rv
} else {
return make([]string, 0)
}
}

func (repo *mockJobRepository) GetExistingJobsByIds(jobIds []string) []*jobdb.Job {
rv := make([]*jobdb.Job, len(jobIds))
for i, jobId := range jobIds {
if job, ok := repo.jobsById[jobId]; ok {
rv[i] = job
}
}
return rv
}

func jobFromPodSpec(queue string, req *schedulerobjects.PodRequirements) *jobdb.Job {
return testfixtures.TestJob(queue, util.ULID(), "armada-default", req)
}
10 changes: 9 additions & 1 deletion internal/scheduler/metrics/state_metrics.go
@@ -144,14 +144,22 @@ func (m *jobStateMetrics) collect(ch chan<- prometheus.Metric) {
}
}

// ReportJobLeased reports the job as being leasedJob. This has to be reported separately because the state transition
// ReportJobLeased reports the job as being leased. This has to be reported separately because the state transition
// logic does no work for job leased!
func (m *jobStateMetrics) ReportJobLeased(job *jobdb.Job) {
run := job.LatestRun()
duration, priorState := stateDuration(job, run, run.LeaseTime())
m.updateStateDuration(job, leased, priorState, duration)
}

// ReportJobPreempted reports the job as being preempted. This has to be reported separately because the state transition
// logic does no work for job preempted!
func (m *jobStateMetrics) ReportJobPreempted(job *jobdb.Job) {
run := job.LatestRun()
duration, priorState := stateDuration(job, run, run.PreemptedTime())
m.updateStateDuration(job, preempted, priorState, duration)
}

func (m *jobStateMetrics) ReportStateTransitions(
jsts []jobdb.JobStateTransitions,
jobRunErrorsByRunId map[string]*armadaevents.Error,
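ReportJobPreempted mirrors ReportJobLeased: both measure how long the job spent in its prior state, keyed off the timestamp recorded on the run for that transition. A hypothetical call site, with the surrounding scheduler wiring assumed:

// Hypothetical call site (not from this commit).
if run := job.LatestRun(); run != nil && run.PreemptedTime() != nil {
    m.ReportJobPreempted(job)
}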
11 changes: 7 additions & 4 deletions internal/scheduler/preempting_queue_scheduler.go
@@ -525,7 +525,7 @@ func addEvictedJobsToNodeDb(_ *armadacontext.Context, sctx *schedulercontext.Sch
}

func (sch *PreemptingQueueScheduler) schedule(ctx *armadacontext.Context, inMemoryJobRepo *InMemoryJobRepository, jobRepo JobRepository) (*schedulerresult.SchedulerResult, error) {
jobIteratorByQueue := make(map[string]JobIterator)
jobIteratorByQueue := make(map[string]JobContextIterator)
for _, qctx := range sch.schedulingContext.QueueSchedulingContexts {
evictedIt := inMemoryJobRepo.GetJobIterator(qctx.Queue)
if jobRepo == nil || reflect.ValueOf(jobRepo).IsNil() {
@@ -821,13 +821,16 @@ func (evi *Evictor) Evict(ctx *armadacontext.Context, nodeDbTxn *memdb.Txn) (*Ev
if evi.nodeFilter != nil && !evi.nodeFilter(ctx, node) {
continue
}
jobIds := make([]string, 0, len(node.AllocatedByJobId))
jobs := make([]*jobdb.Job, 0, len(node.AllocatedByJobId))
for jobId := range node.AllocatedByJobId {
if _, ok := node.EvictedJobRunIds[jobId]; !ok {
jobIds = append(jobIds, jobId)
job := evi.jobRepo.GetById(jobId)
if job != nil {
jobs = append(jobs, job)
}

}
}
jobs := evi.jobRepo.GetExistingJobsByIds(jobIds)
evictedJobs, node, err := evi.nodeDb.EvictJobsFromNode(jobFilter, jobs, node)
if err != nil {
return nil, err
8 changes: 4 additions & 4 deletions internal/scheduler/preempting_queue_scheduler_test.go
@@ -57,7 +57,7 @@ func TestEvictOversubscribed(t *testing.T) {
require.NoError(t, err)

evictor := NewOversubscribedEvictor(
NewSchedulerJobRepositoryAdapter(jobDbTxn),
jobDbTxn,
nodeDb)
result, err := evictor.Evict(armadacontext.Background(), nodeDbTxn)
require.NoError(t, err)
@@ -1862,7 +1862,7 @@ func TestPreemptingQueueScheduler(t *testing.T) {
constraints,
testfixtures.TestEmptyFloatingResources,
tc.SchedulingConfig.ProtectedFractionOfFairShare,
NewSchedulerJobRepositoryAdapter(jobDbTxn),
jobDbTxn,
nodeDb,
nodeIdByJobId,
jobIdsByGangId,
@@ -2209,7 +2209,7 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) {
constraints,
testfixtures.TestEmptyFloatingResources,
tc.SchedulingConfig.ProtectedFractionOfFairShare,
NewSchedulerJobRepositoryAdapter(jobDbTxn),
jobDbTxn,
nodeDb,
nil,
nil,
@@ -2268,7 +2268,7 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) {
constraints,
testfixtures.TestEmptyFloatingResources,
tc.SchedulingConfig.ProtectedFractionOfFairShare,
NewSchedulerJobRepositoryAdapter(jobDbTxn),
jobDbTxn,
nodeDb,
nil,
nil,
6 changes: 3 additions & 3 deletions internal/scheduler/queue_scheduler.go
@@ -32,7 +32,7 @@ func NewQueueScheduler(
constraints schedulerconstraints.SchedulingConstraints,
floatingResourceTypes *floatingresources.FloatingResourceTypes,
nodeDb *nodedb.NodeDb,
jobIteratorByQueue map[string]JobIterator,
jobIteratorByQueue map[string]JobContextIterator,
) (*QueueScheduler, error) {
for queue := range jobIteratorByQueue {
if _, ok := sctx.QueueSchedulingContexts[queue]; !ok {
@@ -219,7 +219,7 @@ func (sch *QueueScheduler) Schedule(ctx *armadacontext.Context) (*schedulerresul
// Jobs without gangIdAnnotation are considered gangs of cardinality 1.
type QueuedGangIterator struct {
schedulingContext *schedulercontext.SchedulingContext
queuedJobsIterator JobIterator
queuedJobsIterator JobContextIterator
// Groups jctxs by the gang they belong to.
jctxsByGangId map[string][]*schedulercontext.JobSchedulingContext
// Maximum number of jobs to look at before giving up.
@@ -231,7 +231,7 @@ type QueuedGangIterator struct {
next *schedulercontext.GangSchedulingContext
}

func NewQueuedGangIterator(sctx *schedulercontext.SchedulingContext, it JobIterator, maxLookback uint, skipKnownUnschedulableJobs bool) *QueuedGangIterator {
func NewQueuedGangIterator(sctx *schedulercontext.SchedulingContext, it JobContextIterator, maxLookback uint, skipKnownUnschedulableJobs bool) *QueuedGangIterator {
return &QueuedGangIterator{
schedulingContext: sctx,
queuedJobsIterator: it,
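For orientation, a hedged sketch of how the renamed JobContextIterator feeds the gang iterator; the scheduling context, lookback limit and per-queue iterators are assumed to exist, mirroring the wiring in preempting_queue_scheduler.go above.

// Sketch only: chain the evicted-jobs iterator with the queued-jobs iterator, then group into gangs.
evictedIt := inMemoryJobRepo.GetJobIterator("queue-a")
queuedIt := NewQueuedJobsIterator(ctx, "queue-a", jobDbTxn)
combined := NewMultiJobsIterator(evictedIt, queuedIt)
gangIt := NewQueuedGangIterator(sctx, combined, 1000, true) // maxLookback, skipKnownUnschedulableJobs
_ = gangIt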
2 changes: 1 addition & 1 deletion internal/scheduler/queue_scheduler_test.go
@@ -535,7 +535,7 @@ func TestQueueScheduler(t *testing.T) {
require.NoError(t, err)
}
constraints := schedulerconstraints.NewSchedulingConstraints("pool", tc.TotalResources, tc.SchedulingConfig, tc.Queues, map[string]bool{})
jobIteratorByQueue := make(map[string]JobIterator)
jobIteratorByQueue := make(map[string]JobContextIterator)
for _, q := range tc.Queues {
it := jobRepo.GetJobIterator(q.Name)
jobIteratorByQueue[q.Name] = it