Commit 61c7372

wip
Signed-off-by: Chris Martin <chris@cmartinit.co.uk>
d80tb7 committed Aug 22, 2024
1 parent a53adbb commit 61c7372
Showing 2 changed files with 28 additions and 16 deletions.
8 changes: 8 additions & 0 deletions cmd/simulator/cmd/root.go
@@ -3,8 +3,10 @@ package cmd
 import (
     "math"
     "os"
+    "runtime/pprof"
 
     "github.com/pkg/errors"
+    log "github.com/sirupsen/logrus"
     "github.com/spf13/cobra"
     "golang.org/x/exp/maps"
 
@@ -151,6 +153,12 @@ func runSimulations(cmd *cobra.Command, args []string) error {
             }
         }
     }
+    f, err := os.Create("profile")
+    if err != nil {
+        log.Fatal(err)
+    }
+    pprof.StartCPUProfile(f)
+    defer pprof.StopCPUProfile()
 
     // Run simulators.
     g, ctx := armadacontext.ErrGroup(ctx)
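
The block added to runSimulations above follows the standard runtime/pprof pattern: create a file, start the CPU profile, and stop it via defer when the function returns. Below is a minimal, self-contained sketch of that pattern; it is not part of this commit, the -cpuprofile flag is illustrative, and unlike the wip change it checks the error returned by StartCPUProfile and closes the profile file.

package main

import (
    "flag"
    "log"
    "os"
    "runtime/pprof"
)

// cpuProfilePath is a hypothetical flag; the commit above hard-codes the file name "profile".
var cpuProfilePath = flag.String("cpuprofile", "", "write a CPU profile to this file")

func main() {
    flag.Parse()
    if *cpuProfilePath != "" {
        f, err := os.Create(*cpuProfilePath)
        if err != nil {
            log.Fatal(err)
        }
        defer f.Close() // runs last: close the file after the profile has been stopped
        if err := pprof.StartCPUProfile(f); err != nil {
            log.Fatal(err)
        }
        defer pprof.StopCPUProfile() // runs first: flush profile data into the file
    }
    // ... run the workload to be profiled ...
}

The resulting file can then be inspected with go tool pprof, e.g. go tool pprof profile (optionally preceded by the path to the simulator binary).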
36 changes: 20 additions & 16 deletions internal/scheduler/simulator/simulator.go
@@ -56,6 +56,7 @@ type Simulator struct {
     // Allocation by pool for each queue and priority class.
     // Stored across invocations of the scheduler.
     allocationByPoolAndQueueAndPriorityClass map[string]map[string]schedulerobjects.QuantityByTAndResourceType[string]
+    demandByQueue map[string]schedulerobjects.ResourceList
     // Total resources across all executorGroups for each pool.
     totalResourcesByPool map[string]schedulerobjects.ResourceList
     // Indicates whether a job has been submitted or terminated since the last scheduling round.
@@ -140,6 +141,7 @@ func NewSimulator(clusterSpec *ClusterSpec, workloadSpec *WorkloadSpec, scheduli
         nodeDbByPool: make(map[string]*nodedb.NodeDb),
         poolByNodeId: make(map[string]string),
         allocationByPoolAndQueueAndPriorityClass: make(map[string]map[string]schedulerobjects.QuantityByTAndResourceType[string]),
+        demandByQueue:        make(map[string]schedulerobjects.ResourceList),
         totalResourcesByPool: make(map[string]schedulerobjects.ResourceList),
         limiter: rate.NewLimiter(
             rate.Limit(schedulingConfig.MaximumSchedulingRate),
@@ -174,10 +176,12 @@ func (s *Simulator) Since(t time.Time) time.Duration {
 
 // Run runs the scheduler until all jobs have finished successfully.
 func (s *Simulator) Run(ctx *armadacontext.Context) error {
+    startTime := time.Now()
     defer func() {
         for _, c := range s.stateTransitionChannels {
             close(c)
         }
+        ctx.Infof("Finished in %s", time.Since(startTime))
     }()
     // Bootstrap the simulator by pushing an event that triggers a scheduler run.
     s.pushScheduleEvent(s.time)
@@ -452,7 +456,6 @@ func (s *Simulator) handleScheduleEvent(ctx *armadacontext.Context) error {
     var eventSequences []*armadaevents.EventSequence
     txn := s.jobDb.WriteTxn()
     defer txn.Abort()
-    demandByQueue := calculateDemandByQueue(txn.GetAll())
     for _, pool := range s.ClusterSpec.Pools {
         nodeDb := s.nodeDbByPool[pool.Name]
         if err := nodeDb.Reset(); err != nil {
@@ -475,7 +478,7 @@ func (s *Simulator) handleScheduleEvent(ctx *armadacontext.Context) error {
 
         sctx.Started = s.time
         for _, queue := range s.WorkloadSpec.Queues {
-            demand, hasDemand := demandByQueue[queue.Name]
+            demand, hasDemand := s.demandByQueue[queue.Name]
             if !hasDemand {
                 // To ensure fair share is computed only from active queues, i.e., queues with jobs queued or running.
                 continue
@@ -696,6 +699,7 @@ func (s *Simulator) handleSubmitJob(txn *jobdb.Txn, e *armadaevents.SubmitJob, t
         false,
         []string{},
     )
+    s.addJobToDemand(job)
     if err != nil {
         return nil, false, err
     }
@@ -769,6 +773,7 @@ func (s *Simulator) handleJobSucceeded(txn *jobdb.Txn, e *armadaevents.JobSuccee
         job.PriorityClassName(),
         job.ResourceRequirements().Requests,
     )
+    s.removeJobFromDemand(job)
 
     // Unbind the job from the node on which it was scheduled.
     if err := s.unbindRunningJob(job); err != nil {
@@ -849,7 +854,7 @@ func (s *Simulator) unbindRunningJob(job *jobdb.Job) error {
 func (s *Simulator) handleJobRunPreempted(txn *jobdb.Txn, e *armadaevents.JobRunPreempted) (*jobdb.Job, bool, error) {
     jobId := armadaevents.UlidFromProtoUuid(e.PreemptedJobId).String()
     job := txn.GetById(jobId)
-
+    s.removeJobFromDemand(job)
     // Submit a retry for this job.
     jobTemplate := s.jobTemplateByJobId[job.Id()]
     retryJobId := util.ULID()
@@ -883,19 +888,18 @@ func maxTime(a, b time.Time) time.Time {
     return a
 }
 
-func calculateDemandByQueue(jobs []*jobdb.Job) map[string]schedulerobjects.ResourceList {
-    queueResources := make(map[string]schedulerobjects.ResourceList)
-
-    for _, job := range jobs {
-        if job.InTerminalState() {
-            continue
-        }
-        r, ok := queueResources[job.Queue()]
-        if !ok {
-            r = schedulerobjects.NewResourceList(len(job.PodRequirements().ResourceRequirements.Requests))
-            queueResources[job.Queue()] = r
-        }
-        r.AddV1ResourceList(job.PodRequirements().ResourceRequirements.Requests)
-    }
-    return queueResources
-}
+func (s *Simulator) addJobToDemand(job *jobdb.Job) {
+    r, ok := s.demandByQueue[job.Queue()]
+    if !ok {
+        r = schedulerobjects.NewResourceList(len(job.PodRequirements().ResourceRequirements.Requests))
+        s.demandByQueue[job.Queue()] = r
+    }
+    r.AddV1ResourceList(job.PodRequirements().ResourceRequirements.Requests)
+}
+
+func (s *Simulator) removeJobFromDemand(job *jobdb.Job) {
+    r, ok := s.demandByQueue[job.Queue()]
+    if ok {
+        r.SubV1ResourceList(job.PodRequirements().ResourceRequirements.Requests)
+    }
+}
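
Taken together, the simulator.go changes replace the per-round calculateDemandByQueue scan of the whole job DB with an incrementally maintained demandByQueue map: demand is added when a job is submitted and subtracted when the job succeeds or is preempted. The sketch below shows the same bookkeeping in a self-contained form; the resource and job types are simplified stand-ins for the schedulerobjects and jobdb types used in the real code.

package main

import "fmt"

// resourceList is a stand-in for schedulerobjects.ResourceList.
type resourceList map[string]int64

func (r resourceList) add(other resourceList) {
    for name, qty := range other {
        r[name] += qty
    }
}

func (r resourceList) sub(other resourceList) {
    for name, qty := range other {
        r[name] -= qty
    }
}

// job is a stand-in for *jobdb.Job.
type job struct {
    queue    string
    requests resourceList
}

// demandTracker mirrors the Simulator's demandByQueue bookkeeping.
type demandTracker struct {
    demandByQueue map[string]resourceList
}

// addJobToDemand is called when a job is submitted.
func (t *demandTracker) addJobToDemand(j job) {
    r, ok := t.demandByQueue[j.queue]
    if !ok {
        r = make(resourceList, len(j.requests))
        t.demandByQueue[j.queue] = r
    }
    r.add(j.requests)
}

// removeJobFromDemand is called when a job succeeds or is preempted.
func (t *demandTracker) removeJobFromDemand(j job) {
    if r, ok := t.demandByQueue[j.queue]; ok {
        r.sub(j.requests)
    }
}

func main() {
    t := &demandTracker{demandByQueue: make(map[string]resourceList)}
    j := job{queue: "queue-a", requests: resourceList{"cpu": 2, "memory": 4 << 30}}
    t.addJobToDemand(j)      // demand rises on submit
    t.removeJobFromDemand(j) // demand falls on success or preemption
    fmt.Println(t.demandByQueue["queue-a"]) // map[cpu:0 memory:0]
}

The trade-off is that handleScheduleEvent no longer rescans every job each round, but every job lifecycle event now has to update the map; as written, a queue whose jobs have all finished keeps a zero-valued entry rather than being removed.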
