Skip to content

Commit

Permalink
Merge branch 'master' into carlocamurri/lookoutv2-api-group-by-state-…
Browse files Browse the repository at this point in the history
…aggregates
  • Loading branch information
carlocamurri committed Jun 9, 2023
2 parents cc0f876 + 5e18b42 commit b4d1291
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 18 deletions.
16 changes: 15 additions & 1 deletion internal/scheduler/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,6 @@ type QueueSchedulingContext struct {

const maxPrintedJobIdsByReason = 1

// TODO: Update with preemptions.
func (qctx *QueueSchedulingContext) String() string {
var sb strings.Builder
w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0)
Expand All @@ -289,8 +288,11 @@ func (qctx *QueueSchedulingContext) String() string {
fmt.Fprintf(w, "Total allocated resources after scheduling (by priority):\t%s\n", qctx.AllocatedByPriority.String())
fmt.Fprintf(w, "Scheduled resources:\t%s\n", qctx.ScheduledResourcesByPriority.AggregateByResource().CompactString())
fmt.Fprintf(w, "Scheduled resources (by priority):\t%s\n", qctx.ScheduledResourcesByPriority.String())
fmt.Fprintf(w, "Preempted resources:\t%s\n", qctx.EvictedResourcesByPriority.AggregateByResource().CompactString())
fmt.Fprintf(w, "Preempted resources (by priority):\t%s\n", qctx.EvictedResourcesByPriority.String())
fmt.Fprintf(w, "Number of jobs scheduled:\t%d\n", len(qctx.SuccessfulJobSchedulingContexts))
fmt.Fprintf(w, "Number of jobs that could not be scheduled:\t%d\n", len(qctx.UnsuccessfulJobSchedulingContexts))
fmt.Fprintf(w, "Number of jobs preempted:\t%d\n", len(qctx.EvictedJobsById))
if len(qctx.SuccessfulJobSchedulingContexts) > 0 {
jobIdsToPrint := maps.Keys(qctx.SuccessfulJobSchedulingContexts)
if len(jobIdsToPrint) > maxPrintedJobIdsByReason {
Expand Down Expand Up @@ -326,6 +328,18 @@ func (qctx *QueueSchedulingContext) String() string {
}
}
}
if len(qctx.EvictedJobsById) > 0 {
jobIdsToPrint := maps.Keys(qctx.EvictedJobsById)
if len(jobIdsToPrint) > maxPrintedJobIdsByReason {
jobIdsToPrint = jobIdsToPrint[0:maxPrintedJobIdsByReason]
}
fmt.Fprintf(w, "Preempted jobs:\t%v", jobIdsToPrint)
if len(jobIdsToPrint) != len(qctx.EvictedJobsById) {
fmt.Fprintf(w, " (and %d others not shown)\n", len(qctx.EvictedJobsById)-len(jobIdsToPrint))
} else {
fmt.Fprint(w, "\n")
}
}
w.Flush()
return sb.String()
}
Expand Down
83 changes: 78 additions & 5 deletions internal/scheduler/reports.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,27 @@ type SchedulingContextRepository struct {
mostRecentSchedulingContextByExecutorP atomic.Pointer[SchedulingContextByExecutor]
// The most recent attempt where a non-zero amount of resources were scheduled.
mostRecentSuccessfulSchedulingContextByExecutorP atomic.Pointer[SchedulingContextByExecutor]
// The most recent attempt that preempted at least one job.
mostRecentPreemptingSchedulingContextByExecutorP atomic.Pointer[SchedulingContextByExecutor]

// Maps queue name to QueueSchedulingContextByExecutor.
// The most recent attempt.
mostRecentQueueSchedulingContextByExecutorByQueueP atomic.Pointer[map[string]QueueSchedulingContextByExecutor]
// The most recent attempt where a non-zero amount of resources were scheduled.
mostRecentSuccessfulQueueSchedulingContextByExecutorByQueueP atomic.Pointer[map[string]QueueSchedulingContextByExecutor]
// Map from queue name to most recent attempt that preempted at least one job belonging to this queue.
mostRecentPreemptingQueueSchedulingContextByExecutorByQueueP atomic.Pointer[map[string]QueueSchedulingContextByExecutor]

// Maps job id to JobSchedulingContextByExecutor.
// We limit the number of job contexts to store to control memory usage.
mostRecentJobSchedulingContextByExecutorByJobId *lru.Cache

// Store all executor ids seen so far in a set.
// Used to ensure all executors are included in reports.
executorIds map[string]bool
// All executors in sorted order.
sortedExecutorIdsP atomic.Pointer[[]string]

// Protects the fields in this struct from concurrent and dirty writes.
mu sync.Mutex
}
Expand All @@ -63,16 +71,27 @@ func NewSchedulingContextRepository(maxJobSchedulingContextsPerExecutor uint) (*
mostRecentJobSchedulingContextByExecutorByJobId: jobSchedulingContextByExecutorByJobId,
executorIds: make(map[string]bool),
}

mostRecentSchedulingContextByExecutor := make(SchedulingContextByExecutor)
mostRecentSuccessfulSchedulingContextByExecutor := make(SchedulingContextByExecutor)
mostRecentPreemptingSchedulingContextByExecutorP := make(SchedulingContextByExecutor)

mostRecentQueueSchedulingContextByExecutorByQueue := make(map[string]QueueSchedulingContextByExecutor)
mostRecentSuccessfulQueueSchedulingContextByExecutorByQueue := make(map[string]QueueSchedulingContextByExecutor)
mostRecentPreemptingQueueSchedulingContextByExecutorByQueue := make(map[string]QueueSchedulingContextByExecutor)

sortedExecutorIds := make([]string, 0)

rv.mostRecentSchedulingContextByExecutorP.Store(&mostRecentSchedulingContextByExecutor)
rv.mostRecentSuccessfulSchedulingContextByExecutorP.Store(&mostRecentSuccessfulSchedulingContextByExecutor)
rv.mostRecentPreemptingSchedulingContextByExecutorP.Store(&mostRecentPreemptingSchedulingContextByExecutorP)

rv.mostRecentQueueSchedulingContextByExecutorByQueueP.Store(&mostRecentQueueSchedulingContextByExecutorByQueue)
rv.mostRecentSuccessfulQueueSchedulingContextByExecutorByQueueP.Store(&mostRecentSuccessfulQueueSchedulingContextByExecutorByQueue)
rv.mostRecentPreemptingQueueSchedulingContextByExecutorByQueueP.Store(&mostRecentPreemptingQueueSchedulingContextByExecutorByQueue)

rv.sortedExecutorIdsP.Store(&sortedExecutorIds)

return rv, nil
}

