Skip to content

Commit

Permalink
improvements
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Martin <chris@cmartinit.co.uk>
  • Loading branch information
d80tb7 committed Aug 17, 2024
1 parent 6923240 commit 7f00534
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 102 deletions.
7 changes: 6 additions & 1 deletion internal/scheduler/simulator/simulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ func NewSimulator(clusterSpec *ClusterSpec, workloadSpec *WorkloadSpec, scheduli
enableFastForward: enableFastForward,
hardTerminationMinutes: hardTerminationMinutes,
schedulerCyclePeriodSeconds: schedulerCyclePeriodSeconds,
time: time.Unix(0, 0).UTC(),
}
jobDb.SetClock(s)
s.limiter.SetBurstAt(s.time, schedulingConfig.MaximumSchedulingBurst)
Expand Down Expand Up @@ -185,7 +186,7 @@ func (s *Simulator) Run(ctx *armadacontext.Context) error {
s.pushScheduleEvent(s.time)

simTerminationTime := s.time.Add(time.Minute * time.Duration(s.hardTerminationMinutes))

lastLogTime := s.time
// Then run the scheduler until all jobs have completed.
for s.eventLog.Len() > 0 {
select {
Expand All @@ -197,6 +198,10 @@ func (s *Simulator) Run(ctx *armadacontext.Context) error {
return err
}
}
if s.time.Unix()-lastLogTime.Unix() >= 60 {
ctx.Infof("Simulator time %s", s.time)
lastLogTime = s.time
}
if s.time.After(simTerminationTime) {
ctx.Infof("Current simulated time (%s) exceeds runtime deadline (%s). Terminating", s.time, simTerminationTime)
return nil
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
maximumSchedulingRate: "+inf"
maximumSchedulingBurst: 9223372036854775807
maximumPerQueueSchedulingRate: "+Inf"
maximumPerQueueSchedulingBurst: 9223372036854775807
maxQueueLookback: 1000
supportedResourceTypes:
- name: memory
resolution: "1"
- name: cpu
resolution: "1m"
- name: ephemeral-storage
resolution: "1"
- name: nvidia.com/gpu
resolution: "1"
fairnessModel: "DominantResourceFairness"
dominantResourceFairnessResourcesToConsider:
- "cpu"
Expand All @@ -27,3 +33,7 @@ priorityClasses:
priority: 30000
preemptible: true
defaultPriorityClassName: "armada-default"
maximumSchedulingRate: 9223372036854775807
maximumSchedulingBurst: 9223372036854775807
maximumPerQueueSchedulingRate: 9223372036854775807
maximumPerQueueSchedulingBurst: 9223372036854775807
152 changes: 55 additions & 97 deletions internal/scheduler/simulator/writer.go
Original file line number Diff line number Diff line change
@@ -1,139 +1,99 @@
package simulator

import (

Check failure on line 3 in internal/scheduler/simulator/writer.go

View workflow job for this annotation

GitHub Actions / lint / Lint Go

File is not `gofumpt`-ed (gofumpt)
"io"
"time"

"github.com/pkg/errors"
parquetWriter "github.com/xitongsys/parquet-go/writer"

Check failure on line 4 in internal/scheduler/simulator/writer.go

View workflow job for this annotation

GitHub Actions / lint / Lint Go

File is not `goimports`-ed with -local github.com/armadaproject/armada (goimports)
"io"

Check failure on line 5 in internal/scheduler/simulator/writer.go

View workflow job for this annotation

GitHub Actions / lint / Lint Go

File is not `gofumpt`-ed (gofumpt)
v1 "k8s.io/api/core/v1"

"github.com/armadaproject/armada/internal/common/armadacontext"
protoutil "github.com/armadaproject/armada/internal/common/proto"
"github.com/armadaproject/armada/pkg/armadaevents"
)

const (
unsupportedEvent = iota
submitJob = iota
jobRunLeased = iota
jobRunRunning = iota
jobSucceeded = iota
jobRunPreempted = iota
jobCancelled = iota
)

type Writer struct {
c <-chan StateTransition
writer io.Writer
prevSeenEventByJobId map[string]*armadaevents.EventSequence_Event
c <-chan StateTransition
writer io.Writer
}

type FlattenedArmadaEvent struct {
Time int64 `parquet:"name=elapsed_time, type=INT64"`
Queue string `parquet:"name=queue, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"`
JobSet string `parquet:"name=job_set, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"`
JobId string `parquet:"name=job_id, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"`
RunIndex int `parquet:"name=run_index, type=INT32"`
NumRuns int `parquet:"name=num_runs, type=INT32"`
PriorityClass string `parquet:"name=priority_class, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"`
PreviousEventType int `parquet:"name=previous_event_type, type=INT32"`
EventType int `parquet:"name=event_type, type=INT32"`
SecondsSinceLastEvent float64 `parquet:"name=seconds_since_last_event, type=DOUBLE"`
Cpu float64 `parquet:"name=cpu, type=DOUBLE"`
Memory float64 `parquet:"name=memory, type=DOUBLE"`
Gpu float64 `parquet:"name=gpu, type=DOUBLE"`
EphemeralStorage float64 `parquet:"name=ephemeral_storage, type=DOUBLE"`
ExitCode int `parquet:"name=exit_code, type=INT32"`
type JobRunRow struct {
Queue string `parquet:"name=queue, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"`
JobSet string `parquet:"name=job_set, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"`
JobId string `parquet:"name=job_id, type=BYTE_ARRAY, convertedtype=UTF8"`
RunId string `parquet:"name=run_id, type=BYTE_ARRAY, convertedtype=UTF8"`
PriorityClass string `parquet:"name=priority_class, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"`
Cpu float64 `parquet:"name=cpu, type=DOUBLE"`
Memory float64 `parquet:"name=memory, type=DOUBLE"`
Gpu float64 `parquet:"name=gpu, type=DOUBLE"`
EphemeralStorage float64 `parquet:"name=ephemeral_storage, type=DOUBLE"`
ExitCode int `parquet:"name=exit_code, type=INT32"`
State string `parquet:"name=state, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"`
SubmittedTime int64 `parquet:"name=submitted_time, type=INT64"`
ScheduledTime int64 `parquet:"name=scheduled_time, type=INT64"`
FinishedTime int64 `parquet:"name=finished_time, type=INT64"`
}

func NewWriter(writer io.Writer, c <-chan StateTransition) (*Writer, error) {
if c == nil {
return nil, errors.Errorf("uninitialised channel passed into FileWriter")
}

fw := &Writer{
c: c,
writer: writer,
prevSeenEventByJobId: make(map[string]*armadaevents.EventSequence_Event),
c: c,
writer: writer,
}

return fw, nil
}

// Presently only converts events supported in the simulator
func (w *Writer) encodeEvent(e *armadaevents.EventSequence_Event) int {
func (w *Writer) toEventState(e *armadaevents.EventSequence_Event) string {
switch e.GetEvent().(type) {
case *armadaevents.EventSequence_Event_SubmitJob:
return submitJob
case *armadaevents.EventSequence_Event_JobRunLeased:
return jobRunLeased
case *armadaevents.EventSequence_Event_JobRunRunning:
return jobRunRunning
case *armadaevents.EventSequence_Event_JobSucceeded:
return jobSucceeded
return "SUCCEEDED"
case *armadaevents.EventSequence_Event_JobRunPreempted:
return jobRunPreempted
return "PREEMPTED"
case *armadaevents.EventSequence_Event_CancelledJob:
return jobCancelled
return "CANCELLED"
default:
// Undefined event type
return unsupportedEvent
return "UNKNOWN"
}
}

func (w *Writer) flattenStateTransition(flattenedStateTransitions []*FlattenedArmadaEvent, st StateTransition) ([]*FlattenedArmadaEvent, error) {
startTime := time.Time{}

func (w *Writer) createJobRunRow(st StateTransition) ([]*JobRunRow, error) {
rows := make([]*JobRunRow, 0, len(st.EventSequence.Events))
events := st.EventSequence
jobsList := st.Jobs
for i, event := range events.Events {
// Assumes all supported events have an associated job
associatedJob := jobsList[i]
prevSeenEvent := w.prevSeenEventByJobId[associatedJob.Id()]
// Resource requirements
cpuLimit := associatedJob.ResourceRequirements().Requests[v1.ResourceCPU]
memoryLimit := associatedJob.ResourceRequirements().Requests[v1.ResourceMemory]
ephemeralStorageLimit := associatedJob.ResourceRequirements().Requests[v1.ResourceEphemeralStorage]
gpuLimit := associatedJob.ResourceRequirements().Requests["nvidia.com/gpu"]

eventTime := protoutil.ToStdTime(event.Created)
prevEventType := 0
prevEventTime := eventTime
if prevSeenEvent != nil {
prevEventType = w.encodeEvent(prevSeenEvent)
prevEventTime = protoutil.ToStdTime(prevSeenEvent.Created)
}

flattenedStateTransitions = append(flattenedStateTransitions, &FlattenedArmadaEvent{
Time: eventTime.Sub(startTime).Milliseconds(),
Queue: events.Queue,
JobSet: events.JobSetName,
JobId: associatedJob.Id(),
RunIndex: len(associatedJob.AllRuns()) - 1, // Assumed to be related to latest run in simulation
NumRuns: len(associatedJob.AllRuns()),
PriorityClass: associatedJob.PriorityClassName(),
PreviousEventType: prevEventType,
EventType: w.encodeEvent(event),
SecondsSinceLastEvent: eventTime.Sub(prevEventTime).Seconds(),
Cpu: cpuLimit.AsApproximateFloat64(),
Memory: memoryLimit.AsApproximateFloat64(),
Gpu: gpuLimit.AsApproximateFloat64(),
EphemeralStorage: ephemeralStorageLimit.AsApproximateFloat64(),
ExitCode: 0,
})
w.prevSeenEventByJobId[associatedJob.Id()] = event

if associatedJob.Succeeded() || associatedJob.Failed() || associatedJob.Cancelled() {
delete(w.prevSeenEventByJobId, associatedJob.Id())
if event.GetCancelledJob() != nil || event.GetJobSucceeded() != nil || event.GetJobRunPreempted() != nil {
// Resource requirements
cpuLimit := associatedJob.ResourceRequirements().Requests[v1.ResourceCPU]
memoryLimit := associatedJob.ResourceRequirements().Requests[v1.ResourceMemory]
ephemeralStorageLimit := associatedJob.ResourceRequirements().Requests[v1.ResourceEphemeralStorage]
gpuLimit := associatedJob.ResourceRequirements().Requests["nvidia.com/gpu"]
eventTime := protoutil.ToStdTime(event.Created)

rows = append(rows, &JobRunRow{
Queue: associatedJob.Queue(),
JobSet: associatedJob.Jobset(),
JobId: associatedJob.Id(),
RunId: associatedJob.LatestRun().Id().String(),
PriorityClass: associatedJob.PriorityClassName(),
Cpu: cpuLimit.AsApproximateFloat64(),
Memory: memoryLimit.AsApproximateFloat64(),
Gpu: gpuLimit.AsApproximateFloat64(),
EphemeralStorage: ephemeralStorageLimit.AsApproximateFloat64(),
ExitCode: 0,
State: w.toEventState(event),
SubmittedTime: associatedJob.SubmitTime().UnixNano() / 1000000000,
ScheduledTime: associatedJob.LatestRun().Created() / 1000000000,
FinishedTime: eventTime.UnixNano() / 1000000000,
})
}
}

return flattenedStateTransitions, nil
return rows, nil
}

func (w *Writer) Run(ctx *armadacontext.Context) (err error) {
pw, err := parquetWriter.NewParquetWriterFromWriter(w.writer, new(FlattenedArmadaEvent), 1)
pw, err := parquetWriter.NewParquetWriterFromWriter(w.writer, new(JobRunRow), 1)
if err != nil {
return
}
Expand All @@ -149,8 +109,6 @@ func (w *Writer) Run(ctx *armadacontext.Context) (err error) {
}
}()

flattenedStateTransitions := make([]*FlattenedArmadaEvent, 100)

for {
select {
case <-ctx.Done():
Expand All @@ -160,12 +118,12 @@ func (w *Writer) Run(ctx *armadacontext.Context) (err error) {
return
}

flattenedStateTransitions, err := w.flattenStateTransition(flattenedStateTransitions[0:0], stateTransitions)
jobRunRows, err := w.createJobRunRow(stateTransitions)
if err != nil {
return err
}

for _, flattenedStateTransition := range flattenedStateTransitions {
for _, flattenedStateTransition := range jobRunRows {
if err := pw.Write(*flattenedStateTransition); err != nil {
return err
}
Expand Down

0 comments on commit 7f00534

Please sign in to comment.