Skip to content

Commit

Permalink
Simulator improvements (#167) (#3665)
Browse files Browse the repository at this point in the history
* Simulator improvements

* Adding log message when terminating early

Co-authored-by: Mustafa Ilyas <Mustafa.Ilyas@gresearch.co.uk>
  • Loading branch information
MustafaI and mustafai-gr authored Jun 10, 2024
1 parent f2d338d commit 2e11876
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 9 deletions.
19 changes: 18 additions & 1 deletion cmd/simulator/cmd/root.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"math"
"os"

"github.com/pkg/errors"
Expand All @@ -27,6 +28,9 @@ func RootCmd() *cobra.Command {
cmd.Flags().Bool("showSchedulerLogs", false, "Show scheduler logs.")
cmd.Flags().Int("logInterval", 0, "Log summary statistics every this many events. Disabled if 0.")
cmd.Flags().String("eventsOutputFilePath", "", "Path of file to write events to.")
cmd.Flags().Bool("enableFastForward", false, "Skips schedule events when we're in a steady state")
cmd.Flags().Int("hardTerminationMinutes", math.MaxInt, "Limit the time simulated")
cmd.Flags().Int("schedulerCyclePeriodSeconds", 10, "How often we should trigger schedule events")
return cmd
}

Expand Down Expand Up @@ -57,6 +61,19 @@ func runSimulations(cmd *cobra.Command, args []string) error {
return err
}

enableFastForward, err := cmd.Flags().GetBool("enableFastForward")
if err != nil {
return err
}
hardTerminationMinutes, err := cmd.Flags().GetInt("hardTerminationMinutes")
if err != nil {
return err
}
schedulerCyclePeriodSeconds, err := cmd.Flags().GetInt("schedulerCyclePeriodSeconds")
if err != nil {
return err
}

// Load test specs. and config.
clusterSpecs, err := simulator.ClusterSpecsFromPattern(clusterPattern)
if err != nil {
Expand Down Expand Up @@ -108,7 +125,7 @@ func runSimulations(cmd *cobra.Command, args []string) error {
for _, clusterSpec := range clusterSpecs {
for _, workloadSpec := range workloadSpecs {
for schedulingConfigPath, schedulingConfig := range schedulingConfigsByFilePath {
if s, err := simulator.NewSimulator(clusterSpec, workloadSpec, schedulingConfig); err != nil {
if s, err := simulator.NewSimulator(clusterSpec, workloadSpec, schedulingConfig, enableFastForward, hardTerminationMinutes, schedulerCyclePeriodSeconds); err != nil {
return err
} else {
if !showSchedulerLogs {
Expand Down
30 changes: 23 additions & 7 deletions internal/scheduler/simulator/simulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,20 @@ type Simulator struct {
SuppressSchedulerLogs bool
// For making internaltypes.ResourceList
resourceListFactory *internaltypes.ResourceListFactory
// Skips schedule events when we're in a steady state
enableFastForward bool
// Limit the time simulated
hardTerminationMinutes int
// Determines how often we trigger schedule events
schedulerCyclePeriodSeconds int
}

type StateTransition struct {
Jobs []*jobdb.Job
EventSequence *armadaevents.EventSequence
}

func NewSimulator(clusterSpec *ClusterSpec, workloadSpec *WorkloadSpec, schedulingConfig configuration.SchedulingConfig) (*Simulator, error) {
func NewSimulator(clusterSpec *ClusterSpec, workloadSpec *WorkloadSpec, schedulingConfig configuration.SchedulingConfig, enableFastForward bool, hardTerminationMinutes int, schedulerCyclePeriodSeconds int) (*Simulator, error) {
// TODO: Move clone to caller?
// Copy specs to avoid concurrent mutation.
resourceListFactory, err := internaltypes.MakeResourceListFactory(schedulingConfig.SupportedResourceTypes)
Expand All @@ -114,7 +120,7 @@ func NewSimulator(clusterSpec *ClusterSpec, workloadSpec *WorkloadSpec, scheduli
)
randomSeed := workloadSpec.RandomSeed
if randomSeed == 0 {
// Seed the RNG using the local time if no explic random seed is provided.
// Seed the RNG using the local time if no explicit random seed is provided.
randomSeed = time.Now().Unix()
}
s := &Simulator{
Expand All @@ -134,9 +140,12 @@ func NewSimulator(clusterSpec *ClusterSpec, workloadSpec *WorkloadSpec, scheduli
rate.Limit(schedulingConfig.MaximumSchedulingRate),
schedulingConfig.MaximumSchedulingBurst,
),
limiterByQueue: make(map[string]*rate.Limiter),
rand: rand.New(rand.NewSource(randomSeed)),
resourceListFactory: resourceListFactory,
limiterByQueue: make(map[string]*rate.Limiter),
rand: rand.New(rand.NewSource(randomSeed)),
resourceListFactory: resourceListFactory,
enableFastForward: enableFastForward,
hardTerminationMinutes: hardTerminationMinutes,
schedulerCyclePeriodSeconds: schedulerCyclePeriodSeconds,
}
jobDb.SetClock(s)
s.limiter.SetBurstAt(s.time, schedulingConfig.MaximumSchedulingBurst)
Expand Down Expand Up @@ -166,6 +175,9 @@ func (s *Simulator) Run(ctx *armadacontext.Context) error {
}()
// Bootstrap the simulator by pushing an event that triggers a scheduler run.
s.pushScheduleEvent(s.time)

simTerminationTime := s.time.Add(time.Minute * time.Duration(s.hardTerminationMinutes))

// Then run the scheduler until all jobs have completed.
for s.eventLog.Len() > 0 {
select {
Expand All @@ -177,6 +189,10 @@ func (s *Simulator) Run(ctx *armadacontext.Context) error {
return err
}
}
if s.time.After(simTerminationTime) {
ctx.Infof("Current simulated time (%s) exceeds runtime deadline (%s). Terminating", s.time, simTerminationTime)
return nil
}
}
return nil
}
Expand Down Expand Up @@ -418,9 +434,9 @@ func (s *Simulator) handleScheduleEvent(ctx *armadacontext.Context) error {
// Schedule the next run of the scheduler, unless there are no more active jobTemplates.
// TODO: Make timeout configurable.
if len(s.activeJobTemplatesById) > 0 {
s.pushScheduleEvent(s.time.Add(10 * time.Second))
s.pushScheduleEvent(s.time.Add(time.Duration(s.schedulerCyclePeriodSeconds) * time.Second))
}
if !s.shouldSchedule {
if !s.shouldSchedule && s.enableFastForward {
return nil
}

Expand Down
4 changes: 3 additions & 1 deletion internal/scheduler/simulator/simulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
)

func TestSimulator(t *testing.T) {
enableFastForward := false
schedulerCyclePeriodSeconds := 10
tests := map[string]struct {
clusterSpec *ClusterSpec
workloadSpec *WorkloadSpec
Expand Down Expand Up @@ -434,7 +436,7 @@ func TestSimulator(t *testing.T) {
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
s, err := NewSimulator(tc.clusterSpec, tc.workloadSpec, tc.schedulingConfig)
s, err := NewSimulator(tc.clusterSpec, tc.workloadSpec, tc.schedulingConfig, enableFastForward, int((tc.simulatedTimeLimit + time.Hour).Minutes()), schedulerCyclePeriodSeconds)
require.NoError(t, err)
mc := NewMetricsCollector(s.StateTransitions())
actualEventSequences := make([]*armadaevents.EventSequence, 0, 128)
Expand Down

0 comments on commit 2e11876

Please sign in to comment.