Expand Down Expand Up @@ -121,23 +140,35 @@ func (repo *SchedulingContextRepository) addExecutorId(executorId string) error
func (repo *SchedulingContextRepository) addSchedulingContext(sctx *schedulercontext.SchedulingContext) error {
mostRecentSchedulingContextByExecutor := *repo.mostRecentSchedulingContextByExecutorP.Load()
mostRecentSchedulingContextByExecutor = maps.Clone(mostRecentSchedulingContextByExecutor)
mostRecentSchedulingContextByExecutor[sctx.ExecutorId] = sctx

mostRecentSuccessfulSchedulingContextByExecutor := *repo.mostRecentSuccessfulSchedulingContextByExecutorP.Load()
mostRecentSuccessfulSchedulingContextByExecutor = maps.Clone(mostRecentSuccessfulSchedulingContextByExecutor)
mostRecentSchedulingContextByExecutor[sctx.ExecutorId] = sctx
if !sctx.ScheduledResourcesByPriority.IsZero() {
mostRecentSuccessfulSchedulingContextByExecutor[sctx.ExecutorId] = sctx
}

mostRecentPreemptingContextByExecutor := *repo.mostRecentPreemptingSchedulingContextByExecutorP.Load()
mostRecentPreemptingContextByExecutor = maps.Clone(mostRecentPreemptingContextByExecutor)
if !sctx.EvictedResourcesByPriority.IsZero() {
mostRecentPreemptingContextByExecutor[sctx.ExecutorId] = sctx
}

repo.mostRecentSchedulingContextByExecutorP.Store(&mostRecentSchedulingContextByExecutor)
repo.mostRecentSuccessfulSchedulingContextByExecutorP.Store(&mostRecentSuccessfulSchedulingContextByExecutor)
repo.mostRecentPreemptingSchedulingContextByExecutorP.Store(&mostRecentPreemptingContextByExecutor)

return nil
}

