Remove assertions from preempting_queue_scheduler.go #3959

Merged · 5 commits · Sep 23, 2024
41 changes: 0 additions & 41 deletions internal/scheduler/nodedb/nodedb.go
@@ -401,47 +401,6 @@ func (nodeDb *NodeDb) GetNodeWithTxn(txn *memdb.Txn, id string) (*internaltypes.
return obj.(*internaltypes.Node), nil
}

// NodeJobDiff compares two snapshots of the NodeDb memdb and returns
// - a map from the ids of jobs preempted between the two snapshots to the node they used to be on
// - a map from the ids of jobs scheduled between the two snapshots to the node they were scheduled on
func NodeJobDiff(txnA, txnB *memdb.Txn) (map[string]*internaltypes.Node, map[string]*internaltypes.Node, error) {
preempted := make(map[string]*internaltypes.Node)
scheduled := make(map[string]*internaltypes.Node)
nodePairIterator, err := NewNodePairIterator(txnA, txnB)
if err != nil {
return nil, nil, err
}
for item := nodePairIterator.NextItem(); item != nil; item = nodePairIterator.NextItem() {
if item.NodeA != nil && item.NodeB == nil {
// NodeA was removed. All jobs on NodeA are preempted.
for jobId := range item.NodeA.AllocatedByJobId {
preempted[jobId] = item.NodeA
}
} else if item.NodeA == nil && item.NodeB != nil {
// NodeB was added. All jobs on NodeB are scheduled.
for jobId := range item.NodeB.AllocatedByJobId {
scheduled[jobId] = item.NodeB
}
} else if item.NodeA != nil && item.NodeB != nil {
// The node is present in both snapshots.
// Jobs on NodeA that are not on NodeB are preempted.
// Jobs on NodeB that are not on NodeA are scheduled.
for jobId := range item.NodeA.AllocatedByJobId {
if _, ok := item.NodeB.AllocatedByJobId[jobId]; !ok {
preempted[jobId] = item.NodeA
}
}
for jobId := range item.NodeB.AllocatedByJobId {
if _, ok := item.NodeA.AllocatedByJobId[jobId]; !ok {
scheduled[jobId] = item.NodeB
}
}
}
}
return preempted, scheduled, nil
}

func (nodeDb *NodeDb) ScheduleManyWithTxn(txn *memdb.Txn, gctx *schedulercontext.GangSchedulingContext) (bool, error) {
// Attempt to schedule pods one by one in a transaction.
for _, jctx := range gctx.JobSchedulingContexts {
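
For context on what the removed NodeJobDiff computed: per node, the preempted/scheduled classification is just a set difference over the node's AllocatedByJobId keys. Below is a minimal, self-contained sketch of that rule; the helper and job ids are illustrative and not part of the codebase.

package main

import "fmt"

// diffAllocations restates the per-node rule from the removed NodeJobDiff:
// a job id present only in the "before" allocation is preempted, one present
// only in the "after" allocation is scheduled. Plain string sets stand in for
// the node's AllocatedByJobId map.
func diffAllocations(before, after map[string]bool) (preempted, scheduled []string) {
	for jobId := range before {
		if !after[jobId] {
			preempted = append(preempted, jobId)
		}
	}
	for jobId := range after {
		if !before[jobId] {
			scheduled = append(scheduled, jobId)
		}
	}
	return preempted, scheduled
}

func main() {
	before := map[string]bool{"job-1": true, "job-2": true}
	after := map[string]bool{"job-2": true, "job-3": true}
	preempted, scheduled := diffAllocations(before, after)
	fmt.Println("preempted:", preempted) // preempted: [job-1]
	fmt.Println("scheduled:", scheduled) // scheduled: [job-3]
}
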
78 changes: 0 additions & 78 deletions internal/scheduler/nodedb/nodeiteration.go
@@ -47,84 +47,6 @@ func (it *NodesIterator) Next() interface{} {
return it.NextNode()
}

type NodePairIterator struct {
itA *NodesIterator
itB *NodesIterator
nodeA *internaltypes.Node
nodeB *internaltypes.Node
}

type NodePairIteratorItem struct {
NodeA *internaltypes.Node
NodeB *internaltypes.Node
}

func NewNodePairIterator(txnA, txnB *memdb.Txn) (*NodePairIterator, error) {
itA, err := NewNodesIterator(txnA)
if err != nil {
return nil, errors.WithStack(err)
}
itB, err := NewNodesIterator(txnB)
if err != nil {
return nil, errors.WithStack(err)
}
return &NodePairIterator{
itA: itA,
itB: itB,
}, nil
}

func (it *NodePairIterator) WatchCh() <-chan struct{} {
panic("not implemented")
}

func (it *NodePairIterator) NextItem() (rv *NodePairIteratorItem) {
defer func() {
if rv == nil {
return
}
if rv.NodeA != nil {
it.nodeA = nil
}
if rv.NodeB != nil {
it.nodeB = nil
}
}()
if it.nodeA == nil {
it.nodeA = it.itA.NextNode()
}
if it.nodeB == nil {
it.nodeB = it.itB.NextNode()
}
if it.nodeA == nil && it.nodeB == nil {
return nil
} else if it.nodeA == nil || it.nodeB == nil {
return &NodePairIteratorItem{
NodeA: it.nodeA,
NodeB: it.nodeB,
}
}
cmp := bytes.Compare([]byte(it.nodeA.GetId()), []byte(it.nodeB.GetId()))
if cmp == 0 {
return &NodePairIteratorItem{
NodeA: it.nodeA,
NodeB: it.nodeB,
}
} else if cmp == -1 {
return &NodePairIteratorItem{
NodeA: it.nodeA,
}
} else {
return &NodePairIteratorItem{
NodeB: it.nodeB,
}
}
}

func (it *NodePairIterator) Next() interface{} {
return it.NextItem()
}

// NodeIndex is an index for internaltypes.Node that returns node.NodeDbKeys[KeyIndex].
type NodeIndex struct {
KeyIndex int
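
The removed NodePairIterator is a merge join over two id-sorted iterators: advance whichever side has the smaller id, and emit both sides together when the ids match. A standalone sketch of that pattern, using plain sorted slices in place of the memdb snapshot iterators (names are illustrative):

package main

import "fmt"

// pair mirrors the removed NodePairIteratorItem: one side may be empty when a
// node exists in only one of the two snapshots.
type pair struct{ a, b string }

// mergeJoin walks two id-sorted sequences in lockstep: advance whichever side
// has the smaller id, emit both sides together when the ids are equal.
func mergeJoin(a, b []string) []pair {
	var out []pair
	i, j := 0, 0
	for i < len(a) || j < len(b) {
		switch {
		case j >= len(b) || (i < len(a) && a[i] < b[j]):
			out = append(out, pair{a: a[i]})
			i++
		case i >= len(a) || a[i] > b[j]:
			out = append(out, pair{b: b[j]})
			j++
		default: // same id on both sides
			out = append(out, pair{a: a[i], b: b[j]})
			i++
			j++
		}
	}
	return out
}

func main() {
	// Same shape as the removed TestNodePairIterator below: "A" only in the
	// first snapshot, "B" in both, "C" only in the second.
	fmt.Println(mergeJoin([]string{"A", "B"}, []string{"B", "C"}))
	// Output: [{A } {B B} { C}]
}
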
49 changes: 0 additions & 49 deletions internal/scheduler/nodedb/nodeiteration_test.go
@@ -71,55 +71,6 @@ func TestNodesIterator(t *testing.T) {
}
}

func TestNodePairIterator(t *testing.T) {
nodes := testfixtures.TestCluster()
for i, nodeId := range []string{"A", "B", "C"} {
nodes[i].Id = nodeId
}
nodeDb, err := newNodeDbWithNodes(nodes)
require.NoError(t, err)
entries := make([]*internaltypes.Node, len(nodes))
for i, node := range nodes {
entry, err := nodeDb.GetNode(node.Id)
require.NoError(t, err)
entries[i] = entry
}

txn := nodeDb.Txn(true)
require.NoError(t, txn.Delete("nodes", entries[2]))
txn.Commit()
txnA := nodeDb.Txn(false)

txn = nodeDb.Txn(true)
require.NoError(t, txn.Delete("nodes", entries[0]))
require.NoError(t, txn.Insert("nodes", entries[2]))
txn.Commit()
txnB := nodeDb.Txn(false)

it, err := NewNodePairIterator(txnA, txnB)
require.NoError(t, err)

actual := make([]*NodePairIteratorItem, 0)
for item := it.NextItem(); item != nil; item = it.NextItem() {
actual = append(actual, item)
}
expected := []*NodePairIteratorItem{
{
NodeA: entries[0],
NodeB: nil,
},
{
NodeA: entries[1],
NodeB: entries[1],
},
{
NodeA: nil,
NodeB: entries[2],
},
}
assert.Equal(t, expected, actual)
}

func TestNodeTypeIterator(t *testing.T) {
const nodeTypeALabel = "a"
const nodeTypeBLabel = "b"
95 changes: 0 additions & 95 deletions internal/scheduler/preempting_queue_scheduler.go
@@ -44,8 +44,6 @@ type PreemptingQueueScheduler struct {
gangIdByJobId map[string]string
// If true, the unsuccessfulSchedulingKeys check of gangScheduler is omitted.
skipUnsuccessfulSchedulingKeyCheck bool
// If true, asserts that the nodeDb state is consistent with expected changes.
enableAssertions bool
}

func NewPreemptingQueueScheduler(
@@ -85,10 +83,6 @@ func NewPreemptingQueueScheduler(
}
}

func (sch *PreemptingQueueScheduler) EnableAssertions() {
sch.enableAssertions = true
}

func (sch *PreemptingQueueScheduler) SkipUnsuccessfulSchedulingKeyCheck() {
sch.skipUnsuccessfulSchedulingKeyCheck = true
}
@@ -104,10 +98,6 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*sche
preemptedJobsById := make(map[string]*schedulercontext.JobSchedulingContext)
scheduledJobsById := make(map[string]*schedulercontext.JobSchedulingContext)

// NodeDb snapshot prior to making any changes.
// We compare against this snapshot after scheduling to detect changes.
snapshot := sch.nodeDb.Txn(false)

// Evict preemptible jobs.
ctx.WithField("stage", "scheduling-algo").Infof("Evicting preemptible jobs")
evictorResult, inMemoryJobRepo, err := sch.evict(
@@ -240,19 +230,6 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*sche
schedulercontext.PrintJobSummary(ctx, "Scheduling new jobs;", scheduledJobs)
// TODO: Show failed jobs.

if sch.enableAssertions {
ctx.WithField("stage", "scheduling-algo").Infof("Performing assertions after scheduling round")
err := sch.assertions(
snapshot,
preemptedJobsById,
scheduledJobsById,
sch.nodeIdByJobId,
)
if err != nil {
return nil, err
}
ctx.WithField("stage", "scheduling-algo").Infof("Finished running assertions after scheduling round")
}
return &schedulerresult.SchedulerResult{
PreemptedJobs: preemptedJobs,
ScheduledJobs: scheduledJobs,
@@ -612,78 +589,6 @@ func (sch *PreemptingQueueScheduler) updateGangAccounting(preempted []*scheduler
return nil
}

// For each node in the NodeDb, compare assigned jobs relative to the initial snapshot.
// Jobs no longer assigned to a node are preempted.
// Jobs newly assigned to a node are scheduled.
//
// Compare the output of nodedb.NodeJobDiff with the expected preempted/scheduled jobs to ensure the NodeDb is consistent.
// This is only to validate that nothing unexpected happened during scheduling.
func (sch *PreemptingQueueScheduler) assertions(
snapshot *memdb.Txn,
preemptedJobsById map[string]*schedulercontext.JobSchedulingContext,
scheduledJobsById map[string]*schedulercontext.JobSchedulingContext,
nodeIdByJobId map[string]string,
) error {
// Compare two snapshots of the nodeDb to find jobs that
// were preempted/scheduled between creating the snapshots.
preempted, scheduled, err := nodedb.NodeJobDiff(snapshot, sch.nodeDb.Txn(false))
if err != nil {
return err
}

// Assert that jobs we expect to be preempted/scheduled are marked as such in the nodeDb.
for jobId := range preemptedJobsById {
if _, ok := preempted[jobId]; !ok {
return errors.Errorf("inconsistent NodeDb: expected job %s to be preempted in nodeDb", jobId)
}
}
for jobId := range scheduledJobsById {
if _, ok := scheduled[jobId]; !ok {
return errors.Errorf("inconsistent NodeDb: expected job %s to be scheduled in nodeDb", jobId)
}
}

// Assert that jobs marked as preempted (scheduled) in the nodeDb are expected to be preempted (scheduled),
// and that jobs are preempted/scheduled on the nodes we expect them to.
for jobId, node := range preempted {
if expectedNodeId, ok := nodeIdByJobId[jobId]; ok {
if expectedNodeId != node.GetId() {
return errors.Errorf(
"inconsistent NodeDb: expected job %s to be preempted from node %s, but got %s",
jobId, expectedNodeId, node.GetId(),
)
}
} else {
return errors.Errorf(
"inconsistent NodeDb: expected job %s to be mapped to node %s, but found none",
jobId, node.GetId(),
)
}
if _, ok := preemptedJobsById[jobId]; !ok {
return errors.Errorf("inconsistent NodeDb: didn't expect job %s to be preempted (job marked as preempted in NodeDb)", jobId)
}
}
for jobId, node := range scheduled {
if expectedNodeId, ok := nodeIdByJobId[jobId]; ok {
if expectedNodeId != node.GetId() {
return errors.Errorf(
"inconsistent NodeDb: expected job %s to be on node %s, but got %s",
jobId, expectedNodeId, node.GetId(),
)
}
} else {
return errors.Errorf(
"inconsistent NodeDb: expected job %s to be mapped to node %s, but found none",
jobId, node.GetId(),
)
}
if _, ok := scheduledJobsById[jobId]; !ok {
return errors.Errorf("inconsistent NodeDb: didn't expect job %s to be scheduled (job marked as scheduled in NodeDb)", jobId)
}
}
return nil
}

type Evictor struct {
jobRepo JobRepository
nodeDb *nodedb.NodeDb
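
The removed assertions hinged on go-memdb's snapshot semantics: a read-only transaction such as the snapshot := sch.nodeDb.Txn(false) taken before scheduling keeps seeing the pre-scheduling state even after later write transactions commit, which is what made the post-hoc NodeJobDiff comparison possible. A minimal sketch of that behaviour, assuming hashicorp/go-memdb (which NodeDb wraps) and an illustrative one-field schema rather than the real one:

package main

import (
	"fmt"

	"github.com/hashicorp/go-memdb"
)

// node is a stand-in for the real internaltypes.Node.
type node struct {
	ID string
}

func main() {
	schema := &memdb.DBSchema{
		Tables: map[string]*memdb.TableSchema{
			"nodes": {
				Name: "nodes",
				Indexes: map[string]*memdb.IndexSchema{
					"id": {
						Name:    "id",
						Unique:  true,
						Indexer: &memdb.StringFieldIndex{Field: "ID"},
					},
				},
			},
		},
	}
	db, err := memdb.NewMemDB(schema)
	if err != nil {
		panic(err)
	}

	// Read-only txn taken before any changes: this is the "snapshot".
	snapshot := db.Txn(false)

	// Commit a change afterwards.
	write := db.Txn(true)
	if err := write.Insert("nodes", &node{ID: "node-1"}); err != nil {
		panic(err)
	}
	write.Commit()

	// The snapshot still sees the old (empty) state ...
	before, _ := snapshot.First("nodes", "id", "node-1")
	// ... while a fresh read txn sees the committed change.
	after, _ := db.Txn(false).First("nodes", "id", "node-1")
	fmt.Println(before == nil, after != nil) // true true
}
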
1 change: 0 additions & 1 deletion internal/scheduler/preempting_queue_scheduler_test.go
@@ -1868,7 +1868,6 @@ func TestPreemptingQueueScheduler(t *testing.T) {
jobIdsByGangId,
gangIdByJobId,
)
sch.EnableAssertions()

result, err := sch.Schedule(ctx)
require.NoError(t, err)
3 changes: 0 additions & 3 deletions internal/scheduler/scheduling_algo.go
@@ -487,9 +487,6 @@ func (l *FairSchedulingAlgo) schedulePool(
if l.schedulingConfig.AlwaysAttemptScheduling {
scheduler.SkipUnsuccessfulSchedulingKeyCheck()
}
if l.schedulingConfig.EnableAssertions {
scheduler.EnableAssertions()
}

result, err := scheduler.Schedule(ctx)
if err != nil {