Skip to content

Commit

Permalink
Merge branch 'master' into f/chrisma/string-uuid-inside-scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
d80tb7 committed Sep 2, 2024
2 parents 86c7012 + 41b3f79 commit 5d69003
Show file tree
Hide file tree
Showing 14 changed files with 236 additions and 383 deletions.
8 changes: 1 addition & 7 deletions internal/common/eventutil/eventutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,6 @@ func ShortSequenceString(sequence *armadaevents.EventSequence) string {

// ApiJobFromLogSubmitJob converts a SubmitJob log message into an api.Job struct, which is used by Armada internally.
func ApiJobFromLogSubmitJob(ownerId string, groups []string, queueName string, jobSetName string, time time.Time, e *armadaevents.SubmitJob) (*api.Job, error) {
jobId, err := armadaevents.UlidStringFromProtoUuid(e.JobId)
if err != nil {
err = errors.WithStack(err)
return nil, err
}

if e == nil || e.MainObject == nil || e.MainObject.Object == nil {
return nil, errors.Errorf("SubmitJob or one of its member pointers is nil")
}
Expand Down Expand Up @@ -146,7 +140,7 @@ func ApiJobFromLogSubmitJob(ownerId string, groups []string, queueName string, j
}

return &api.Job{
Id: jobId,
Id: e.JobIdStr,
ClientId: e.DeduplicationId,
Queue: queueName,
JobSetId: jobSetName,
Expand Down
21 changes: 9 additions & 12 deletions internal/eventingester/convert/conversions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/apache/pulsar-client-go/pulsar"
"github.com/gogo/protobuf/proto"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"

"github.com/armadaproject/armada/internal/common/armadacontext"
Expand All @@ -26,8 +25,6 @@ const (
)

var (
jobIdProto, _ = armadaevents.ProtoUuidFromUlidString(jobIdString)
runIdProto = armadaevents.ProtoUuidFromUuid(uuid.MustParse(runIdString))
baseTime, _ = time.Parse("2006-01-02T15:04:05.000Z", "2022-03-01T15:04:05.000Z")
baseTimeProto = protoutil.ToTimestamp(baseTime)
)
Expand All @@ -37,8 +34,8 @@ var jobRunSucceeded = &armadaevents.EventSequence_Event{
Created: baseTimeProto,
Event: &armadaevents.EventSequence_Event_JobRunSucceeded{
JobRunSucceeded: &armadaevents.JobRunSucceeded{
RunId: runIdProto,
JobId: jobIdProto,
RunIdStr: runIdString,
JobIdStr: jobIdString,
},
},
}
Expand All @@ -48,7 +45,7 @@ var cancelled = &armadaevents.EventSequence_Event{
Created: baseTimeProto,
Event: &armadaevents.EventSequence_Event_CancelledJob{
CancelledJob: &armadaevents.CancelledJob{
JobId: jobIdProto,
JobIdStr: jobIdString,
},
},
}
Expand Down Expand Up @@ -93,8 +90,8 @@ func TestCancelled(t *testing.T) {
Created: baseTimeProto,
Event: &armadaevents.EventSequence_Event_CancelJob{
CancelJob: &armadaevents.CancelJob{
JobId: jobIdProto,
Reason: "some reason 1",
JobIdStr: jobIdString,
Reason: "some reason 1",
},
},
}, &armadaevents.EventSequence_Event{
Expand All @@ -108,8 +105,8 @@ func TestCancelled(t *testing.T) {
Created: baseTimeProto,
Event: &armadaevents.EventSequence_Event_CancelledJob{
CancelledJob: &armadaevents.CancelledJob{
JobId: jobIdProto,
Reason: "some reason 3",
JobIdStr: jobIdString,
Reason: "some reason 3",
},
},
})
Expand All @@ -124,7 +121,7 @@ func TestCancelled(t *testing.T) {
Created: baseTimeProto,
Event: &armadaevents.EventSequence_Event_CancelJob{
CancelJob: &armadaevents.CancelJob{
JobId: jobIdProto,
JobIdStr: jobIdString,
},
},
},
Expand All @@ -138,7 +135,7 @@ func TestCancelled(t *testing.T) {
Created: baseTimeProto,
Event: &armadaevents.EventSequence_Event_CancelledJob{
CancelledJob: &armadaevents.CancelledJob{
JobId: jobIdProto,
JobIdStr: jobIdString,
},
},
},
Expand Down
72 changes: 0 additions & 72 deletions internal/scheduler/common.go

This file was deleted.

92 changes: 0 additions & 92 deletions internal/scheduler/common_test.go

This file was deleted.

62 changes: 62 additions & 0 deletions internal/scheduler/context/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@ import (

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/exp/maps"
v1 "k8s.io/api/core/v1"

"github.com/armadaproject/armada/internal/common/armadacontext"
armadamaps "github.com/armadaproject/armada/internal/common/maps"
armadaslices "github.com/armadaproject/armada/internal/common/slices"
schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration"
"github.com/armadaproject/armada/internal/scheduler/interfaces"
"github.com/armadaproject/armada/internal/scheduler/internaltypes"
Expand Down Expand Up @@ -187,3 +191,61 @@ func JobSchedulingContextFromJob(job *jobdb.Job) *JobSchedulingContext {
GangInfo: gangInfo,
}
}

// PrintJobSummary logs a summary of the job scheduling context
// It will log a high level summary at Info level, and a list of all queues + jobs affected at debug level
func PrintJobSummary(ctx *armadacontext.Context, prefix string, jctxs []*JobSchedulingContext) {
if len(jctxs) == 0 {
return
}
jobsByQueue := armadaslices.MapAndGroupByFuncs(
jctxs,
func(jctx *JobSchedulingContext) string {
return jctx.Job.Queue()
},
func(jctx *JobSchedulingContext) *jobdb.Job {
return jctx.Job
},
)
resourcesByQueue := armadamaps.MapValues(
jobsByQueue,
func(jobs []*jobdb.Job) schedulerobjects.ResourceList {
rv := schedulerobjects.NewResourceListWithDefaultSize()
for _, job := range jobs {
rv.AddV1ResourceList(job.ResourceRequirements().Requests)
}
return rv
},
)
jobCountPerQueue := armadamaps.MapValues(
jobsByQueue,
func(jobs []*jobdb.Job) int {
return len(jobs)
},
)
jobIdsByQueue := armadamaps.MapValues(
jobsByQueue,
func(jobs []*jobdb.Job) []string {
rv := make([]string, len(jobs))
for i, job := range jobs {
rv[i] = job.Id()
}
return rv
},
)
summary := fmt.Sprintf(
"affected queues %v; resources %v; jobs per queue %v",
maps.Keys(jobsByQueue),
armadamaps.MapValues(
resourcesByQueue,
func(rl schedulerobjects.ResourceList) string {
return rl.CompactString()
},
),
jobCountPerQueue,
)
verbose := fmt.Sprintf("affected jobs %v", jobIdsByQueue)

ctx.Infof("%s %s", prefix, summary)
ctx.Debugf("%s %s", prefix, verbose)
}
4 changes: 2 additions & 2 deletions internal/scheduler/preempting_queue_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,8 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*sche
}
ctx.WithField("stage", "scheduling-algo").Infof("Finished unbinding preempted and evicted jobs")

PrintJobSummary(ctx, "Preempting running jobs;", preemptedJobs)
PrintJobSummary(ctx, "Scheduling new jobs;", scheduledJobs)
schedulercontext.PrintJobSummary(ctx, "Preempting running jobs;", preemptedJobs)
schedulercontext.PrintJobSummary(ctx, "Scheduling new jobs;", scheduledJobs)
// TODO: Show failed jobs.

if sch.enableAssertions {
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/queue/queue_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestFetch(t *testing.T) {
{Name: "testQueue2"},
},
},
"Immediate Steam Error": {
"Immediate Stream Error": {
queues: []*api.Queue{},
streamError: true,
},
Expand Down
Loading

0 comments on commit 5d69003

Please sign in to comment.