// Should only be called from AddSchedulingContext to avoid dirty writes.
func (repo *SchedulingContextRepository) addQueueSchedulingContexts(qctxs []*schedulercontext.QueueSchedulingContext) error {
mostRecentQueueSchedulingContextByExecutorByQueue := *repo.mostRecentQueueSchedulingContextByExecutorByQueueP.Load()
mostRecentQueueSchedulingContextByExecutorByQueue = maps.Clone(mostRecentQueueSchedulingContextByExecutorByQueue)
mostRecentSuccessfulQueueSchedulingContextByExecutorByQueue := *repo.mostRecentSuccessfulQueueSchedulingContextByExecutorByQueueP.Load()
mostRecentSuccessfulQueueSchedulingContextByExecutorByQueue = maps.Clone(mostRecentSuccessfulQueueSchedulingContextByExecutorByQueue)
mostRecentQueueSchedulingContextByExecutorByQueue := maps.Clone(*repo.mostRecentQueueSchedulingContextByExecutorByQueueP.Load())

mostRecentSuccessfulQueueSchedulingContextByExecutorByQueue := maps.Clone(*repo.mostRecentSuccessfulQueueSchedulingContextByExecutorByQueueP.Load())

mostRecentPreemptingQueueSchedulingContextByExecutorByQueue := maps.Clone(*repo.mostRecentPreemptingQueueSchedulingContextByExecutorByQueueP.Load())

for _, qctx := range qctxs {
if qctx.ExecutorId == "" {
return errors.WithStack(&armadaerrors.ErrInvalidArgument{
Expand All @@ -153,6 +184,7 @@ func (repo *SchedulingContextRepository) addQueueSchedulingContexts(qctxs []*sch
Message: "received empty queue name",
})
}

if previous := mostRecentQueueSchedulingContextByExecutorByQueue[qctx.Queue]; previous != nil {
previous = maps.Clone(previous)
previous[qctx.ExecutorId] = qctx
Expand All @@ -162,6 +194,19 @@ func (repo *SchedulingContextRepository) addQueueSchedulingContexts(qctxs []*sch
qctx.ExecutorId: qctx,
}
}

if !qctx.EvictedResourcesByPriority.IsZero() {
if previous := mostRecentPreemptingQueueSchedulingContextByExecutorByQueue[qctx.Queue]; previous != nil {
previous = maps.Clone(previous)
previous[qctx.ExecutorId] = qctx
mostRecentPreemptingQueueSchedulingContextByExecutorByQueue[qctx.Queue] = previous
} else {
mostRecentPreemptingQueueSchedulingContextByExecutorByQueue[qctx.Queue] = QueueSchedulingContextByExecutor{
qctx.ExecutorId: qctx,
}
}
}

if !qctx.ScheduledResourcesByPriority.IsZero() {
if previous := mostRecentSuccessfulQueueSchedulingContextByExecutorByQueue[qctx.Queue]; previous != nil {
previous = maps.Clone(previous)
Expand All @@ -174,8 +219,10 @@ func (repo *SchedulingContextRepository) addQueueSchedulingContexts(qctxs []*sch
}
}
}

repo.mostRecentQueueSchedulingContextByExecutorByQueueP.Store(&mostRecentQueueSchedulingContextByExecutorByQueue)
repo.mostRecentSuccessfulQueueSchedulingContextByExecutorByQueueP.Store(&mostRecentSuccessfulQueueSchedulingContextByExecutorByQueue)

return nil
}

Expand Down Expand Up @@ -236,6 +283,7 @@ func (repo *SchedulingContextRepository) getSchedulingReportString() string {
sortedExecutorIds := repo.GetSortedExecutorIds()
mostRecentSchedulingContextByExecutor := repo.GetMostRecentSchedulingContextByExecutor()
mostRecentSuccessfulSchedulingContextByExecutor := repo.GetMostRecentSuccessfulSchedulingContextByExecutor()
mostRecentPreemptingSchedulingContextByExecutor := repo.GetMostRecentPreemptingSchedulingContextByExecutor()
var sb strings.Builder
w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0)
for _, executorId := range sortedExecutorIds {
Expand All @@ -254,6 +302,13 @@ func (repo *SchedulingContextRepository) getSchedulingReportString() string {
} else {
fmt.Fprint(w, indent.String("\t", "Most recent successful attempt: none\n"))
}
sctx = mostRecentPreemptingSchedulingContextByExecutor[executorId]
if sctx != nil {
fmt.Fprint(w, indent.String("\t", "Most recent preempting attempt:\n"))
fmt.Fprint(w, indent.String("\t\t", sctx.String()))
} else {
fmt.Fprint(w, indent.String("\t", "Most recent preempting attempt: none\n"))
}
}
w.Flush()
return sb.String()
Expand All @@ -274,6 +329,7 @@ func (repo *SchedulingContextRepository) getQueueReportString(queue string) stri
sortedExecutorIds := repo.GetSortedExecutorIds()
mostRecentQueueSchedulingContextByExecutor, _ := repo.GetMostRecentQueueSchedulingContextByExecutor(queue)
mostRecentSuccessfulQueueSchedulingContextByExecutor, _ := repo.GetMostRecentSuccessfulQueueSchedulingContextByExecutor(queue)
mostRecentPreemptingQueueSchedulingContextByExecutor, _ := repo.GetMostRecentPreemptingQueueSchedulingContextByExecutor(queue)
for _, executorId := range sortedExecutorIds {
fmt.Fprintf(w, "%s:\n", executorId)
qctx := mostRecentQueueSchedulingContextByExecutor[executorId]
Expand All @@ -290,6 +346,13 @@ func (repo *SchedulingContextRepository) getQueueReportString(queue string) stri
} else {
fmt.Fprint(w, indent.String("\t", "Most recent successful attempt: none\n"))
}
qctx = mostRecentPreemptingQueueSchedulingContextByExecutor[executorId]
if qctx != nil {
fmt.Fprint(w, indent.String("\t", "Most recent preempting attempt:\n"))
fmt.Fprint(w, indent.String("\t\t", qctx.String()))
} else {
fmt.Fprint(w, indent.String("\t", "Most recent preempting attempt: none\n"))
}
}
w.Flush()
return sb.String()
Expand Down Expand Up @@ -337,6 +400,10 @@ func (repo *SchedulingContextRepository) GetMostRecentSuccessfulSchedulingContex
return *repo.mostRecentSuccessfulSchedulingContextByExecutorP.Load()
}

