Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move code in common.go to more sensible places #3893

Merged
merged 4 commits into from
Sep 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 0 additions & 72 deletions internal/scheduler/common.go

This file was deleted.

92 changes: 0 additions & 92 deletions internal/scheduler/common_test.go

This file was deleted.

62 changes: 62 additions & 0 deletions internal/scheduler/context/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@ import (

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/exp/maps"
v1 "k8s.io/api/core/v1"

"github.com/armadaproject/armada/internal/common/armadacontext"
armadamaps "github.com/armadaproject/armada/internal/common/maps"
armadaslices "github.com/armadaproject/armada/internal/common/slices"
schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration"
"github.com/armadaproject/armada/internal/scheduler/interfaces"
"github.com/armadaproject/armada/internal/scheduler/internaltypes"
Expand Down Expand Up @@ -187,3 +191,61 @@ func JobSchedulingContextFromJob(job *jobdb.Job) *JobSchedulingContext {
GangInfo: gangInfo,
}
}

// PrintJobSummary logs a summary of the job scheduling context
// It will log a high level summary at Info level, and a list of all queues + jobs affected at debug level
func PrintJobSummary(ctx *armadacontext.Context, prefix string, jctxs []*JobSchedulingContext) {
if len(jctxs) == 0 {
return
}
jobsByQueue := armadaslices.MapAndGroupByFuncs(
jctxs,
func(jctx *JobSchedulingContext) string {
return jctx.Job.Queue()
},
func(jctx *JobSchedulingContext) *jobdb.Job {
return jctx.Job
},
)
resourcesByQueue := armadamaps.MapValues(
jobsByQueue,
func(jobs []*jobdb.Job) schedulerobjects.ResourceList {
rv := schedulerobjects.NewResourceListWithDefaultSize()
for _, job := range jobs {
rv.AddV1ResourceList(job.ResourceRequirements().Requests)
}
return rv
},
)
jobCountPerQueue := armadamaps.MapValues(
jobsByQueue,
func(jobs []*jobdb.Job) int {
return len(jobs)
},
)
jobIdsByQueue := armadamaps.MapValues(
jobsByQueue,
func(jobs []*jobdb.Job) []string {
rv := make([]string, len(jobs))
for i, job := range jobs {
rv[i] = job.Id()
}
return rv
},
)
summary := fmt.Sprintf(
"affected queues %v; resources %v; jobs per queue %v",
maps.Keys(jobsByQueue),
armadamaps.MapValues(
resourcesByQueue,
func(rl schedulerobjects.ResourceList) string {
return rl.CompactString()
},
),
jobCountPerQueue,
)
verbose := fmt.Sprintf("affected jobs %v", jobIdsByQueue)

ctx.Infof("%s %s", prefix, summary)
ctx.Debugf("%s %s", prefix, verbose)
}
4 changes: 2 additions & 2 deletions internal/scheduler/preempting_queue_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,8 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*sche
}
ctx.WithField("stage", "scheduling-algo").Infof("Finished unbinding preempted and evicted jobs")

PrintJobSummary(ctx, "Preempting running jobs;", preemptedJobs)
PrintJobSummary(ctx, "Scheduling new jobs;", scheduledJobs)
schedulercontext.PrintJobSummary(ctx, "Preempting running jobs;", preemptedJobs)
schedulercontext.PrintJobSummary(ctx, "Scheduling new jobs;", scheduledJobs)
// TODO: Show failed jobs.

if sch.enableAssertions {
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/queue/queue_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestFetch(t *testing.T) {
{Name: "testQueue2"},
},
},
"Immediate Steam Error": {
"Immediate Stream Error": {
queues: []*api.Queue{},
streamError: true,
},
Expand Down
82 changes: 82 additions & 0 deletions internal/scheduler/schedulerobjects/resourcelist_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package schedulerobjects

import (
"math"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -796,6 +797,87 @@ func TestV1ResourceListConversion(t *testing.T) {
assert.True(t, maps.Equal(v1rlCopy, v1rl))
}

func TestResourceListAsWeightedMillis(t *testing.T) {
tests := map[string]struct {
rl ResourceList
weights map[string]float64
expected int64
}{
"default": {
rl: ResourceList{
Resources: map[string]resource.Quantity{
"foo": resource.MustParse("2"),
"bar": resource.MustParse("10Gi"),
"baz": resource.MustParse("1"),
},
},
weights: map[string]float64{
"foo": 1,
"bar": 0.1,
"baz": 10,
},
expected: (1 * 2 * 1000) + (1 * 1000 * 1024 * 1024 * 1024) + (10 * 1 * 1000),
},
"zeroes": {
rl: ResourceList{
Resources: map[string]resource.Quantity{
"foo": resource.MustParse("0"),
"bar": resource.MustParse("1"),
"baz": resource.MustParse("2"),
},
},
weights: map[string]float64{
"foo": 1,
"bar": 0,
},
expected: 0,
},
"1Pi": {
rl: ResourceList{
Resources: map[string]resource.Quantity{
"foo": resource.MustParse("1Pi"),
},
},
weights: map[string]float64{
"foo": 1,
},
expected: int64(math.Pow(1024, 5)) * 1000,
},
"rounding": {
rl: ResourceList{
Resources: map[string]resource.Quantity{
"foo": resource.MustParse("1"),
},
},
weights: map[string]float64{
"foo": 0.3006,
},
expected: 301,
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
assert.Equal(t, tc.expected, tc.rl.AsWeightedMillis(tc.weights))
})
}
}

func BenchmarkResourceListAsWeightedMillis(b *testing.B) {
rl := NewResourceList(3)
rl.Set("cpu", resource.MustParse("2"))
rl.Set("memory", resource.MustParse("10Gi"))
rl.Set("nvidia.com/gpu", resource.MustParse("1"))
weights := map[string]float64{
"cpu": 1,
"memory": 0.1,
"nvidia.com/gpu": 10,
}
b.ResetTimer()
for n := 0; n < b.N; n++ {
rl.AsWeightedMillis(weights)
}
}

func BenchmarkResourceListZeroAdd(b *testing.B) {
rla := NewResourceList(3)
rlb := NewResourceList(3)
Expand Down