Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove queue ttl #3649

Merged
merged 10 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions internal/armada/submit/conversion/conversions.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,8 @@ func SubmitJobFromApiRequest(
},
},
},
Objects: ingressesAndServices,
Scheduler: jobReq.Scheduler,
QueueTtlSeconds: jobReq.QueueTtlSeconds,
Objects: ingressesAndServices,
Scheduler: jobReq.Scheduler,
}
postProcess(msg, config)
return msg
Expand Down
1 change: 0 additions & 1 deletion internal/common/eventutil/eventutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ func ApiJobFromLogSubmitJob(ownerId string, groups []string, queueName string, j
Created: time,
Owner: ownerId,
QueueOwnershipUserGroups: groups,
QueueTtlSeconds: e.QueueTtlSeconds,
}, nil
}

Expand Down
55 changes: 0 additions & 55 deletions internal/scheduler/adapters/adapters.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,16 @@
package adapters

import (
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/exp/maps"
v1 "k8s.io/api/core/v1"

"github.com/armadaproject/armada/internal/common/logging"
"github.com/armadaproject/armada/internal/common/types"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
"github.com/armadaproject/armada/pkg/api"
"github.com/armadaproject/armada/pkg/armadaevents"
)

// PodRequirementsFromPod builds the scheduler's PodRequirements for pod.
// The requirements are derived from pod.Spec via PodRequirementsFromPodSpec, and the
// Annotations field is then set to a clone of pod.Annotations, so later writes to the
// returned annotations do not reach the pod's own annotation map.
func PodRequirementsFromPod(pod *v1.Pod, priorityByPriorityClassName map[string]types.PriorityClass) *schedulerobjects.PodRequirements {
	requirements := PodRequirementsFromPodSpec(&pod.Spec, priorityByPriorityClassName)
	annotationsCopy := maps.Clone(pod.Annotations)
	requirements.Annotations = annotationsCopy
	return requirements
}

// PodRequirementsFromPodSpec function returns *schedulerobjects.PodRequirements for podSpec.
// An error is logged if the podSpec uses an unknown priority class.
// This function may mutate podSpec.
Expand All @@ -48,49 +36,6 @@ func PodRequirementsFromPodSpec(podSpec *v1.PodSpec, priorityByPriorityClassName
}
}

// SchedulingInfoFromSubmitJob returns a minimal representation of a job containing only the info needed by the scheduler.
//
// Fields common to all jobs are copied from submitJob, with SubmitTime set to submitTime and Version set to 0.
// Only jobs whose main object is a pod spec are supported; any other main object type results in an error.
// The pod requirements' annotations start as a clone of the job-level annotations and are then overlaid with
// the main object's annotations, so main-object values win on key collisions.
//
// An error (rather than a nil-pointer panic) is returned if submitJob, its main object, or the pod spec is missing.
func SchedulingInfoFromSubmitJob(submitJob *armadaevents.SubmitJob, submitTime time.Time, priorityClasses map[string]types.PriorityClass) (*schedulerobjects.JobSchedulingInfo, error) {
	if submitJob == nil {
		return nil, errors.New("submit job is nil")
	}
	if submitJob.MainObject == nil {
		return nil, errors.New("submit job has no main object")
	}

	// Component common to all jobs.
	schedulingInfo := &schedulerobjects.JobSchedulingInfo{
		Lifetime:        submitJob.Lifetime,
		AtMostOnce:      submitJob.AtMostOnce,
		Preemptible:     submitJob.Preemptible,
		ConcurrencySafe: submitJob.ConcurrencySafe,
		SubmitTime:      submitTime,
		Priority:        submitJob.Priority,
		Version:         0,
		QueueTtlSeconds: submitJob.QueueTtlSeconds,
	}

	// Scheduling requirements specific to the objects that make up this job.
	switch object := submitJob.MainObject.Object.(type) {
	case *armadaevents.KubernetesMainObject_PodSpec:
		if object.PodSpec == nil || object.PodSpec.PodSpec == nil {
			return nil, errors.New("submit job main object has no pod spec")
		}
		podSpec := object.PodSpec.PodSpec
		schedulingInfo.PriorityClassName = podSpec.PriorityClassName
		podRequirements := PodRequirementsFromPodSpec(podSpec, priorityClasses)

		// Job-level annotations form the base; they are cloned so the submit job is never aliased.
		if submitJob.ObjectMeta != nil {
			podRequirements.Annotations = maps.Clone(submitJob.ObjectMeta.Annotations)
		}
		// Main-object annotations are copied on top and therefore take precedence.
		if mainMeta := submitJob.MainObject.ObjectMeta; mainMeta != nil {
			if podRequirements.Annotations == nil {
				podRequirements.Annotations = make(map[string]string, len(mainMeta.Annotations))
			}
			maps.Copy(podRequirements.Annotations, mainMeta.Annotations)
		}

		schedulingInfo.ObjectRequirements = append(
			schedulingInfo.ObjectRequirements,
			&schedulerobjects.ObjectRequirements{
				Requirements: &schedulerobjects.ObjectRequirements_PodRequirements{
					PodRequirements: podRequirements,
				},
			},
		)
	default:
		return nil, errors.Errorf("unsupported object type %T", object)
	}
	return schedulingInfo, nil
}

