From 5fd9f449e75774f6b2b7b1ff137c7759e075e385 Mon Sep 17 00:00:00 2001
From: Siddhesh Ghadi <61187612+svghadi@users.noreply.github.com>
Date: Wed, 24 Jan 2024 10:57:10 +0530
Subject: [PATCH] feat: Prune resources in reverse of sync wave order (#538)

* Prune resources in reverse of sync wave order

Signed-off-by: Siddhesh Ghadi

* Use waveOverride var instead of directly patching live obj

Directly patching live objs results in incorrect wave ordering, as the new
wave value from the live obj is used to perform reordering during the next
sync.

Signed-off-by: Siddhesh Ghadi

---------

Signed-off-by: Siddhesh Ghadi
---
 pkg/sync/sync_context.go      |  55 +++++-
 pkg/sync/sync_context_test.go | 306 +++++++++++++++++++++++++++++++++-
 pkg/sync/sync_task.go         |   4 +
 3 files changed, 352 insertions(+), 13 deletions(-)

diff --git a/pkg/sync/sync_context.go b/pkg/sync/sync_context.go
index c11ff7e0c..3b43333f4 100644
--- a/pkg/sync/sync_context.go
+++ b/pkg/sync/sync_context.go
@@ -5,7 +5,6 @@ import (
 	"encoding/json"
 	"fmt"
 	"sort"
-	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -457,6 +456,18 @@ func (sc *syncContext) Sync() {
 		return
 	}
 
+	// if pruned tasks are pending deletion, then wait...
+	prunedTasksPendingDelete := tasks.Filter(func(t *syncTask) bool {
+		if t.pruned() && t.liveObj != nil {
+			return t.liveObj.GetDeletionTimestamp() != nil
+		}
+		return false
+	})
+	if prunedTasksPendingDelete.Len() > 0 {
+		sc.setRunningPhase(prunedTasksPendingDelete, true)
+		return
+	}
+
 	// collect all completed hooks which have appropriate delete policy
 	hooksPendingDeletionSuccessful := tasks.Filter(func(task *syncTask) bool {
 		return task.isHook() && task.liveObj != nil && !task.running() && task.deleteOnPhaseSuccessful()
@@ -747,11 +758,42 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
 		}
 	}
 
-	// for pruneLast tasks, modify the wave to sync phase last wave of non prune task +1
+	// for prune tasks, modify the waves for proper cleanup, i.e. reverse of sync wave (creation order)
+	pruneTasks := make(map[int][]*syncTask)
+	for _, task := range tasks {
+		if task.isPrune() {
+			pruneTasks[task.wave()] = append(pruneTasks[task.wave()], task)
+		}
+	}
+
+	var uniquePruneWaves []int
+	for k := range pruneTasks {
+		uniquePruneWaves = append(uniquePruneWaves, k)
+	}
+	sort.Ints(uniquePruneWaves)
+
+	// reorder waves for pruning tasks using symmetric swap on prune waves
+	n := len(uniquePruneWaves)
+	for i := 0; i < n/2; i++ {
+		// waves to swap
+		startWave := uniquePruneWaves[i]
+		endWave := uniquePruneWaves[n-1-i]
+
+		for _, task := range pruneTasks[startWave] {
+			task.waveOverride = &endWave
+		}
+
+		for _, task := range pruneTasks[endWave] {
+			task.waveOverride = &startWave
+		}
+	}
+
+	// for pruneLast tasks, modify the wave to the sync phase's last wave + 1;
+	// to ensure proper cleanup, syncPhaseLastWave should also consider prune tasks when determining the last wave
 	syncPhaseLastWave := 0
 	for _, task := range tasks {
 		if task.phase == common.SyncPhaseSync {
-			if task.wave() > syncPhaseLastWave && !task.isPrune() {
+			if task.wave() > syncPhaseLastWave {
 				syncPhaseLastWave = task.wave()
 			}
 		}
@@ -761,12 +803,7 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
 	for _, task := range tasks {
 		if task.isPrune() &&
 			(sc.pruneLast || resourceutil.HasAnnotationOption(task.liveObj, common.AnnotationSyncOptions, common.SyncOptionPruneLast)) {
-			annotations := task.liveObj.GetAnnotations()
-			if annotations == nil {
-				annotations = make(map[string]string)
-			}
-			annotations[common.AnnotationSyncWave] = strconv.Itoa(syncPhaseLastWave)
-			task.liveObj.SetAnnotations(annotations)
+			task.waveOverride = &syncPhaseLastWave
 		}
 	}
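Reviewer note: the reordering above is a symmetric swap over the sorted list of
distinct prune waves, so resources are deleted in the reverse of their creation
order while tasks that share a wave stay together. Below is a self-contained
sketch of the idea; reversePruneWaves and the map of task name to wave are
illustrative names, not code from this PR, and mapping every wave to its mirror
is equivalent to the pairwise end swap the PR performs:

    package main

    import (
    	"fmt"
    	"sort"
    )

    // reversePruneWaves maps each distinct wave onto its counterpart at the
    // opposite end of the sorted wave list, e.g. waves [1 3 4 5] become
    // [5 4 3 1]; tasks sharing a wave move together.
    func reversePruneWaves(taskWaves map[string]int) map[string]int {
    	byWave := map[int][]string{}
    	for name, wave := range taskWaves {
    		byWave[wave] = append(byWave[wave], name)
    	}
    	var waves []int
    	for w := range byWave {
    		waves = append(waves, w)
    	}
    	sort.Ints(waves)

    	out := map[string]int{}
    	for i, w := range waves {
    		mirrored := waves[len(waves)-1-i]
    		for _, name := range byWave[w] {
    			out[name] = mirrored
    		}
    	}
    	return out
    }

    func main() {
    	// matches the "mixedTasks" expectations in the tests below
    	fmt.Println(reversePruneWaves(map[string]int{
    		"pod-1": 1, "pod-3": 3, "pod-4": 3, "pod-5": 4, "pod-7": 5,
    	}))
    	// Output: map[pod-1:5 pod-3:4 pod-4:4 pod-5:3 pod-7:1]
    }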
diff --git a/pkg/sync/sync_context_test.go b/pkg/sync/sync_context_test.go
index 0c2e1e14c..2e2740609 100644
--- a/pkg/sync/sync_context_test.go
+++ b/pkg/sync/sync_context_test.go
@@ -9,6 +9,7 @@ import (
 	"net/http/httptest"
 	"reflect"
 	"testing"
+	"time"
 
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/runtime/schema"
@@ -1583,8 +1584,8 @@ func TestPruneLast(t *testing.T) {
 		assert.True(t, successful)
 		assert.Len(t, tasks, 3)
-		// last wave is the last sync wave for non-prune task + 1
-		assert.Equal(t, 3, tasks.lastWave())
+		// last wave is the last sync wave for tasks + 1
+		assert.Equal(t, 8, tasks.lastWave())
 	})
 
 	t.Run("pruneLastIndividualResources", func(t *testing.T) {
@@ -1601,8 +1602,8 @@ func TestPruneLast(t *testing.T) {
 		assert.True(t, successful)
 		assert.Len(t, tasks, 3)
-		// last wave is the last sync wave for non-prune task + 1
-		assert.Equal(t, 3, tasks.lastWave())
+		// last wave is the last sync wave for tasks + 1
+		assert.Equal(t, 8, tasks.lastWave())
 	})
 }
@@ -1686,3 +1687,300 @@ func TestSetOperationFailedNoTasks(t *testing.T) {
 
 	assert.Equal(t, sc.message, "one or more objects failed to apply")
 }
+
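+// TestWaveReorderingOfPruneTasks covers the reordering done in getSyncTasks:
+// prune task waves are reversed relative to creation order, non-prune tasks
+// keep their annotated wave, and pruneLast moves prune tasks to the wave
+// after the sync phase's last wave.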
+func TestWaveReorderingOfPruneTasks(t *testing.T) {
+	ns := NewNamespace()
+	ns.SetName("ns")
+	pod1 := NewPod()
+	pod1.SetName("pod-1")
+	pod2 := NewPod()
+	pod2.SetName("pod-2")
+	pod3 := NewPod()
+	pod3.SetName("pod-3")
+	pod4 := NewPod()
+	pod4.SetName("pod-4")
+	pod5 := NewPod()
+	pod5.SetName("pod-5")
+	pod6 := NewPod()
+	pod6.SetName("pod-6")
+	pod7 := NewPod()
+	pod7.SetName("pod-7")
+
+	type Test struct {
+		name              string
+		target            []*unstructured.Unstructured
+		live              []*unstructured.Unstructured
+		expectedWaveOrder map[string]int
+		pruneLast         bool
+	}
+	runTest := func(test Test) {
+		t.Run(test.name, func(t *testing.T) {
+			syncCtx := newTestSyncCtx(nil)
+			syncCtx.pruneLast = test.pruneLast
+			syncCtx.resources = groupResources(ReconciliationResult{
+				Live:   test.live,
+				Target: test.target,
+			})
+			tasks, successful := syncCtx.getSyncTasks()
+
+			assert.True(t, successful)
+			assert.Len(t, tasks, len(test.target))
+
+			for _, task := range tasks {
+				assert.Equal(t, test.expectedWaveOrder[task.name()], task.wave())
+			}
+		})
+	}
+
+	// same wave
+	sameWaveTests := []Test{
+		{
+			name:   "sameWave_noPruneTasks",
+			live:   []*unstructured.Unstructured{nil, nil, nil, nil, nil},
+			target: []*unstructured.Unstructured{ns, pod1, pod2, pod3, pod4},
+			// no change in wave order
+			expectedWaveOrder: map[string]int{ns.GetName(): 0, pod1.GetName(): 0, pod2.GetName(): 0, pod3.GetName(): 0, pod4.GetName(): 0},
+		},
+		{
+			name:   "sameWave_allPruneTasks",
+			live:   []*unstructured.Unstructured{ns, pod1, pod2, pod3, pod4},
+			target: []*unstructured.Unstructured{nil, nil, nil, nil, nil},
+			// no change in wave order
+			expectedWaveOrder: map[string]int{ns.GetName(): 0, pod1.GetName(): 0, pod2.GetName(): 0, pod3.GetName(): 0, pod4.GetName(): 0},
+		},
+		{
+			name:   "sameWave_mixedTasks",
+			live:   []*unstructured.Unstructured{ns, pod1, nil, pod3, pod4},
+			target: []*unstructured.Unstructured{ns, nil, pod2, nil, nil},
+			// no change in wave order
+			expectedWaveOrder: map[string]int{ns.GetName(): 0, pod1.GetName(): 0, pod2.GetName(): 0, pod3.GetName(): 0, pod4.GetName(): 0},
+		},
+	}
+
+	for _, test := range sameWaveTests {
+		runTest(test)
+	}
+
+	// different wave
+	differentWaveTests := []Test{
+		{
+			name:   "differentWave_noPruneTasks",
+			target: []*unstructured.Unstructured{ns, pod1, pod2, pod3, pod4},
+			live:   []*unstructured.Unstructured{nil, nil, nil, nil, nil},
+			// no change in wave order
+			expectedWaveOrder: map[string]int{
+				// new wave        // original wave
+				ns.GetName():   0, // 0
+				pod1.GetName(): 1, // 1
+				pod2.GetName(): 2, // 2
+				pod3.GetName(): 3, // 3
+				pod4.GetName(): 4, // 4
+			},
+		},
+		{
+			name:   "differentWave_allPruneTasks",
+			target: []*unstructured.Unstructured{nil, nil, nil, nil, nil},
+			live:   []*unstructured.Unstructured{ns, pod1, pod2, pod3, pod4},
+			// change in prune wave order
+			expectedWaveOrder: map[string]int{
+				// new wave        // original wave
+				ns.GetName():   4, // 0
+				pod1.GetName(): 3, // 1
+				pod2.GetName(): 2, // 2
+				pod3.GetName(): 1, // 3
+				pod4.GetName(): 0, // 4
+			},
+		},
+		{
+			name:   "differentWave_mixedTasks",
+			target: []*unstructured.Unstructured{ns, nil, pod2, nil, nil},
+			live:   []*unstructured.Unstructured{ns, pod1, nil, pod3, pod4},
+			// change in prune wave order
+			expectedWaveOrder: map[string]int{
+				// new wave        // original wave
+				pod1.GetName(): 4, // 1
+				pod3.GetName(): 3, // 3
+				pod4.GetName(): 1, // 4
+
+				// no change since non-prune tasks
+				ns.GetName():   0, // 0
+				pod2.GetName(): 2, // 2
+			},
+		},
+	}
+
+	for _, test := range differentWaveTests {
+		ns.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "0"})
+		pod1.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "1"})
+		pod2.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "2"})
+		pod3.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "3"})
+		pod4.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "4"})
+
+		runTest(test)
+	}
+
+	// prune last
+	pruneLastTests := []Test{
+		{
+			name:      "pruneLast",
+			pruneLast: true,
+			live:      []*unstructured.Unstructured{ns, pod1, pod2, pod3, pod4},
+			target:    []*unstructured.Unstructured{ns, nil, nil, nil, nil},
+			// change in prune wave order
+			expectedWaveOrder: map[string]int{
+				// new wave        // original wave
+				pod1.GetName(): 5, // 1
+				pod2.GetName(): 5, // 2
+				pod3.GetName(): 5, // 3
+				pod4.GetName(): 5, // 4
+
+				// no change since non-prune tasks
+				ns.GetName(): 0, // 0
+			},
+		},
+		{
+			name:      "pruneLastIndividualResources",
+			pruneLast: false,
+			live:      []*unstructured.Unstructured{ns, pod1, pod2, pod3, pod4},
+			target:    []*unstructured.Unstructured{ns, nil, nil, nil, nil},
+			// change in wave order
+			expectedWaveOrder: map[string]int{
+				// new wave        // original wave
+				pod1.GetName(): 4, // 1
+				pod2.GetName(): 5, // 2
+				pod3.GetName(): 2, // 3
+				pod4.GetName(): 1, // 4
+
+				// no change since non-prune tasks
+				ns.GetName(): 0, // 0
+			},
+		},
+	}
+
+	for _, test := range pruneLastTests {
+		ns.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "0"})
+		pod1.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "1"})
+		pod2.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "2", synccommon.AnnotationSyncOptions: synccommon.SyncOptionPruneLast})
+		pod3.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "3"})
+		pod4.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "4"})
+
+		runTest(test)
+	}
+
+	// additional test
+	tests := []Test{
+		{
+			name:   "mixedTasks",
+			target: []*unstructured.Unstructured{ns, nil, pod2, nil, nil, nil, pod6, nil},
+			live:   []*unstructured.Unstructured{ns, pod1, nil, pod3, pod4, pod5, pod6, pod7},
+			// change in prune wave order
+			expectedWaveOrder: map[string]int{
+				// new wave        // original wave
+				pod1.GetName(): 5, // 1
+				pod3.GetName(): 4, // 3
+				pod4.GetName(): 4, // 3
+				pod5.GetName(): 3, // 4
+				pod7.GetName(): 1, // 5
+
+				// no change since non-prune tasks
+				ns.GetName():   -1, // -1
+				pod2.GetName(): 3,  // 3
+				pod6.GetName(): 5,  // 5
+			},
+		},
+	}
+	for _, test := range tests {
+		ns.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "-1"})
+		pod1.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "1"})
+		pod2.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "3"})
+		pod3.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "3"})
+		pod4.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "3"})
+		pod5.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "4"})
+		pod6.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "5"})
+		pod7.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "5"})
+
+		runTest(test)
+	}
+}
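+
+// TestWaitForCleanUpBeforeNextWave verifies the wait-for-deletion gate added
+// to Sync(): once a wave is pruned, the operation stays in OperationRunning
+// and does not prune the next wave until the pruned objects are gone from the
+// cluster; a live object carrying a deletionTimestamp keeps the sync waiting.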
+func TestWaitForCleanUpBeforeNextWave(t *testing.T) {
+	pod1 := NewPod()
+	pod1.SetName("pod-1")
+	pod2 := NewPod()
+	pod2.SetName("pod-2")
+	pod3 := NewPod()
+	pod3.SetName("pod-3")
+
+	pod1.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "1"})
+	pod2.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "2"})
+	pod3.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "3"})
+
+	syncCtx := newTestSyncCtx(nil)
+	syncCtx.prune = true
+
+	// prune order: pod3 -> pod2 -> pod1
+	syncCtx.resources = groupResources(ReconciliationResult{
+		Target: []*unstructured.Unstructured{nil, nil, nil},
+		Live:   []*unstructured.Unstructured{pod1, pod2, pod3},
+	})
+
+	var phase common.OperationPhase
+	var msg string
+	var result []common.ResourceSyncResult
+
+	// 1st sync should prune only pod3
+	syncCtx.Sync()
+	phase, _, result = syncCtx.GetState()
+	assert.Equal(t, synccommon.OperationRunning, phase)
+	assert.Equal(t, 1, len(result))
+	assert.Equal(t, "pod-3", result[0].ResourceKey.Name)
+	assert.Equal(t, synccommon.ResultCodePruned, result[0].Status)
+
+	// simulate successful delete of pod3
+	syncCtx.resources = groupResources(ReconciliationResult{
+		Target: []*unstructured.Unstructured{nil, nil},
+		Live:   []*unstructured.Unstructured{pod1, pod2},
+	})
+
+	// next sync should prune only pod2
+	syncCtx.Sync()
+	phase, _, result = syncCtx.GetState()
+	assert.Equal(t, synccommon.OperationRunning, phase)
+	assert.Equal(t, 2, len(result))
+	assert.Equal(t, "pod-2", result[1].ResourceKey.Name)
+	assert.Equal(t, synccommon.ResultCodePruned, result[1].Status)
+
+	// add deletion timestamp on pod2 to simulate a pending delete
+	pod2.SetDeletionTimestamp(&metav1.Time{Time: time.Now()})
+
+	// next sync should wait for deletion of pod2 from the cluster;
+	// it should not move to the next wave and prune pod1
+	syncCtx.Sync()
+	phase, msg, result = syncCtx.GetState()
+	assert.Equal(t, synccommon.OperationRunning, phase)
+	assert.Equal(t, "waiting for deletion of /Pod/pod-2", msg)
+	assert.Equal(t, 2, len(result))
+
+	// simulate successful delete of pod2
+	syncCtx.resources = groupResources(ReconciliationResult{
+		Target: []*unstructured.Unstructured{nil},
+		Live:   []*unstructured.Unstructured{pod1},
+	})
+
+	// next sync should proceed with the next wave,
+	// i.e. deletion of pod1
+	syncCtx.Sync()
+	phase, _, result = syncCtx.GetState()
+	assert.Equal(t, synccommon.OperationSucceeded, phase)
+	assert.Equal(t, 3, len(result))
+	assert.Equal(t, "pod-3", result[0].ResourceKey.Name)
+	assert.Equal(t, "pod-2", result[1].ResourceKey.Name)
+	assert.Equal(t, "pod-1", result[2].ResourceKey.Name)
+	assert.Equal(t, synccommon.ResultCodePruned, result[0].Status)
+	assert.Equal(t, synccommon.ResultCodePruned, result[1].Status)
+	assert.Equal(t, synccommon.ResultCodePruned, result[2].Status)
+}
diff --git a/pkg/sync/sync_task.go b/pkg/sync/sync_task.go
index f6bebb56f..01c67a98b 100644
--- a/pkg/sync/sync_task.go
+++ b/pkg/sync/sync_task.go
@@ -107,6 +107,10 @@ func (t *syncTask) successful() bool {
 	return t.operationState.Successful()
 }
 
+func (t *syncTask) pruned() bool {
+	return t.syncStatus == common.ResultCodePruned
+}
+
 func (t *syncTask) hookType() common.HookType {
 	if t.isHook() {
 		return common.HookType(t.phase)
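Reviewer note: this excerpt adds the pruned() helper, but the waveOverride
field it pairs with and the wave() accessor that consumes it fall outside the
shown hunks. A minimal sketch of how the accessor plausibly resolves the
override (illustrative, not the verbatim PR code; it assumes waveOverride is a
*int on syncTask and that syncwaves.Wave is the package's existing
annotation-based wave parser):

    // wave returns the effective sync wave for the task, preferring the
    // override assigned during prune reordering over the wave parsed from the
    // object's sync-wave annotation.
    func (t *syncTask) wave() int {
    	if t.waveOverride != nil {
    		return *t.waveOverride
    	}
    	return syncwaves.Wave(t.obj())
    }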