func (repo *SchedulingContextRepository) GetMostRecentPreemptingSchedulingContextByExecutor() SchedulingContextByExecutor {
return *repo.mostRecentPreemptingSchedulingContextByExecutorP.Load()
}

func (repo *SchedulingContextRepository) GetMostRecentQueueSchedulingContextByExecutor(queue string) (QueueSchedulingContextByExecutor, bool) {
mostRecentQueueSchedulingContextByExecutorByQueue := *repo.mostRecentQueueSchedulingContextByExecutorByQueueP.Load()
mostRecentQueueSchedulingContextByExecutor, ok := mostRecentQueueSchedulingContextByExecutorByQueue[queue]
Expand All @@ -349,6 +416,12 @@ func (repo *SchedulingContextRepository) GetMostRecentSuccessfulQueueSchedulingC
return mostRecentSuccessfulQueueSchedulingContextByExecutor, ok
}

func (repo *SchedulingContextRepository) GetMostRecentPreemptingQueueSchedulingContextByExecutor(queue string) (QueueSchedulingContextByExecutor, bool) {
mostRecentPreemptingQueueSchedulingContextByExecutorByQueue := *repo.mostRecentPreemptingQueueSchedulingContextByExecutorByQueueP.Load()
mostRecentPreemptingQueueSchedulingContextByExecutor, ok := mostRecentPreemptingQueueSchedulingContextByExecutorByQueue[queue]
return mostRecentPreemptingQueueSchedulingContextByExecutor, ok
}

