Skip to content

Commit

Permalink
feat: Prune resources in reverse of sync wave order (#538)
Browse files Browse the repository at this point in the history
* Prune resources in reverse of sync wave order

Signed-off-by: Siddhesh Ghadi <sghadi1203@gmail.com>

* Use waveOverride var instead of directly patching live obj

Directly patching live objs results into incorrect wave ordering
as the new wave value from live obj is used to perform reordering during next sync

Signed-off-by: Siddhesh Ghadi <sghadi1203@gmail.com>

---------

Signed-off-by: Siddhesh Ghadi <sghadi1203@gmail.com>
  • Loading branch information
svghadi authored Jan 24, 2024
1 parent 7921242 commit 5fd9f44
Show file tree
Hide file tree
Showing 3 changed files with 352 additions and 13 deletions.
55 changes: 46 additions & 9 deletions pkg/sync/sync_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
"sort"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -457,6 +456,18 @@ func (sc *syncContext) Sync() {
return
}

// if pruned tasks pending deletion, then wait...
prunedTasksPendingDelete := tasks.Filter(func(t *syncTask) bool {
if t.pruned() && t.liveObj != nil {
return t.liveObj.GetDeletionTimestamp() != nil
}
return false
})
if prunedTasksPendingDelete.Len() > 0 {
sc.setRunningPhase(prunedTasksPendingDelete, true)
return
}

// collect all completed hooks which have appropriate delete policy
hooksPendingDeletionSuccessful := tasks.Filter(func(task *syncTask) bool {
return task.isHook() && task.liveObj != nil && !task.running() && task.deleteOnPhaseSuccessful()
Expand Down Expand Up @@ -747,11 +758,42 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
}
}

// for pruneLast tasks, modify the wave to sync phase last wave of non prune task +1
// for prune tasks, modify the waves for proper cleanup i.e reverse of sync wave (creation order)
pruneTasks := make(map[int][]*syncTask)
for _, task := range tasks {
if task.isPrune() {
pruneTasks[task.wave()] = append(pruneTasks[task.wave()], task)
}
}

var uniquePruneWaves []int
for k := range pruneTasks {
uniquePruneWaves = append(uniquePruneWaves, k)
}
sort.Ints(uniquePruneWaves)

// reorder waves for pruning tasks using symmetric swap on prune waves
n := len(uniquePruneWaves)
for i := 0; i < n/2; i++ {
// waves to swap
startWave := uniquePruneWaves[i]
endWave := uniquePruneWaves[n-1-i]

for _, task := range pruneTasks[startWave] {
task.waveOverride = &endWave
}

for _, task := range pruneTasks[endWave] {
task.waveOverride = &startWave
}
}

// for pruneLast tasks, modify the wave to sync phase last wave of tasks + 1
// to ensure proper cleanup, syncPhaseLastWave should also consider prune tasks to determine last wave
syncPhaseLastWave := 0
for _, task := range tasks {
if task.phase == common.SyncPhaseSync {
if task.wave() > syncPhaseLastWave && !task.isPrune() {
if task.wave() > syncPhaseLastWave {
syncPhaseLastWave = task.wave()
}
}
Expand All @@ -761,12 +803,7 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
for _, task := range tasks {
if task.isPrune() &&
(sc.pruneLast || resourceutil.HasAnnotationOption(task.liveObj, common.AnnotationSyncOptions, common.SyncOptionPruneLast)) {
annotations := task.liveObj.GetAnnotations()
if annotations == nil {
annotations = make(map[string]string)
}
annotations[common.AnnotationSyncWave] = strconv.Itoa(syncPhaseLastWave)
task.liveObj.SetAnnotations(annotations)
task.waveOverride = &syncPhaseLastWave
}
}

Expand Down
Loading

0 comments on commit 5fd9f44

Please sign in to comment.