From 7f00534351e90fb18074d99d7ec4b81281ec300c Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Sat, 17 Aug 2024 10:11:19 +0100 Subject: [PATCH] improvements Signed-off-by: Chris Martin --- internal/scheduler/simulator/simulator.go | 7 +- .../configs/basicSchedulingConfig.yaml | 18 ++- internal/scheduler/simulator/writer.go | 152 +++++++----------- 3 files changed, 75 insertions(+), 102 deletions(-) diff --git a/internal/scheduler/simulator/simulator.go b/internal/scheduler/simulator/simulator.go index 7f93bd93dc4..dd78cc08bae 100644 --- a/internal/scheduler/simulator/simulator.go +++ b/internal/scheduler/simulator/simulator.go @@ -154,6 +154,7 @@ func NewSimulator(clusterSpec *ClusterSpec, workloadSpec *WorkloadSpec, scheduli enableFastForward: enableFastForward, hardTerminationMinutes: hardTerminationMinutes, schedulerCyclePeriodSeconds: schedulerCyclePeriodSeconds, + time: time.Unix(0, 0).UTC(), } jobDb.SetClock(s) s.limiter.SetBurstAt(s.time, schedulingConfig.MaximumSchedulingBurst) @@ -185,7 +186,7 @@ func (s *Simulator) Run(ctx *armadacontext.Context) error { s.pushScheduleEvent(s.time) simTerminationTime := s.time.Add(time.Minute * time.Duration(s.hardTerminationMinutes)) - + lastLogTime := s.time // Then run the scheduler until all jobs have completed. for s.eventLog.Len() > 0 { select { @@ -197,6 +198,10 @@ func (s *Simulator) Run(ctx *armadacontext.Context) error { return err } } + if s.time.Unix()-lastLogTime.Unix() >= 60 { + ctx.Infof("Simulator time %s", s.time) + lastLogTime = s.time + } if s.time.After(simTerminationTime) { ctx.Infof("Current simulated time (%s) exceeds runtime deadline (%s). Terminating", s.time, simTerminationTime) return nil diff --git a/internal/scheduler/simulator/testdata/configs/basicSchedulingConfig.yaml b/internal/scheduler/simulator/testdata/configs/basicSchedulingConfig.yaml index 16eecf71d89..2787f82f10d 100644 --- a/internal/scheduler/simulator/testdata/configs/basicSchedulingConfig.yaml +++ b/internal/scheduler/simulator/testdata/configs/basicSchedulingConfig.yaml @@ -1,7 +1,13 @@ -maximumSchedulingRate: "+inf" -maximumSchedulingBurst: 9223372036854775807 -maximumPerQueueSchedulingRate: "+Inf" -maximumPerQueueSchedulingBurst: 9223372036854775807 +maxQueueLookback: 1000 +supportedResourceTypes: + - name: memory + resolution: "1" + - name: cpu + resolution: "1m" + - name: ephemeral-storage + resolution: "1" + - name: nvidia.com/gpu + resolution: "1" fairnessModel: "DominantResourceFairness" dominantResourceFairnessResourcesToConsider: - "cpu" @@ -27,3 +33,7 @@ priorityClasses: priority: 30000 preemptible: true defaultPriorityClassName: "armada-default" +maximumSchedulingRate: 9223372036854775807 +maximumSchedulingBurst: 9223372036854775807 +maximumPerQueueSchedulingRate: 9223372036854775807 +maximumPerQueueSchedulingBurst: 9223372036854775807 diff --git a/internal/scheduler/simulator/writer.go b/internal/scheduler/simulator/writer.go index c607af78849..6b903309582 100644 --- a/internal/scheduler/simulator/writer.go +++ b/internal/scheduler/simulator/writer.go @@ -1,11 +1,8 @@ package simulator import ( - "io" - "time" - - "github.com/pkg/errors" parquetWriter "github.com/xitongsys/parquet-go/writer" + "io" v1 "k8s.io/api/core/v1" "github.com/armadaproject/armada/internal/common/armadacontext" @@ -13,127 +10,90 @@ import ( "github.com/armadaproject/armada/pkg/armadaevents" ) -const ( - unsupportedEvent = iota - submitJob = iota - jobRunLeased = iota - jobRunRunning = iota - jobSucceeded = iota - jobRunPreempted = iota - jobCancelled = iota -) - type Writer struct { - c <-chan StateTransition - writer io.Writer - prevSeenEventByJobId map[string]*armadaevents.EventSequence_Event + c <-chan StateTransition + writer io.Writer } -type FlattenedArmadaEvent struct { - Time int64 `parquet:"name=elapsed_time, type=INT64"` - Queue string `parquet:"name=queue, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"` - JobSet string `parquet:"name=job_set, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"` - JobId string `parquet:"name=job_id, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"` - RunIndex int `parquet:"name=run_index, type=INT32"` - NumRuns int `parquet:"name=num_runs, type=INT32"` - PriorityClass string `parquet:"name=priority_class, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"` - PreviousEventType int `parquet:"name=previous_event_type, type=INT32"` - EventType int `parquet:"name=event_type, type=INT32"` - SecondsSinceLastEvent float64 `parquet:"name=seconds_since_last_event, type=DOUBLE"` - Cpu float64 `parquet:"name=cpu, type=DOUBLE"` - Memory float64 `parquet:"name=memory, type=DOUBLE"` - Gpu float64 `parquet:"name=gpu, type=DOUBLE"` - EphemeralStorage float64 `parquet:"name=ephemeral_storage, type=DOUBLE"` - ExitCode int `parquet:"name=exit_code, type=INT32"` +type JobRunRow struct { + Queue string `parquet:"name=queue, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"` + JobSet string `parquet:"name=job_set, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"` + JobId string `parquet:"name=job_id, type=BYTE_ARRAY, convertedtype=UTF8"` + RunId string `parquet:"name=run_id, type=BYTE_ARRAY, convertedtype=UTF8"` + PriorityClass string `parquet:"name=priority_class, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"` + Cpu float64 `parquet:"name=cpu, type=DOUBLE"` + Memory float64 `parquet:"name=memory, type=DOUBLE"` + Gpu float64 `parquet:"name=gpu, type=DOUBLE"` + EphemeralStorage float64 `parquet:"name=ephemeral_storage, type=DOUBLE"` + ExitCode int `parquet:"name=exit_code, type=INT32"` + State string `parquet:"name=state, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"` + SubmittedTime int64 `parquet:"name=submitted_time, type=INT64"` + ScheduledTime int64 `parquet:"name=scheduled_time, type=INT64"` + FinishedTime int64 `parquet:"name=finished_time, type=INT64"` } func NewWriter(writer io.Writer, c <-chan StateTransition) (*Writer, error) { - if c == nil { - return nil, errors.Errorf("uninitialised channel passed into FileWriter") - } - fw := &Writer{ - c: c, - writer: writer, - prevSeenEventByJobId: make(map[string]*armadaevents.EventSequence_Event), + c: c, + writer: writer, } return fw, nil } // Presently only converts events supported in the simulator -func (w *Writer) encodeEvent(e *armadaevents.EventSequence_Event) int { +func (w *Writer) toEventState(e *armadaevents.EventSequence_Event) string { switch e.GetEvent().(type) { - case *armadaevents.EventSequence_Event_SubmitJob: - return submitJob - case *armadaevents.EventSequence_Event_JobRunLeased: - return jobRunLeased - case *armadaevents.EventSequence_Event_JobRunRunning: - return jobRunRunning case *armadaevents.EventSequence_Event_JobSucceeded: - return jobSucceeded + return "SUCCEEDED" case *armadaevents.EventSequence_Event_JobRunPreempted: - return jobRunPreempted + return "PREEMPTED" case *armadaevents.EventSequence_Event_CancelledJob: - return jobCancelled + return "CANCELLED" default: // Undefined event type - return unsupportedEvent + return "UNKNOWN" } } -func (w *Writer) flattenStateTransition(flattenedStateTransitions []*FlattenedArmadaEvent, st StateTransition) ([]*FlattenedArmadaEvent, error) { - startTime := time.Time{} - +func (w *Writer) createJobRunRow(st StateTransition) ([]*JobRunRow, error) { + rows := make([]*JobRunRow, 0, len(st.EventSequence.Events)) events := st.EventSequence jobsList := st.Jobs for i, event := range events.Events { // Assumes all supported events have an associated job associatedJob := jobsList[i] - prevSeenEvent := w.prevSeenEventByJobId[associatedJob.Id()] - // Resource requirements - cpuLimit := associatedJob.ResourceRequirements().Requests[v1.ResourceCPU] - memoryLimit := associatedJob.ResourceRequirements().Requests[v1.ResourceMemory] - ephemeralStorageLimit := associatedJob.ResourceRequirements().Requests[v1.ResourceEphemeralStorage] - gpuLimit := associatedJob.ResourceRequirements().Requests["nvidia.com/gpu"] - - eventTime := protoutil.ToStdTime(event.Created) - prevEventType := 0 - prevEventTime := eventTime - if prevSeenEvent != nil { - prevEventType = w.encodeEvent(prevSeenEvent) - prevEventTime = protoutil.ToStdTime(prevSeenEvent.Created) - } - - flattenedStateTransitions = append(flattenedStateTransitions, &FlattenedArmadaEvent{ - Time: eventTime.Sub(startTime).Milliseconds(), - Queue: events.Queue, - JobSet: events.JobSetName, - JobId: associatedJob.Id(), - RunIndex: len(associatedJob.AllRuns()) - 1, // Assumed to be related to latest run in simulation - NumRuns: len(associatedJob.AllRuns()), - PriorityClass: associatedJob.PriorityClassName(), - PreviousEventType: prevEventType, - EventType: w.encodeEvent(event), - SecondsSinceLastEvent: eventTime.Sub(prevEventTime).Seconds(), - Cpu: cpuLimit.AsApproximateFloat64(), - Memory: memoryLimit.AsApproximateFloat64(), - Gpu: gpuLimit.AsApproximateFloat64(), - EphemeralStorage: ephemeralStorageLimit.AsApproximateFloat64(), - ExitCode: 0, - }) - w.prevSeenEventByJobId[associatedJob.Id()] = event - - if associatedJob.Succeeded() || associatedJob.Failed() || associatedJob.Cancelled() { - delete(w.prevSeenEventByJobId, associatedJob.Id()) + if event.GetCancelledJob() != nil || event.GetJobSucceeded() != nil || event.GetJobRunPreempted() != nil { + // Resource requirements + cpuLimit := associatedJob.ResourceRequirements().Requests[v1.ResourceCPU] + memoryLimit := associatedJob.ResourceRequirements().Requests[v1.ResourceMemory] + ephemeralStorageLimit := associatedJob.ResourceRequirements().Requests[v1.ResourceEphemeralStorage] + gpuLimit := associatedJob.ResourceRequirements().Requests["nvidia.com/gpu"] + eventTime := protoutil.ToStdTime(event.Created) + + rows = append(rows, &JobRunRow{ + Queue: associatedJob.Queue(), + JobSet: associatedJob.Jobset(), + JobId: associatedJob.Id(), + RunId: associatedJob.LatestRun().Id().String(), + PriorityClass: associatedJob.PriorityClassName(), + Cpu: cpuLimit.AsApproximateFloat64(), + Memory: memoryLimit.AsApproximateFloat64(), + Gpu: gpuLimit.AsApproximateFloat64(), + EphemeralStorage: ephemeralStorageLimit.AsApproximateFloat64(), + ExitCode: 0, + State: w.toEventState(event), + SubmittedTime: associatedJob.SubmitTime().UnixNano() / 1000000000, + ScheduledTime: associatedJob.LatestRun().Created() / 1000000000, + FinishedTime: eventTime.UnixNano() / 1000000000, + }) } } - - return flattenedStateTransitions, nil + return rows, nil } func (w *Writer) Run(ctx *armadacontext.Context) (err error) { - pw, err := parquetWriter.NewParquetWriterFromWriter(w.writer, new(FlattenedArmadaEvent), 1) + pw, err := parquetWriter.NewParquetWriterFromWriter(w.writer, new(JobRunRow), 1) if err != nil { return } @@ -149,8 +109,6 @@ func (w *Writer) Run(ctx *armadacontext.Context) (err error) { } }() - flattenedStateTransitions := make([]*FlattenedArmadaEvent, 100) - for { select { case <-ctx.Done(): @@ -160,12 +118,12 @@ func (w *Writer) Run(ctx *armadacontext.Context) (err error) { return } - flattenedStateTransitions, err := w.flattenStateTransition(flattenedStateTransitions[0:0], stateTransitions) + jobRunRows, err := w.createJobRunRow(stateTransitions) if err != nil { return err } - for _, flattenedStateTransition := range flattenedStateTransitions { + for _, flattenedStateTransition := range jobRunRows { if err := pw.Write(*flattenedStateTransition); err != nil { return err }