func (repo *SchedulingContextRepository) GetMostRecentJobSchedulingContextByExecutor(jobId string) (JobSchedulingContextByExecutor, bool) {
if v, ok := repo.mostRecentJobSchedulingContextByExecutorByJobId.Get(jobId); ok {
jobSchedulingContextByExecutor := v.(JobSchedulingContextByExecutor)
Expand Down
62 changes: 50 additions & 12 deletions internal/scheduler/reports_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ func TestAddGetSchedulingContext(t *testing.T) {
err = repo.AddSchedulingContext(sctx)
require.NoError(t, err)

sctx = testSchedulingContext("baz")
sctx = withPreemptingJobSchedulingContext(sctx, "C", "preempted")
err = repo.AddSchedulingContext(sctx)
require.NoError(t, err)

actualJobSchedulingContextByExecutor, ok := repo.GetMostRecentJobSchedulingContextByExecutor("doesNotExist")
require.Nil(t, actualJobSchedulingContextByExecutor)
require.False(t, ok)
Expand Down Expand Up @@ -139,12 +144,23 @@ func TestAddGetSchedulingContext(t *testing.T) {
actualQueueSchedulingContextByExecutor,
)

actualQueueSchedulingContextByExecutor, ok = repo.GetMostRecentQueueSchedulingContextByExecutor("C")
require.True(t, ok)
assert.Equal(
t,
QueueSchedulingContextByExecutor{
"baz": withPreemptingJobSchedulingContext(testSchedulingContext("baz"), "C", "preempted").QueueSchedulingContexts["C"],
},
actualQueueSchedulingContextByExecutor,
)

actualSchedulingContextByExecutor := repo.GetMostRecentSchedulingContextByExecutor()
assert.Equal(
t,
SchedulingContextByExecutor{
"foo": withUnsuccessfulJobSchedulingContext(testSchedulingContext("foo"), "A", "failureA"),
"bar": withUnsuccessfulJobSchedulingContext(testSchedulingContext("bar"), "B", "failureB"),
"baz": withPreemptingJobSchedulingContext(testSchedulingContext("baz"), "C", "preempted"),
},
actualSchedulingContextByExecutor,
)
Expand All @@ -158,6 +174,15 @@ func TestAddGetSchedulingContext(t *testing.T) {
},
actualSchedulingContextByExecutor,
)

actualSchedulingContextByExecutor = repo.GetMostRecentPreemptingSchedulingContextByExecutor()
assert.Equal(
t,
SchedulingContextByExecutor{
"baz": withPreemptingJobSchedulingContext(testSchedulingContext("baz"), "C", "preempted"),
},
actualSchedulingContextByExecutor,
)
}

// Concurrently write/read to/from the repo to test that there are no panics.
Expand All @@ -177,7 +202,9 @@ func TestTestAddGetSchedulingContextConcurrency(t *testing.T) {
sctx := testSchedulingContext(executorId)
sctx = withUnsuccessfulJobSchedulingContext(sctx, "A", "failureA")
sctx = withUnsuccessfulJobSchedulingContext(sctx, "B", "failureB")
sctx = withUnsuccessfulJobSchedulingContext(sctx, "C", "failureC")
sctx = withSuccessfulJobSchedulingContext(sctx, "B", fmt.Sprintf("success%sB", executorId))
sctx = withPreemptingJobSchedulingContext(sctx, "C", "preempted")
err = repo.AddSchedulingContext(sctx)
require.NoError(t, err)
err = repo.AddSchedulingContext(sctx)
Expand Down Expand Up @@ -217,18 +244,29 @@ func withSuccessfulJobSchedulingContext(sctx *schedulercontext.SchedulingContext
ExecutorId: sctx.ExecutorId,
JobId: jobId,
}
qctx.ScheduledResourcesByPriority.AddResourceList(
0,
schedulerobjects.ResourceList{
Resources: map[string]resource.Quantity{"cpu": resource.MustParse("1")},
},
)
sctx.ScheduledResourcesByPriority.AddResourceList(
0,
schedulerobjects.ResourceList{
Resources: map[string]resource.Quantity{"cpu": resource.MustParse("1")},
},
)
rl := schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("1")}}
qctx.ScheduledResourcesByPriority.AddResourceList(0, rl)
sctx.ScheduledResourcesByPriority.AddResourceList(0, rl)
return sctx
}

func withPreemptingJobSchedulingContext(sctx *schedulercontext.SchedulingContext, queue, jobId string) *schedulercontext.SchedulingContext {
if sctx.QueueSchedulingContexts == nil {
sctx.QueueSchedulingContexts = make(map[string]*schedulercontext.QueueSchedulingContext)
}
qctx := sctx.QueueSchedulingContexts[queue]
if qctx == nil {
if err := sctx.AddQueueSchedulingContext(queue, 1.0, make(schedulerobjects.QuantityByPriorityAndResourceType)); err != nil {
panic(err)
}
qctx = sctx.QueueSchedulingContexts[queue]
qctx.SchedulingContext = nil
qctx.Created = time.Time{}
}
qctx.EvictedJobsById[jobId] = true
rl := schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("1")}}
qctx.EvictedResourcesByPriority.AddResourceList(0, rl)
sctx.EvictedResourcesByPriority.AddResourceList(0, rl)
return sctx
}

Expand Down

0 comments on commit b4d1291

Please sign in to comment.