diff --git a/cmd/simulator/cmd/root.go b/cmd/simulator/cmd/root.go index 165b12f2876..578b04f9b2c 100644 --- a/cmd/simulator/cmd/root.go +++ b/cmd/simulator/cmd/root.go @@ -3,8 +3,10 @@ package cmd import ( "math" "os" + "runtime/pprof" "github.com/pkg/errors" + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "golang.org/x/exp/maps" @@ -28,6 +30,7 @@ func RootCmd() *cobra.Command { cmd.Flags().Bool("showSchedulerLogs", false, "Show scheduler logs.") cmd.Flags().Int("logInterval", 0, "Log summary statistics every this many events. Disabled if 0.") cmd.Flags().String("eventsOutputFilePath", "", "Path of file to write events to.") + cmd.Flags().String("cycleStatsOutputFilePath", "", "Path of file to write cycle stats to.") cmd.Flags().Bool("enableFastForward", false, "Skips schedule events when we're in a steady state") cmd.Flags().Int("hardTerminationMinutes", math.MaxInt, "Limit the time simulated") cmd.Flags().Int("schedulerCyclePeriodSeconds", 10, "How often we should trigger schedule events") @@ -56,7 +59,11 @@ func runSimulations(cmd *cobra.Command, args []string) error { if err != nil { return err } - filePath, err := cmd.Flags().GetString("eventsOutputFilePath") + eventsOutputFilePath, err := cmd.Flags().GetString("eventsOutputFilePath") + if err != nil { + return err + } + cycleStatsOutputFilePath, err := cmd.Flags().GetString("cycleStatsOutputFilePath") if err != nil { return err } @@ -95,8 +102,8 @@ func runSimulations(cmd *cobra.Command, args []string) error { return err } } - if len(clusterSpecs)*len(workloadSpecs)*len(schedulingConfigsByFilePath) > 1 && filePath != "" { - return errors.Errorf("cannot save multiple simulations to file") + if len(clusterSpecs)*len(workloadSpecs)*len(schedulingConfigsByFilePath) > 1 && eventsOutputFilePath != "" { + return errors.Errorf("cannot save multiple simulations to eventsOutputFile") } ctx := armadacontext.Background() @@ -106,13 +113,25 @@ func runSimulations(cmd *cobra.Command, args []string) error { ctx.Infof("SchedulingConfigs: %v", maps.Keys(schedulingConfigsByFilePath)) var fileWriter *simulator.Writer - file, err := os.Create(filePath) + eventsOutputFile, err := os.Create(eventsOutputFilePath) + if err != nil { + return err + } + defer func() { + if err = eventsOutputFile.Close(); err != nil { + ctx.Errorf("failed to close eventsOutputFile: %s", err) + return + } + }() + + var statsWriter *simulator.StatsWriter + cycleStatsOutputFile, err := os.Create(cycleStatsOutputFilePath) if err != nil { return err } defer func() { - if err = file.Close(); err != nil { - ctx.Errorf("failed to close file: %s", err) + if err = cycleStatsOutputFile.Close(); err != nil { + ctx.Errorf("failed to close cycleStatsOutputFile: %s", err) return } }() @@ -138,19 +157,35 @@ func runSimulations(cmd *cobra.Command, args []string) error { mc.LogSummaryInterval = logInterval metricsCollectors = append(metricsCollectors, mc) - if filePath != "" { - fw, err := simulator.NewWriter(file, s.StateTransitions()) + if eventsOutputFilePath != "" { + fw, err := simulator.NewWriter(eventsOutputFile, s.StateTransitions()) if err != nil { return errors.WithStack(err) } fileWriter = fw } + if cycleStatsOutputFilePath != "" { + sw, err := simulator.NewStatsWriter(cycleStatsOutputFile, s.CycleMetrics()) + if err != nil { + return errors.WithStack(err) + } + statsWriter = sw + } stateTransitionChannels = append(stateTransitionChannels, s.StateTransitions()) schedulingConfigPaths = append(schedulingConfigPaths, schedulingConfigPath) } } } } + f, err := 
os.Create("profile") + if err != nil { + log.Fatal(err) + } + err = pprof.StartCPUProfile(f) + if err != nil { + log.Fatal(err) + } + defer pprof.StopCPUProfile() // Run simulators. g, ctx := armadacontext.ErrGroup(ctx) @@ -179,11 +214,16 @@ func runSimulations(cmd *cobra.Command, args []string) error { }) } - // Run file writer + // Run eventsOutputFile writer g.Go(func() error { return fileWriter.Run(ctx) }) + // Run stats writer + g.Go(func() error { + return statsWriter.Run(ctx) + }) + // Run metric collectors. for _, mc := range metricsCollectors { mc := mc diff --git a/internal/scheduler/simulator/cycle_stats.go b/internal/scheduler/simulator/cycle_stats.go new file mode 100644 index 00000000000..d0ef991bf21 --- /dev/null +++ b/internal/scheduler/simulator/cycle_stats.go @@ -0,0 +1,11 @@ +package simulator + +type CycleStats struct { + Time int64 `parquet:"name=time, type=INT64"` + Queue string `parquet:"name=queue, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"` + Pool string `parquet:"name=pool, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"` + PriorityClass string `parquet:"name=priority_class, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"` + Cpu float64 `parquet:"name=cpu, type=DOUBLE"` + Memory float64 `parquet:"name=memory, type=DOUBLE"` + Gpu float64 `parquet:"name=gpu, type=DOUBLE"` +} diff --git a/internal/scheduler/simulator/metrics.go b/internal/scheduler/simulator/metrics.go index 894c8a6a8d5..550b587ce66 100644 --- a/internal/scheduler/simulator/metrics.go +++ b/internal/scheduler/simulator/metrics.go @@ -19,6 +19,7 @@ type MetricsCollector struct { MetricsByQueue map[string]MetricsVector // If non-zero, log a summary every this many events. LogSummaryInterval int + BaseTime time.Time } type MetricsVector struct { @@ -38,6 +39,7 @@ func NewMetricsCollector(c <-chan StateTransition) *MetricsCollector { return &MetricsCollector{ c: c, MetricsByQueue: make(map[string]MetricsVector), + BaseTime: time.Unix(0, 0), } } @@ -94,7 +96,7 @@ func (mc *MetricsCollector) addEventSequence(eventSequence *armadaevents.EventSe perQueueMetrics := mc.MetricsByQueue[queue] perQueueMetrics.NumEvents += 1 for _, event := range eventSequence.Events { - d := protoutil.ToStdTime(event.Created).Sub(time.Time{}) + d := protoutil.ToStdTime(event.Created).Sub(mc.BaseTime) mc.OverallMetrics.TimeOfMostRecentEvent = d perQueueMetrics.TimeOfMostRecentEvent = d switch event.GetEvent().(type) { diff --git a/internal/scheduler/simulator/simulator.go b/internal/scheduler/simulator/simulator.go index 9f8c2e52584..2da78d503bb 100644 --- a/internal/scheduler/simulator/simulator.go +++ b/internal/scheduler/simulator/simulator.go @@ -52,13 +52,12 @@ type Simulator struct { jobDb *jobdb.JobDb // Map from node id to the pool to which the node belongs. poolByNodeId map[string]string - // Separate nodeDb per pool and executorGroup. - nodeDbByPoolAndExecutorGroup map[string][]*nodedb.NodeDb - // Map from executor name to the nodeDb to which it belongs. - nodeDbByExecutorName map[string]*nodedb.NodeDb + // Separate nodeDb per pool + nodeDbByPool map[string]*nodedb.NodeDb // Allocation by pool for each queue and priority class. // Stored across invocations of the scheduler. allocationByPoolAndQueueAndPriorityClass map[string]map[string]schedulerobjects.QuantityByTAndResourceType[string] + demandByQueue map[string]schedulerobjects.ResourceList // Total resources across all executorGroups for each pool. 
totalResourcesByPool map[string]schedulerobjects.ResourceList
// Indicates whether a job has been submitted or terminated since the last scheduling round.
@@ -72,6 +71,9 @@ type Simulator struct {
// Simulated events are emitted on these event channels.
// Create a channel by calling s.StateTransitions() before running the simulator.
stateTransitionChannels []chan StateTransition
+ // Cycle metrics are emitted on these channels.
+ // Create a channel by calling s.CycleMetrics() before running the simulator.
+ cycleMetricsChannels []chan CycleStats
// Global job scheduling rate-limiter.
limiter *rate.Limiter
// Per-queue job scheduling rate-limiters.
@@ -140,10 +142,10 @@ func NewSimulator(clusterSpec *ClusterSpec, workloadSpec *WorkloadSpec, scheduli
jobTemplatesByDependencyIds: make(map[string]map[string]*JobTemplate),
activeJobTemplatesById: make(map[string]*JobTemplate),
jobDb: jobDb,
- nodeDbByPoolAndExecutorGroup: make(map[string][]*nodedb.NodeDb),
+ nodeDbByPool: make(map[string]*nodedb.NodeDb),
poolByNodeId: make(map[string]string),
- nodeDbByExecutorName: make(map[string]*nodedb.NodeDb),
allocationByPoolAndQueueAndPriorityClass: make(map[string]map[string]schedulerobjects.QuantityByTAndResourceType[string]),
+ demandByQueue: make(map[string]schedulerobjects.ResourceList),
totalResourcesByPool: make(map[string]schedulerobjects.ResourceList),
limiter: rate.NewLimiter(
rate.Limit(schedulingConfig.MaximumSchedulingRate),
@@ -155,6 +157,7 @@ func NewSimulator(clusterSpec *ClusterSpec, workloadSpec *WorkloadSpec, scheduli
enableFastForward: enableFastForward,
hardTerminationMinutes: hardTerminationMinutes,
schedulerCyclePeriodSeconds: schedulerCyclePeriodSeconds,
+ time: time.Unix(0, 0).UTC(),
}
jobDb.SetClock(s)
s.limiter.SetBurstAt(s.time, schedulingConfig.MaximumSchedulingBurst)
@@ -177,16 +180,21 @@ func (s *Simulator) Since(t time.Time) time.Duration {
// Run runs the scheduler until all jobs have finished successfully.
func (s *Simulator) Run(ctx *armadacontext.Context) error {
+ startTime := time.Now()
defer func() {
for _, c := range s.stateTransitionChannels {
close(c)
}
+ for _, c := range s.cycleMetricsChannels {
+ close(c)
+ }
+ ctx.Infof("Finished in %s", time.Since(startTime))
}()
// Bootstrap the simulator by pushing an event that triggers a scheduler run.
s.pushScheduleEvent(s.time)
simTerminationTime := s.time.Add(time.Minute * time.Duration(s.hardTerminationMinutes))
-
+ lastLogTime := s.time
// Then run the scheduler until all jobs have completed.
for s.eventLog.Len() > 0 {
select {
@@ -198,6 +206,11 @@ func (s *Simulator) Run(ctx *armadacontext.Context) error {
return err
}
}
+ // Log progress roughly once per simulated minute.
+ if s.time.Unix()-lastLogTime.Unix() >= 60 {
+ ctx.Infof("Simulator time %s", s.time)
+ lastLogTime = s.time
+ }
if s.time.After(simTerminationTime) {
ctx.Infof("Current simulated time (%s) exceeds runtime deadline (%s). Terminating", s.time, simTerminationTime)
return nil
@@ -214,6 +226,25 @@ func (s *Simulator) StateTransitions() <-chan StateTransition {
return c
}

+// CycleMetrics returns a channel on which end-of-cycle metrics are sent.
+// This function must be called before *Simulator.Run.
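+//
+// A minimal, hypothetical consumer (the goroutine and the use of fmt are
+// illustrative only, not part of this change):
+//
+//	stats := s.CycleMetrics()
+//	go func() {
+//		for st := range stats {
+//			fmt.Printf("t=%d pool=%s queue=%s pc=%s cpu=%g\n",
+//				st.Time, st.Pool, st.Queue, st.PriorityClass, st.Cpu)
+//		}
+//	}()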
+func (s *Simulator) CycleMetrics() <-chan CycleStats {
+ c := make(chan CycleStats, 1024)
+ s.cycleMetricsChannels = append(s.cycleMetricsChannels, c)
+ return c
+}
+
func validateClusterSpec(clusterSpec *ClusterSpec) error {
poolNames := armadaslices.Map(clusterSpec.Pools, func(pool *Pool) string { return pool.Name })
if !slices.Equal(poolNames, armadaslices.Unique(poolNames)) {
@@ -256,21 +276,20 @@ func validateWorkloadSpec(workloadSpec *WorkloadSpec) error {
func (s *Simulator) setupClusters() error {
for _, pool := range s.ClusterSpec.Pools {
totalResourcesForPool := schedulerobjects.ResourceList{}
+ nodeDb, err := nodedb.NewNodeDb(
+ s.schedulingConfig.PriorityClasses,
+ s.schedulingConfig.IndexedResources,
+ s.schedulingConfig.IndexedTaints,
+ s.schedulingConfig.IndexedNodeLabels,
+ s.schedulingConfig.WellKnownNodeTypes,
+ s.resourceListFactory,
+ )
+ if err != nil {
+ return err
+ }
for executorGroupIndex, executorGroup := range pool.ClusterGroups {
- nodeDb, err := nodedb.NewNodeDb(
- s.schedulingConfig.PriorityClasses,
- s.schedulingConfig.IndexedResources,
- s.schedulingConfig.IndexedTaints,
- s.schedulingConfig.IndexedNodeLabels,
- s.schedulingConfig.WellKnownNodeTypes,
- s.resourceListFactory,
- )
- if err != nil {
- return err
- }
for executorIndex, executor := range executorGroup.Clusters {
executorName := fmt.Sprintf("%s-%d-%d", pool.Name, executorGroupIndex, executorIndex)
- s.nodeDbByExecutorName[executorName] = nodeDb
for nodeTemplateIndex, nodeTemplate := range executor.NodeTemplates {
for i := 0; i < int(nodeTemplate.Number); i++ {
nodeId := fmt.Sprintf("%s-%d-%d-%d-%d", pool.Name, executorGroupIndex, executorIndex, nodeTemplateIndex, i)
@@ -278,6 +297,7 @@ func (s *Simulator) setupClusters() error {
Id: nodeId,
Name: nodeId,
Executor: executorName,
+ Pool: pool.Name,
Taints: slices.Clone(nodeTemplate.Taints),
Labels: maps.Clone(nodeTemplate.Labels),
TotalResources: nodeTemplate.TotalResources.DeepCopy(),
@@ -296,9 +316,12 @@ func (s *Simulator) setupClusters() error {
}
}
}
- s.nodeDbByPoolAndExecutorGroup[pool.Name] = append(s.nodeDbByPoolAndExecutorGroup[pool.Name], nodeDb)
- totalResourcesForPool.Add(nodeDb.TotalResources())
}
+ // The nodeDb is now shared by every executor group in the pool, so add its
+ // total once, after all groups' nodes have been created; adding it inside
+ // the loop would count earlier groups' nodes more than once.
+ totalResourcesForPool.Add(nodeDb.TotalResources())
+ s.nodeDbByPool[pool.Name] = nodeDb
s.totalResourcesByPool[pool.Name] = totalResourcesForPool
}
return nil
@@ -451,166 +471,181 @@ func (s *Simulator) handleScheduleEvent(ctx *armadacontext.Context) error {
var eventSequences []*armadaevents.EventSequence
txn := s.jobDb.WriteTxn()
defer txn.Abort()
- demandByQueue := calculateDemandByQueue(txn.GetAll())
for _, pool := range s.ClusterSpec.Pools {
- for i := range pool.ClusterGroups {
- nodeDb := s.nodeDbByPoolAndExecutorGroup[pool.Name][i]
- if err := nodeDb.Reset(); err != nil {
- return err
- }
- totalResources := s.totalResourcesByPool[pool.Name]
- fairnessCostProvider, err := fairness.NewDominantResourceFairness(
- totalResources,
- s.schedulingConfig,
- )
- if err != nil {
- return err
+ nodeDb := s.nodeDbByPool[pool.Name]
+ if err := nodeDb.Reset(); err != nil {
+ return err
+ }
+ totalResources := s.totalResourcesByPool[pool.Name]
+ fairnessCostProvider, err := fairness.NewDominantResourceFairness(
+ totalResources,
+ s.schedulingConfig,
+ )
+ if err != nil {
+ return err
+ }
+ sctx := schedulercontext.NewSchedulingContext(
+ pool.Name,
+ fairnessCostProvider,
+ s.limiter,
+ totalResources,
+ )
+
+ sctx.Started = s.time
+ for _, queue := range s.WorkloadSpec.Queues {
+ demand, hasDemand := s.demandByQueue[queue.Name]
+ if !hasDemand {
+ // Skip queues with no demand so that fair share is computed only over
+ // active queues, i.e., queues with jobs queued or running.
+ continue
+ }
- sctx := schedulercontext.NewSchedulingContext(
- pool.Name,
- fairnessCostProvider,
- s.limiter,
- totalResources,
- )
-
- sctx.Started = s.time
- for _, queue := range s.WorkloadSpec.Queues {
- demand, hasDemand := demandByQueue[queue.Name]
- if !hasDemand {
- // To ensure fair share is computed only from active queues, i.e., queues with jobs queued or running.
- continue
- }
- limiter, ok := s.limiterByQueue[queue.Name]
- if !ok {
- limiter = rate.NewLimiter(
- rate.Limit(s.schedulingConfig.MaximumPerQueueSchedulingRate),
- s.schedulingConfig.MaximumPerQueueSchedulingBurst,
- )
- limiter.SetBurstAt(s.time, s.schedulingConfig.MaximumPerQueueSchedulingBurst)
- s.limiterByQueue[queue.Name] = limiter
- }
- err := sctx.AddQueueSchedulingContext(
- queue.Name,
- queue.Weight,
- s.allocationByPoolAndQueueAndPriorityClass[pool.Name][queue.Name],
- demand,
- demand,
- limiter,
+ limiter, ok := s.limiterByQueue[queue.Name]
+ if !ok {
+ limiter = rate.NewLimiter(
+ rate.Limit(s.schedulingConfig.MaximumPerQueueSchedulingRate),
+ s.schedulingConfig.MaximumPerQueueSchedulingBurst,
)
- if err != nil {
- return err
- }
+ limiter.SetBurstAt(s.time, s.schedulingConfig.MaximumPerQueueSchedulingBurst)
+ s.limiterByQueue[queue.Name] = limiter
}
- constraints := schedulerconstraints.NewSchedulingConstraints(pool.Name, totalResources, s.schedulingConfig, nil, map[string]bool{})
-
- nloatingResourceTypes, err := floatingresources.NewFloatingResourceTypes(s.schedulingConfig.ExperimentalFloatingResources)
+ err := sctx.AddQueueSchedulingContext(
+ queue.Name,
+ queue.Weight,
+ s.allocationByPoolAndQueueAndPriorityClass[pool.Name][queue.Name],
+ demand,
+ demand,
+ limiter,
+ )
if err != nil {
return err
}
+ }
+ constraints := schedulerconstraints.NewSchedulingConstraints(pool.Name, totalResources, s.schedulingConfig, nil, map[string]bool{})

- sch := scheduler.NewPreemptingQueueScheduler(
- sctx,
- constraints,
- nloatingResourceTypes,
- s.schedulingConfig.ProtectedFractionOfFairShare,
- scheduler.NewSchedulerJobRepositoryAdapter(txn),
- nodeDb,
- // TODO: Necessary to support partial eviction.
- nil,
- nil,
- nil,
- )
+ floatingResourceTypes, err := floatingresources.NewFloatingResourceTypes(s.schedulingConfig.ExperimentalFloatingResources)
+ if err != nil {
+ return err
+ }

- schedulerCtx := ctx
- if s.SuppressSchedulerLogs {
- schedulerCtx = &armadacontext.Context{
- Context: ctx.Context,
- FieldLogger: logging.NullLogger,
- }
+ sch := scheduler.NewPreemptingQueueScheduler(
+ sctx,
+ constraints,
+ floatingResourceTypes,
+ s.schedulingConfig.ProtectedFractionOfFairShare,
+ scheduler.NewSchedulerJobRepositoryAdapter(txn),
+ nodeDb,
+ // TODO: Necessary to support partial eviction.
+ nil,
+ nil,
+ nil,
+ )
+
+ schedulerCtx := ctx
+ if s.SuppressSchedulerLogs {
+ schedulerCtx = &armadacontext.Context{
+ Context: ctx.Context,
+ FieldLogger: logging.NullLogger,
}
- result, err := sch.Schedule(schedulerCtx)
- if err != nil {
- return err
+ }
+ result, err := sch.Schedule(schedulerCtx)
+ if err != nil {
+ return err
+ }
+ // Emit end-of-cycle allocation by queue, pool, and priority class on the
+ // cycle metrics channels. The error check above must come first: on error,
+ // result is nil and must not be dereferenced.
+ for _, c := range result.SchedulingContexts {
+ for _, qCtx := range c.QueueSchedulingContexts {
+ for pc, rl := range qCtx.AllocatedByPriorityClass {
+ cpu := rl.Resources["cpu"]
+ memory := rl.Resources["memory"]
+ gpu := rl.Resources["nvidia.com/gpu"]
+ stats := CycleStats{
+ Time: s.time.Unix(),
+ Queue: qCtx.Queue,
+ Pool: c.Pool,
+ PriorityClass: pc,
+ Cpu: (&cpu).AsApproximateFloat64(),
+ Memory: (&memory).AsApproximateFloat64(),
+ Gpu: (&gpu).AsApproximateFloat64(),
+ }
+ for _, channel := range s.cycleMetricsChannels {
+ channel <- stats
+ }
+ }
+ }
+ }

- // Update jobDb to reflect the decisions by the scheduler.
- // Sort jobs to ensure deterministic event ordering.
- preemptedJobs := schedulerresult.PreemptedJobsFromSchedulerResult(result)
- scheduledJobs := slices.Clone(result.ScheduledJobs)
- lessJob := func(a, b *jobdb.Job) int {
- if a.Queue() < b.Queue() {
- return -1
- } else if a.Queue() > b.Queue() {
- return 1
- }
- if a.Id() < b.Id() {
- return -1
- } else if a.Id() > b.Id() {
- return 1
- }
- return 0
+ // Update jobDb to reflect the decisions by the scheduler.
+ // Sort jobs to ensure deterministic event ordering.
+ preemptedJobs := schedulerresult.PreemptedJobsFromSchedulerResult(result)
+ scheduledJobs := slices.Clone(result.ScheduledJobs)
+ lessJob := func(a, b *jobdb.Job) int {
+ if a.Queue() < b.Queue() {
+ return -1
+ } else if a.Queue() > b.Queue() {
+ return 1
}
- slices.SortFunc(preemptedJobs, lessJob)
- slices.SortFunc(scheduledJobs, func(a, b *schedulercontext.JobSchedulingContext) int {
- return lessJob(a.Job, b.Job)
- })
- for i, job := range preemptedJobs {
- if run := job.LatestRun(); run != nil {
- job = job.WithUpdatedRun(run.WithFailed(true))
- } else {
- return errors.Errorf("attempting to preempt job %s with no associated runs", job.Id())
- }
- preemptedJobs[i] = job.WithQueued(false).WithFailed(true)
+ if a.Id() < b.Id() {
+ return -1
+ } else if a.Id() > b.Id() {
+ return 1
}
- for i, jctx := range scheduledJobs {
- job := jctx.Job
- nodeId := result.NodeIdByJobId[job.Id()]
- if nodeId == "" {
- return errors.Errorf("job %s not mapped to a node", job.Id())
- }
- if node, err := nodeDb.GetNode(nodeId); err != nil {
- return err
- } else {
- priority, ok := nodeDb.GetScheduledAtPriority(job.Id())
- if !ok {
- return errors.Errorf("job %s not mapped to a priority", job.Id())
- }
- scheduledJobs[i].Job = job.WithQueued(false).WithNewRun(node.GetExecutor(), node.GetId(), node.GetName(), node.GetPool(), priority)
- }
+ return 0
+ }
+ slices.SortFunc(preemptedJobs, lessJob)
+ slices.SortFunc(scheduledJobs, func(a, b *schedulercontext.JobSchedulingContext) int {
+ return lessJob(a.Job, b.Job)
+ })
+ for i, job := range preemptedJobs {
+ if run := job.LatestRun(); run != nil {
+ job = job.WithUpdatedRun(run.WithFailed(true))
+ } else {
+ return errors.Errorf("attempting to preempt job %s with no associated runs", job.Id())
}
- if err := txn.Upsert(preemptedJobs); err != nil {
- return err
+ preemptedJobs[i] = job.WithQueued(false).WithFailed(true)
+ }
+ for i, jctx := range scheduledJobs {
+ job := jctx.Job
+ nodeId := result.NodeIdByJobId[job.Id()]
+ if nodeId == "" {
+ return errors.Errorf("job %s not mapped to a node", job.Id())
node", job.Id()) } - if err := txn.Upsert(armadaslices.Map(scheduledJobs, func(jctx *schedulercontext.JobSchedulingContext) *jobdb.Job { return jctx.Job })); err != nil { + if node, err := nodeDb.GetNode(nodeId); err != nil { return err + } else { + priority, ok := nodeDb.GetScheduledAtPriority(job.Id()) + if !ok { + return errors.Errorf("job %s not mapped to a priority", job.Id()) + } + scheduledJobs[i].Job = job.WithQueued(false).WithNewRun(node.GetExecutor(), node.GetId(), node.GetName(), node.GetPool(), priority) } + } + if err := txn.Upsert(preemptedJobs); err != nil { + return err + } + if err := txn.Upsert(armadaslices.Map(scheduledJobs, func(jctx *schedulercontext.JobSchedulingContext) *jobdb.Job { return jctx.Job })); err != nil { + return err + } - // Update allocation. - s.allocationByPoolAndQueueAndPriorityClass[pool.Name] = sctx.AllocatedByQueueAndPriority() - - // Generate eventSequences. - // TODO: Add time taken to run the scheduler to s.time. - eventSequences, err = scheduler.AppendEventSequencesFromPreemptedJobs(eventSequences, preemptedJobs, s.time) - if err != nil { - return err - } - eventSequences, err = scheduler.AppendEventSequencesFromScheduledJobs(eventSequences, scheduledJobs) - if err != nil { - return err - } + // Update allocation. + s.allocationByPoolAndQueueAndPriorityClass[pool.Name] = sctx.AllocatedByQueueAndPriority() - // Update event timestamps to be consistent with simulated time. - t := s.time - for _, eventSequence := range eventSequences { - for _, event := range eventSequence.Events { - event.Created = protoutil.ToTimestamp(t) - } - } + // Generate eventSequences. + eventSequences, err = scheduler.AppendEventSequencesFromPreemptedJobs(eventSequences, preemptedJobs, s.time) + if err != nil { + return err + } + eventSequences, err = scheduler.AppendEventSequencesFromScheduledJobs(eventSequences, scheduledJobs) + if err != nil { + return err + } - // If nothing changed, we're in steady state and can safely skip scheduling until something external has changed. - // Do this only if a non-zero amount of time has passed. - if !s.time.Equal(time.Time{}) && len(result.ScheduledJobs) == 0 && len(result.PreemptedJobs) == 0 { - s.shouldSchedule = false + // Update event timestamps to be consistent with simulated time. + t := s.time + for _, eventSequence := range eventSequences { + for _, event := range eventSequence.Events { + event.Created = protoutil.ToTimestamp(t) } } } @@ -624,7 +655,7 @@ func (s *Simulator) handleScheduleEvent(ctx *armadacontext.Context) error { } // TODO: Write events to disk unless they should be discarded. -func (s *Simulator) handleEventSequence(ctx *armadacontext.Context, es *armadaevents.EventSequence) error { +func (s *Simulator) handleEventSequence(_ *armadacontext.Context, es *armadaevents.EventSequence) error { txn := s.jobDb.WriteTxn() defer txn.Abort() eventsToPublish := make([]*armadaevents.EventSequence_Event, 0, len(es.Events)) @@ -703,6 +734,7 @@ func (s *Simulator) handleSubmitJob(txn *jobdb.Txn, e *armadaevents.SubmitJob, t false, []string{}, ) + s.addJobToDemand(job) if err != nil { return nil, false, err } @@ -776,6 +808,7 @@ func (s *Simulator) handleJobSucceeded(txn *jobdb.Txn, e *armadaevents.JobSuccee job.PriorityClassName(), job.ResourceRequirements().Requests, ) + s.removeJobFromDemand(job) // Unbind the job from the node on which it was scheduled. 
if err := s.unbindRunningJob(job); err != nil { @@ -836,7 +869,7 @@ func (s *Simulator) unbindRunningJob(job *jobdb.Job) error { if run.NodeId() == "" { return errors.Errorf("empty nodeId for run %s of job %s", run.Id(), job.Id()) } - nodeDb := s.nodeDbByExecutorName[run.Executor()] + nodeDb := s.nodeDbByPool[run.Pool()] node, err := nodeDb.GetNode(run.NodeId()) if err != nil { return err @@ -856,7 +889,7 @@ func (s *Simulator) unbindRunningJob(job *jobdb.Job) error { func (s *Simulator) handleJobRunPreempted(txn *jobdb.Txn, e *armadaevents.JobRunPreempted) (*jobdb.Job, bool, error) { jobId := armadaevents.UlidFromProtoUuid(e.PreemptedJobId).String() job := txn.GetById(jobId) - + s.removeJobFromDemand(job) // Submit a retry for this job. jobTemplate := s.jobTemplateByJobId[job.Id()] retryJobId := util.ULID() @@ -890,19 +923,18 @@ func maxTime(a, b time.Time) time.Time { return a } -func calculateDemandByQueue(jobs []*jobdb.Job) map[string]schedulerobjects.ResourceList { - queueResources := make(map[string]schedulerobjects.ResourceList) +func (s *Simulator) addJobToDemand(job *jobdb.Job) { + r, ok := s.demandByQueue[job.Queue()] + if !ok { + r = schedulerobjects.NewResourceList(len(job.PodRequirements().ResourceRequirements.Requests)) + s.demandByQueue[job.Queue()] = r + } + r.AddV1ResourceList(job.PodRequirements().ResourceRequirements.Requests) +} - for _, job := range jobs { - if job.InTerminalState() { - continue - } - r, ok := queueResources[job.Queue()] - if !ok { - r = schedulerobjects.NewResourceList(len(job.PodRequirements().ResourceRequirements.Requests)) - queueResources[job.Queue()] = r - } - r.AddV1ResourceList(job.PodRequirements().ResourceRequirements.Requests) +func (s *Simulator) removeJobFromDemand(job *jobdb.Job) { + r, ok := s.demandByQueue[job.Queue()] + if ok { + r.SubV1ResourceList(job.PodRequirements().ResourceRequirements.Requests) } - return queueResources } diff --git a/internal/scheduler/simulator/simulator_test.go b/internal/scheduler/simulator/simulator_test.go index 264757139e7..81278bb989d 100644 --- a/internal/scheduler/simulator/simulator_test.go +++ b/internal/scheduler/simulator/simulator_test.go @@ -164,59 +164,6 @@ func TestSimulator(t *testing.T) { }, simulatedTimeLimit: 5 * time.Minute, }, - "Preemption cascade": { - clusterSpec: &ClusterSpec{ - Name: "test", - Pools: []*Pool{ - WithExecutorGroupsPool( - &Pool{Name: "Pool"}, - ExecutorGroup32Cpu(1, 1), - ExecutorGroup32Cpu(1, 1), - ExecutorGroup32Cpu(1, 1), - ), - }, - }, - workloadSpec: &WorkloadSpec{ - Queues: []*Queue{ - WithJobTemplatesQueue( - &Queue{Name: "B", Weight: 1}, - JobTemplate32Cpu(1, "foo", testfixtures.PriorityClass0), - ), - WithJobTemplatesQueue( - &Queue{Name: "C", Weight: 1}, - JobTemplate32Cpu(2, "foo", testfixtures.PriorityClass0), - ), - WithJobTemplatesQueue( - &Queue{Name: "A", Weight: 1}, - WithMinSubmitTimeJobTemplate( - JobTemplate32Cpu(1, "foo", testfixtures.PriorityClass0), - 30*time.Second, - ), - ), - }, - }, - schedulingConfig: testfixtures.TestSchedulingConfig(), - expectedEventSequences: []*armadaevents.EventSequence{ - SubmitJob(1, "B", "foo"), - SubmitJob(2, "C", "foo"), - JobRunLeased(1, "B", "foo"), - JobRunLeased(1, "C", "foo"), - JobRunLeased(1, "C", "foo"), - SubmitJob(1, "A", "foo"), - JobRunPreempted(1, "B", "foo"), - JobRunLeased(1, "A", "foo"), - SubmitJob(1, "B", "foo"), - JobRunPreempted(1, "C", "foo"), - JobRunLeased(1, "B", "foo"), - SubmitJob(1, "C", "foo"), - JobSucceeded(1, "C", "foo"), - JobRunLeased(1, "C", "foo"), - JobSucceeded(1, "A", "foo"), 
- JobSucceeded(1, "B", "foo"), - JobSucceeded(1, "C", "foo"), - }, - simulatedTimeLimit: 5 * time.Minute, - }, "No preemption cascade with unified scheduling": { clusterSpec: &ClusterSpec{ Name: "test", diff --git a/internal/scheduler/simulator/stats_writer.go b/internal/scheduler/simulator/stats_writer.go new file mode 100644 index 00000000000..f7768911846 --- /dev/null +++ b/internal/scheduler/simulator/stats_writer.go @@ -0,0 +1,54 @@ +package simulator + +import ( + "io" + + parquetWriter "github.com/xitongsys/parquet-go/writer" + + "github.com/armadaproject/armada/internal/common/armadacontext" +) + +type StatsWriter struct { + c <-chan CycleStats + writer io.Writer +} + +func NewStatsWriter(writer io.Writer, c <-chan CycleStats) (*StatsWriter, error) { + sw := &StatsWriter{ + c: c, + writer: writer, + } + + return sw, nil +} + +func (w *StatsWriter) Run(ctx *armadacontext.Context) (err error) { + pw, err := parquetWriter.NewParquetWriterFromWriter(w.writer, new(CycleStats), 1) + if err != nil { + return + } + + defer func() { + if err != nil { + return + } + if err = pw.WriteStop(); err != nil { + ctx.Errorf("error closing parquet writer: %s", err.Error()) + return + } + }() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case cycleStats, ok := <-w.c: + if !ok { + return + } + if err := pw.Write(cycleStats); err != nil { + return err + } + } + } +} diff --git a/internal/scheduler/simulator/test_utils.go b/internal/scheduler/simulator/test_utils.go index 18d8a9d17b0..f17267be91c 100644 --- a/internal/scheduler/simulator/test_utils.go +++ b/internal/scheduler/simulator/test_utils.go @@ -75,6 +75,24 @@ func GetBasicSchedulingConfig() configuration.SchedulingConfig { Resolution: resource.MustParse("1"), }, }, + SupportedResourceTypes: []configuration.ResourceType{ + { + Name: "memory", + Resolution: resource.MustParse("1"), + }, + { + Name: "cpu", + Resolution: resource.MustParse("1m"), + }, + { + Name: "ephemeral-storage", + Resolution: resource.MustParse("1"), + }, + { + Name: "nvidia.com/gpu", + Resolution: resource.MustParse("1"), + }, + }, MaximumSchedulingRate: math.Inf(1), MaximumSchedulingBurst: math.MaxInt, MaximumPerQueueSchedulingRate: math.Inf(1), diff --git a/internal/scheduler/simulator/testdata/configs/basicSchedulingConfig.yaml b/internal/scheduler/simulator/testdata/configs/basicSchedulingConfig.yaml index 16eecf71d89..97fd83145a3 100644 --- a/internal/scheduler/simulator/testdata/configs/basicSchedulingConfig.yaml +++ b/internal/scheduler/simulator/testdata/configs/basicSchedulingConfig.yaml @@ -2,7 +2,15 @@ maximumSchedulingRate: "+inf" maximumSchedulingBurst: 9223372036854775807 maximumPerQueueSchedulingRate: "+Inf" maximumPerQueueSchedulingBurst: 9223372036854775807 -fairnessModel: "DominantResourceFairness" +supportedResourceTypes: + - name: memory + resolution: "1" + - name: cpu + resolution: "1m" + - name: ephemeral-storage + resolution: "1" + - name: nvidia.com/gpu + resolution: "1" dominantResourceFairnessResourcesToConsider: - "cpu" - "memory" diff --git a/internal/scheduler/simulator/writer.go b/internal/scheduler/simulator/writer.go index c607af78849..6f4277e8c9c 100644 --- a/internal/scheduler/simulator/writer.go +++ b/internal/scheduler/simulator/writer.go @@ -2,9 +2,7 @@ package simulator import ( "io" - "time" - "github.com/pkg/errors" parquetWriter "github.com/xitongsys/parquet-go/writer" v1 "k8s.io/api/core/v1" @@ -13,127 +11,90 @@ import ( "github.com/armadaproject/armada/pkg/armadaevents" ) -const ( - unsupportedEvent = iota - submitJob 
= iota - jobRunLeased = iota - jobRunRunning = iota - jobSucceeded = iota - jobRunPreempted = iota - jobCancelled = iota -) - type Writer struct { - c <-chan StateTransition - writer io.Writer - prevSeenEventByJobId map[string]*armadaevents.EventSequence_Event + c <-chan StateTransition + writer io.Writer } -type FlattenedArmadaEvent struct { - Time int64 `parquet:"name=elapsed_time, type=INT64"` - Queue string `parquet:"name=queue, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"` - JobSet string `parquet:"name=job_set, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"` - JobId string `parquet:"name=job_id, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"` - RunIndex int `parquet:"name=run_index, type=INT32"` - NumRuns int `parquet:"name=num_runs, type=INT32"` - PriorityClass string `parquet:"name=priority_class, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"` - PreviousEventType int `parquet:"name=previous_event_type, type=INT32"` - EventType int `parquet:"name=event_type, type=INT32"` - SecondsSinceLastEvent float64 `parquet:"name=seconds_since_last_event, type=DOUBLE"` - Cpu float64 `parquet:"name=cpu, type=DOUBLE"` - Memory float64 `parquet:"name=memory, type=DOUBLE"` - Gpu float64 `parquet:"name=gpu, type=DOUBLE"` - EphemeralStorage float64 `parquet:"name=ephemeral_storage, type=DOUBLE"` - ExitCode int `parquet:"name=exit_code, type=INT32"` +type JobRunRow struct { + Queue string `parquet:"name=queue, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"` + JobSet string `parquet:"name=job_set, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"` + JobId string `parquet:"name=job_id, type=BYTE_ARRAY, convertedtype=UTF8"` + RunId string `parquet:"name=run_id, type=BYTE_ARRAY, convertedtype=UTF8"` + PriorityClass string `parquet:"name=priority_class, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"` + Cpu float64 `parquet:"name=cpu, type=DOUBLE"` + Memory float64 `parquet:"name=memory, type=DOUBLE"` + Gpu float64 `parquet:"name=gpu, type=DOUBLE"` + EphemeralStorage float64 `parquet:"name=ephemeral_storage, type=DOUBLE"` + ExitCode int `parquet:"name=exit_code, type=INT32"` + State string `parquet:"name=state, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"` + SubmittedTime int64 `parquet:"name=submitted_time, type=INT64"` + ScheduledTime int64 `parquet:"name=scheduled_time, type=INT64"` + FinishedTime int64 `parquet:"name=finished_time, type=INT64"` } func NewWriter(writer io.Writer, c <-chan StateTransition) (*Writer, error) { - if c == nil { - return nil, errors.Errorf("uninitialised channel passed into FileWriter") - } - fw := &Writer{ - c: c, - writer: writer, - prevSeenEventByJobId: make(map[string]*armadaevents.EventSequence_Event), + c: c, + writer: writer, } return fw, nil } // Presently only converts events supported in the simulator -func (w *Writer) encodeEvent(e *armadaevents.EventSequence_Event) int { +func (w *Writer) toEventState(e *armadaevents.EventSequence_Event) string { switch e.GetEvent().(type) { - case *armadaevents.EventSequence_Event_SubmitJob: - return submitJob - case *armadaevents.EventSequence_Event_JobRunLeased: - return jobRunLeased - case *armadaevents.EventSequence_Event_JobRunRunning: - return jobRunRunning case *armadaevents.EventSequence_Event_JobSucceeded: - return jobSucceeded + return "SUCCEEDED" case *armadaevents.EventSequence_Event_JobRunPreempted: - return jobRunPreempted + return "PREEMPTED" case 
*armadaevents.EventSequence_Event_CancelledJob: - return jobCancelled + return "CANCELLED" default: // Undefined event type - return unsupportedEvent + return "UNKNOWN" } } -func (w *Writer) flattenStateTransition(flattenedStateTransitions []*FlattenedArmadaEvent, st StateTransition) ([]*FlattenedArmadaEvent, error) { - startTime := time.Time{} - +func (w *Writer) createJobRunRow(st StateTransition) ([]*JobRunRow, error) { + rows := make([]*JobRunRow, 0, len(st.EventSequence.Events)) events := st.EventSequence jobsList := st.Jobs for i, event := range events.Events { // Assumes all supported events have an associated job associatedJob := jobsList[i] - prevSeenEvent := w.prevSeenEventByJobId[associatedJob.Id()] - // Resource requirements - cpuLimit := associatedJob.ResourceRequirements().Requests[v1.ResourceCPU] - memoryLimit := associatedJob.ResourceRequirements().Requests[v1.ResourceMemory] - ephemeralStorageLimit := associatedJob.ResourceRequirements().Requests[v1.ResourceEphemeralStorage] - gpuLimit := associatedJob.ResourceRequirements().Requests["nvidia.com/gpu"] - - eventTime := protoutil.ToStdTime(event.Created) - prevEventType := 0 - prevEventTime := eventTime - if prevSeenEvent != nil { - prevEventType = w.encodeEvent(prevSeenEvent) - prevEventTime = protoutil.ToStdTime(prevSeenEvent.Created) - } - - flattenedStateTransitions = append(flattenedStateTransitions, &FlattenedArmadaEvent{ - Time: eventTime.Sub(startTime).Milliseconds(), - Queue: events.Queue, - JobSet: events.JobSetName, - JobId: associatedJob.Id(), - RunIndex: len(associatedJob.AllRuns()) - 1, // Assumed to be related to latest run in simulation - NumRuns: len(associatedJob.AllRuns()), - PriorityClass: associatedJob.PriorityClassName(), - PreviousEventType: prevEventType, - EventType: w.encodeEvent(event), - SecondsSinceLastEvent: eventTime.Sub(prevEventTime).Seconds(), - Cpu: cpuLimit.AsApproximateFloat64(), - Memory: memoryLimit.AsApproximateFloat64(), - Gpu: gpuLimit.AsApproximateFloat64(), - EphemeralStorage: ephemeralStorageLimit.AsApproximateFloat64(), - ExitCode: 0, - }) - w.prevSeenEventByJobId[associatedJob.Id()] = event - - if associatedJob.Succeeded() || associatedJob.Failed() || associatedJob.Cancelled() { - delete(w.prevSeenEventByJobId, associatedJob.Id()) + if event.GetCancelledJob() != nil || event.GetJobSucceeded() != nil || event.GetJobRunPreempted() != nil { + // Resource requirements + cpuLimit := associatedJob.ResourceRequirements().Requests[v1.ResourceCPU] + memoryLimit := associatedJob.ResourceRequirements().Requests[v1.ResourceMemory] + ephemeralStorageLimit := associatedJob.ResourceRequirements().Requests[v1.ResourceEphemeralStorage] + gpuLimit := associatedJob.ResourceRequirements().Requests["nvidia.com/gpu"] + eventTime := protoutil.ToStdTime(event.Created) + + rows = append(rows, &JobRunRow{ + Queue: associatedJob.Queue(), + JobSet: associatedJob.Jobset(), + JobId: associatedJob.Id(), + RunId: associatedJob.LatestRun().Id().String(), + PriorityClass: associatedJob.PriorityClassName(), + Cpu: cpuLimit.AsApproximateFloat64(), + Memory: memoryLimit.AsApproximateFloat64(), + Gpu: gpuLimit.AsApproximateFloat64(), + EphemeralStorage: ephemeralStorageLimit.AsApproximateFloat64(), + ExitCode: 0, + State: w.toEventState(event), + SubmittedTime: associatedJob.SubmitTime().UnixNano() / 1000000000, + ScheduledTime: associatedJob.LatestRun().Created() / 1000000000, + FinishedTime: eventTime.UnixNano() / 1000000000, + }) } } - - return flattenedStateTransitions, nil + return rows, nil } func (w *Writer) 
Run(ctx *armadacontext.Context) (err error) { - pw, err := parquetWriter.NewParquetWriterFromWriter(w.writer, new(FlattenedArmadaEvent), 1) + pw, err := parquetWriter.NewParquetWriterFromWriter(w.writer, new(JobRunRow), 1) if err != nil { return } @@ -149,8 +110,6 @@ func (w *Writer) Run(ctx *armadacontext.Context) (err error) { } }() - flattenedStateTransitions := make([]*FlattenedArmadaEvent, 100) - for { select { case <-ctx.Done(): @@ -160,12 +119,12 @@ func (w *Writer) Run(ctx *armadacontext.Context) (err error) { return } - flattenedStateTransitions, err := w.flattenStateTransition(flattenedStateTransitions[0:0], stateTransitions) + jobRunRows, err := w.createJobRunRow(stateTransitions) if err != nil { return err } - for _, flattenedStateTransition := range flattenedStateTransitions { + for _, flattenedStateTransition := range jobRunRows { if err := pw.Write(*flattenedStateTransition); err != nil { return err }
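diff --git a/internal/scheduler/simulator/examples/read_cycle_stats.go b/internal/scheduler/simulator/examples/read_cycle_stats.go
new file mode 100644
--- /dev/null
+++ b/internal/scheduler/simulator/examples/read_cycle_stats.go
@@ -0,0 +1,46 @@
+// Package main is an illustrative sketch, not part of the change above: it
+// shows how the cycle stats parquet emitted by the new StatsWriter could be
+// read back with the same parquet-go library the simulator writes with. The
+// file path and the mirrored struct are assumptions, not fixed by the simulator.
+package main
+
+import (
+	"fmt"
+	"log"
+
+	"github.com/xitongsys/parquet-go-source/local"
+	"github.com/xitongsys/parquet-go/reader"
+)
+
+// cycleStats mirrors simulator.CycleStats so this sketch is self-contained.
+type cycleStats struct {
+	Time          int64   `parquet:"name=time, type=INT64"`
+	Queue         string  `parquet:"name=queue, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"`
+	Pool          string  `parquet:"name=pool, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"`
+	PriorityClass string  `parquet:"name=priority_class, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"`
+	Cpu           float64 `parquet:"name=cpu, type=DOUBLE"`
+	Memory        float64 `parquet:"name=memory, type=DOUBLE"`
+	Gpu           float64 `parquet:"name=gpu, type=DOUBLE"`
+}
+
+func main() {
+	// Hypothetical path: whatever was passed via --cycleStatsOutputFilePath.
+	fr, err := local.NewLocalFileReader("cycle_stats.parquet")
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer fr.Close()
+	pr, err := reader.NewParquetReader(fr, new(cycleStats), 1)
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer pr.ReadStop()
+	rows := make([]cycleStats, int(pr.GetNumRows()))
+	if err := pr.Read(&rows); err != nil {
+		log.Fatal(err)
+	}
+	for _, r := range rows {
+		fmt.Printf("t=%d pool=%s queue=%s pc=%s cpu=%g mem=%g gpu=%g\n",
+			r.Time, r.Pool, r.Queue, r.PriorityClass, r.Cpu, r.Memory, r.Gpu)
+	}
+}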