// PriorityFromPodSpec returns the priority in a pod spec.
// If priority is set directly, that value is returned.
// Otherwise, it returns the value of the key podSpec.
Expand Down
37 changes: 0 additions & 37 deletions internal/scheduler/adapters/adapters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@ import (
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"

"github.com/armadaproject/armada/internal/armada/configuration"
"github.com/armadaproject/armada/internal/common/types"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)
Expand Down Expand Up @@ -165,41 +163,6 @@ func TestPodRequirementsFromPodSpecPreemptionPolicy(t *testing.T) {
}
}

// TestPodRequirementsFromPod checks that the annotations on the value returned by
// PodRequirementsFromPod are independent of the annotations on the source pod.
func TestPodRequirementsFromPod(t *testing.T) {
	limits := v1.ResourceList{
		v1.ResourceName("cpu"):    *resource.NewMilliQuantity(5300, resource.DecimalSI),
		v1.ResourceName("memory"): *resource.NewQuantity(5*1024*1024*1024, resource.BinarySI),
	}
	requests := v1.ResourceList{
		v1.ResourceName("cpu"):    *resource.NewMilliQuantity(300, resource.DecimalSI),
		v1.ResourceName("memory"): *resource.NewQuantity(2*1024*1024*1024, resource.BinarySI),
	}
	pod := v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Annotations: map[string]string{
				configuration.GangIdAnnotation:          "gang-id",
				configuration.GangCardinalityAnnotation: "1",
			},
		},
		Spec: v1.PodSpec{
			Priority: &priority,
			Containers: []v1.Container{
				{
					Resources: v1.ResourceRequirements{
						Limits:   limits,
						Requests: requests,
					},
				},
			},
		},
	}

	requirements := PodRequirementsFromPod(&pod, priorityByPriorityClassName)

	// Write to the returned annotations; the pod must still hold exactly its two original
	// annotations, which shows the returned map is not the pod's own map.
	requirements.Annotations["something"] = "test"
	assert.Len(t, pod.Annotations, 2)
}

