Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Some improvements to the simulator #3895

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 49 additions & 9 deletions cmd/simulator/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package cmd
import (
"math"
"os"
"runtime/pprof"

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"golang.org/x/exp/maps"

Expand All @@ -28,6 +30,7 @@ func RootCmd() *cobra.Command {
cmd.Flags().Bool("showSchedulerLogs", false, "Show scheduler logs.")
cmd.Flags().Int("logInterval", 0, "Log summary statistics every this many events. Disabled if 0.")
cmd.Flags().String("eventsOutputFilePath", "", "Path of file to write events to.")
cmd.Flags().String("cycleStatsOutputFilePath", "", "Path of file to write cycle stats to.")
cmd.Flags().Bool("enableFastForward", false, "Skips schedule events when we're in a steady state")
cmd.Flags().Int("hardTerminationMinutes", math.MaxInt, "Limit the time simulated")
cmd.Flags().Int("schedulerCyclePeriodSeconds", 10, "How often we should trigger schedule events")
Expand Down Expand Up @@ -56,7 +59,11 @@ func runSimulations(cmd *cobra.Command, args []string) error {
if err != nil {
return err
}
filePath, err := cmd.Flags().GetString("eventsOutputFilePath")
eventsOutputFilePath, err := cmd.Flags().GetString("eventsOutputFilePath")
if err != nil {
return err
}
cycleStatsOutputFilePath, err := cmd.Flags().GetString("cycleStatsOutputFilePath")
if err != nil {
return err
}
Expand Down Expand Up @@ -95,8 +102,8 @@ func runSimulations(cmd *cobra.Command, args []string) error {
return err
}
}
if len(clusterSpecs)*len(workloadSpecs)*len(schedulingConfigsByFilePath) > 1 && filePath != "" {
return errors.Errorf("cannot save multiple simulations to file")
if len(clusterSpecs)*len(workloadSpecs)*len(schedulingConfigsByFilePath) > 1 && eventsOutputFilePath != "" {
return errors.Errorf("cannot save multiple simulations to eventsOutputFile")
}

ctx := armadacontext.Background()
Expand All @@ -106,13 +113,25 @@ func runSimulations(cmd *cobra.Command, args []string) error {
ctx.Infof("SchedulingConfigs: %v", maps.Keys(schedulingConfigsByFilePath))

var fileWriter *simulator.Writer
file, err := os.Create(filePath)
eventsOutputFile, err := os.Create(eventsOutputFilePath)
if err != nil {
return err
}
defer func() {
if err = eventsOutputFile.Close(); err != nil {
ctx.Errorf("failed to close eventsOutputFile: %s", err)
return
}
}()

var statsWriter *simulator.StatsWriter
cycleStatsOutputFile, err := os.Create(cycleStatsOutputFilePath)
if err != nil {
return err
}
defer func() {
if err = file.Close(); err != nil {
ctx.Errorf("failed to close file: %s", err)
if err = cycleStatsOutputFile.Close(); err != nil {
ctx.Errorf("failed to close cycleStatsOutputFile: %s", err)
return
}
}()
Expand All @@ -138,19 +157,35 @@ func runSimulations(cmd *cobra.Command, args []string) error {
mc.LogSummaryInterval = logInterval
metricsCollectors = append(metricsCollectors, mc)

if filePath != "" {
fw, err := simulator.NewWriter(file, s.StateTransitions())
if eventsOutputFilePath != "" {
fw, err := simulator.NewWriter(eventsOutputFile, s.StateTransitions())
if err != nil {
return errors.WithStack(err)
}
fileWriter = fw
}
if cycleStatsOutputFilePath != "" {
sw, err := simulator.NewStatsWriter(cycleStatsOutputFile, s.CycleMetrics())
if err != nil {
return errors.WithStack(err)
}
statsWriter = sw
}
stateTransitionChannels = append(stateTransitionChannels, s.StateTransitions())
schedulingConfigPaths = append(schedulingConfigPaths, schedulingConfigPath)
}
}
}
}
f, err := os.Create("profile")
if err != nil {
log.Fatal(err)
}
err = pprof.StartCPUProfile(f)
if err != nil {
log.Fatal(err)
}
defer pprof.StopCPUProfile()

// Run simulators.
g, ctx := armadacontext.ErrGroup(ctx)
Expand Down Expand Up @@ -179,11 +214,16 @@ func runSimulations(cmd *cobra.Command, args []string) error {
})
}

// Run file writer
// Run eventsOutputFile writer
g.Go(func() error {
return fileWriter.Run(ctx)
})

// Run stats writer
g.Go(func() error {
return statsWriter.Run(ctx)
})

// Run metric collectors.
for _, mc := range metricsCollectors {
mc := mc
Expand Down
11 changes: 11 additions & 0 deletions internal/scheduler/simulator/cycle_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package simulator

type CycleStats struct {
Time int64 `parquet:"name=time, type=INT64"`
Queue string `parquet:"name=queue, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"`
Pool string `parquet:"name=pool, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"`
PriorityClass string `parquet:"name=priority_class, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"`
Cpu float64 `parquet:"name=cpu, type=DOUBLE"`
Memory float64 `parquet:"name=memory, type=DOUBLE"`
Gpu float64 `parquet:"name=gpu, type=DOUBLE"`
}
4 changes: 3 additions & 1 deletion internal/scheduler/simulator/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type MetricsCollector struct {
MetricsByQueue map[string]MetricsVector
// If non-zero, log a summary every this many events.
LogSummaryInterval int
BaseTime time.Time
}

type MetricsVector struct {
Expand All @@ -38,6 +39,7 @@ func NewMetricsCollector(c <-chan StateTransition) *MetricsCollector {
return &MetricsCollector{
c: c,
MetricsByQueue: make(map[string]MetricsVector),
BaseTime: time.Unix(0, 0),
}
}

Expand Down Expand Up @@ -94,7 +96,7 @@ func (mc *MetricsCollector) addEventSequence(eventSequence *armadaevents.EventSe
perQueueMetrics := mc.MetricsByQueue[queue]
perQueueMetrics.NumEvents += 1
for _, event := range eventSequence.Events {
d := protoutil.ToStdTime(event.Created).Sub(time.Time{})
d := protoutil.ToStdTime(event.Created).Sub(mc.BaseTime)
mc.OverallMetrics.TimeOfMostRecentEvent = d
perQueueMetrics.TimeOfMostRecentEvent = d
switch event.GetEvent().(type) {
Expand Down
Loading