func TestPriorityFromPodSpec(t *testing.T) {
tests := map[string]struct {
podSpec *v1.PodSpec
Expand Down
49 changes: 0 additions & 49 deletions internal/scheduler/jobdb/comparison.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
package jobdb

import (
"time"
)

type (
// JobPriorityComparer orders jobs via its Compare method, which delegates to SchedulingOrderCompare.
JobPriorityComparer struct{}
// JobQueueTtlComparer orders jobs via its Compare method by remaining queue ttl, smallest first.
JobQueueTtlComparer struct{}
// JobIdHasher's Equal method treats two jobs as equal when they are the same *Job pointer.
JobIdHasher struct{}
)

Expand All @@ -22,50 +17,6 @@ func (JobIdHasher) Equal(a, b *Job) bool {
return a == b
}

// Compare orders jobs by the time at which their queue ttl expires, earliest expiry first.
//
// A job's expiry time is its submittedTime (unix nanoseconds, truncated to whole seconds)
// plus QueueTtlSeconds. The result depends only on the two jobs and never on the wall clock,
// so the relative order of two jobs cannot change while they are held in a sorted collection.
// For jobs that have not yet expired this is the same order as sorting by remaining ttl.
//
// Returns 0 only when both jobs have the same id, -1 if a sorts before b, and 1 otherwise.
//
// Invariants:
//   - Job.queueTtl must be > 0
func (j JobQueueTtlComparer) Compare(a, b *Job) int {
	// Jobs with equal id are always considered equal.
	// This ensures at most one job with a particular id can exist in the jobDb.
	if a.id == b.id {
		return 0
	}

	const maxExpirySeconds = int64(1<<63 - 1)
	expirySeconds := func(job *Job) int64 {
		submittedSeconds := job.submittedTime / int64(time.Second)
		ttlSeconds := job.QueueTtlSeconds()
		expiry := submittedSeconds + ttlSeconds
		// A positive ttl that makes the sum wrap around means "effectively never expires".
		if ttlSeconds > 0 && expiry < submittedSeconds {
			return maxExpirySeconds
		}
		return expiry
	}

	// Earliest expiry first.
	aExpiry, bExpiry := expirySeconds(a), expirySeconds(b)
	if aExpiry < bExpiry {
		return -1
	}
	if aExpiry > bExpiry {
		return 1
	}

	// Tie-break by id; the ids are known to differ at this point.
	if a.id < b.id {
		return -1
	}
	return 1
}

// max returns the larger of x and y; when they are equal, that common value is returned.
func max(x, y int64) int64 {
	if x >= y {
		return x
	}
	return y
}

// Compare reports the ordering of job relative to other by delegating to SchedulingOrderCompare.
func (JobPriorityComparer) Compare(job, other *Job) int {
	order := SchedulingOrderCompare(job, other)
	return order
}
Expand Down
28 changes: 0 additions & 28 deletions internal/scheduler/jobdb/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,12 +513,6 @@ func (job *Job) EfficientResourceRequirements() internaltypes.ResourceList {
return job.resourceRequirements
}

// QueueTtlSeconds reports, in seconds, how long the job should remain queued,
// as recorded in the job's scheduling info.
// A value of 0 means that this field is unset.
func (job *Job) QueueTtlSeconds() int64 {
	schedulingInfo := job.jobSchedulingInfo
	return schedulingInfo.QueueTtlSeconds
}

// PodRequirements returns the pod requirements of the Job
func (job *Job) PodRequirements() *schedulerobjects.PodRequirements {
return job.jobSchedulingInfo.GetPodRequirements()
Expand Down Expand Up @@ -738,28 +732,6 @@ func (job *Job) RunById(id uuid.UUID) *JobRun {
return job.runsById[id]
}

// HasQueueTtlExpired reports whether the job has been queued for longer than its queue ttl.
// A job without a positive queue ttl never expires.
// Invariants:
//   - job.created < `t`
func (job *Job) HasQueueTtlExpired() bool {
	ttlSeconds := job.QueueTtlSeconds()
	if ttlSeconds <= 0 {
		return false
	}

	// submittedTime is divided down from nanoseconds to seconds; the original comment notes it is
	// populated from the `Submitted` field in postgres, which is a UnixNano time.
	submittedSeconds := job.submittedTime / 1_000_000_000
	queuedForSeconds := time.Now().UTC().Unix() - submittedSeconds
	return queuedForSeconds > ttlSeconds
}

// HasQueueTtlSet reports whether the job carries a queue ttl, i.e. QueueTtlSeconds is positive.
func (job *Job) HasQueueTtlSet() bool {
	ttlSeconds := job.QueueTtlSeconds()
	return 0 < ttlSeconds
}

// WithJobset returns a copy of the job with the jobSet updated.
func (job *Job) WithJobset(jobset string) *Job {
j := copyJob(*job)
Expand Down
35 changes: 2 additions & 33 deletions internal/scheduler/jobdb/jobdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,12 @@ import (
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)

var (
emptyList = immutable.NewSortedSet[*Job](JobPriorityComparer{})
emptyQueuedJobsByTtl = immutable.NewSortedSet[*Job](JobQueueTtlComparer{})
)
var emptyList = immutable.NewSortedSet[*Job](JobPriorityComparer{})

type JobDb struct {
jobsById *immutable.Map[string, *Job]
jobsByRunId *immutable.Map[uuid.UUID, string]
jobsByQueue map[string]immutable.SortedSet[*Job]
queuedJobsByTtl *immutable.SortedSet[*Job]
unvalidatedJobs *immutable.Set[*Job]
// Configured priority classes.
priorityClasses map[string]types.PriorityClass
Expand Down Expand Up @@ -88,7 +84,6 @@ func NewJobDbWithSchedulingKeyGenerator(
jobsByRunId: immutable.NewMap[uuid.UUID, string](&UUIDHasher{}),
jobsByQueue: map[string]immutable.SortedSet[*Job]{},
unvalidatedJobs: &unvalidatedJobs,
queuedJobsByTtl: &emptyQueuedJobsByTtl,
priorityClasses: priorityClasses,
defaultPriorityClass: defaultPriorityClass,
schedulingKeyGenerator: skg,
Expand All @@ -113,7 +108,6 @@ func (jobDb *JobDb) Clone() *JobDb {
jobsById: jobDb.jobsById,
jobsByRunId: jobDb.jobsByRunId,
jobsByQueue: maps.Clone(jobDb.jobsByQueue),
queuedJobsByTtl: jobDb.queuedJobsByTtl,
unvalidatedJobs: jobDb.unvalidatedJobs,
priorityClasses: jobDb.priorityClasses,
defaultPriorityClass: jobDb.defaultPriorityClass,
Expand Down Expand Up @@ -212,7 +206,6 @@ func (jobDb *JobDb) ReadTxn() *Txn {
jobsById: jobDb.jobsById,
jobsByRunId: jobDb.jobsByRunId,
jobsByQueue: jobDb.jobsByQueue,
queuedJobsByTtl: jobDb.queuedJobsByTtl,
unvalidatedJobs: jobDb.unvalidatedJobs,
active: true,
jobDb: jobDb,
Expand All @@ -231,7 +224,6 @@ func (jobDb *JobDb) WriteTxn() *Txn {
jobsById: jobDb.jobsById,
jobsByRunId: jobDb.jobsByRunId,
jobsByQueue: maps.Clone(jobDb.jobsByQueue),
queuedJobsByTtl: jobDb.queuedJobsByTtl,
unvalidatedJobs: jobDb.unvalidatedJobs,
active: true,
jobDb: jobDb,
Expand All @@ -251,9 +243,6 @@ type Txn struct {
jobsByRunId *immutable.Map[uuid.UUID, string]
// Queued jobs for each queue. Stored in the order in which they should be scheduled.
jobsByQueue map[string]immutable.SortedSet[*Job]
// Queued jobs for each queue ordered by remaining time-to-live.
// TODO: The ordering is wrong. Since we call time.Now() in the compare function.
queuedJobsByTtl *immutable.SortedSet[*Job]
// Jobs that require submit checking
unvalidatedJobs *immutable.Set[*Job]
// The jobDb from which this transaction was created.
Expand All @@ -272,7 +261,6 @@ func (txn *Txn) Commit() {
txn.jobDb.jobsById = txn.jobsById
txn.jobDb.jobsByRunId = txn.jobsByRunId
txn.jobDb.jobsByQueue = txn.jobsByQueue
txn.jobDb.queuedJobsByTtl = txn.queuedJobsByTtl
txn.jobDb.unvalidatedJobs = txn.unvalidatedJobs

txn.active = false
Expand Down Expand Up @@ -390,9 +378,6 @@ func (txn *Txn) Upsert(jobs []*Job) error {
if ok {
txn.jobsByQueue[existingJob.queue] = existingQueue.Delete(existingJob)
}
newQueuedJobsByTtl := txn.queuedJobsByTtl.Delete(existingJob)
txn.queuedJobsByTtl = &newQueuedJobsByTtl

if !existingJob.Validated() {
newUnvalidatedJobs := txn.unvalidatedJobs.Delete(existingJob)
txn.unvalidatedJobs = &newUnvalidatedJobs
Expand Down Expand Up @@ -454,11 +439,6 @@ func (txn *Txn) Upsert(jobs []*Job) error {
}
newQueue = newQueue.Add(job)
txn.jobsByQueue[job.queue] = newQueue

if job.HasQueueTtlSet() {
queuedJobsByTtl := txn.queuedJobsByTtl.Add(job)
txn.queuedJobsByTtl = &queuedJobsByTtl
}
}
}
}()
Expand Down Expand Up @@ -511,12 +491,7 @@ func (txn *Txn) QueuedJobs(queue string) *immutable.SortedSetIterator[*Job] {
}
}

// QueuedJobsByTtl returns an iterator over the transaction's ttl-ordered set of queued jobs,
// intended to yield the job closest to expiry first.
func (txn *Txn) QueuedJobsByTtl() *immutable.SortedSetIterator[*Job] {
	jobsByTtl := txn.queuedJobsByTtl
	return jobsByTtl.Iterator()
}

// UnvalidatedJobs returns an iterator for jobs that have not yet been validated
func (txn *Txn) UnvalidatedJobs() *immutable.SetIterator[*Job] {
return txn.unvalidatedJobs.Iterator()
}
Expand Down Expand Up @@ -558,12 +533,6 @@ func (txn *Txn) delete(jobId string) {
newQueue := queue.Delete(job)
txn.jobsByQueue[job.queue] = newQueue
}

// We only add these jobs into the collection if it has a queueTtl set, hence only remove if this is set.
if job.HasQueueTtlSet() {
newQueuedJobsByExpiry := txn.queuedJobsByTtl.Delete(job)
txn.queuedJobsByTtl = &newQueuedJobsByExpiry
}
newUnvalidatedJobs := txn.unvalidatedJobs.Delete(job)
txn.unvalidatedJobs = &newUnvalidatedJobs
}
Expand Down
Loading