Skip to content

Commit

Permalink
Fix backup post hook issue
Browse files Browse the repository at this point in the history
Fix backup post hook issue

Fixes vmware-tanzu#8159

Signed-off-by: Wenkai Yin(尹文开) <yinw@vmware.com>
  • Loading branch information
ywk253100 committed Dec 12, 2024
1 parent cb7758f commit a06ded4
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 12 deletions.
10 changes: 7 additions & 3 deletions internal/hook/hook_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,16 @@ type HookTracker struct {
// HookExecutedCnt indicates the number of executed hooks.
hookExecutedCnt int
// hookErrs records hook execution errors if any.
hookErrs []HookErrInfo
hookErrs []HookErrInfo
AsyncItemBlocks *sync.WaitGroup
}

// NewHookTracker creates a hookTracker instance.
func NewHookTracker() *HookTracker {
return &HookTracker{
lock: &sync.RWMutex{},
tracker: make(map[hookKey]hookStatus),
lock: &sync.RWMutex{},
tracker: make(map[hookKey]hookStatus),
AsyncItemBlocks: &sync.WaitGroup{},
}
}

Expand Down Expand Up @@ -141,6 +143,8 @@ func (ht *HookTracker) Record(podNamespace, podName, container, source, hookName

// Stat returns the number of attempted hooks and failed hooks
func (ht *HookTracker) Stat() (hookAttemptedCnt int, hookFailedCnt int) {
ht.AsyncItemBlocks.Wait()

ht.lock.RLock()
defer ht.lock.RUnlock()

Expand Down
113 changes: 104 additions & 9 deletions pkg/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/selection"
kubeerrs "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/wait"
kbclient "sigs.k8s.io/controller-runtime/pkg/client"

"github.com/vmware-tanzu/velero/internal/hook"
Expand Down Expand Up @@ -488,7 +491,7 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
addNextToBlock := i < len(items)-1 && items[i].orderedResource && items[i+1].orderedResource && items[i].groupResource == items[i+1].groupResource
if itemBlock != nil && len(itemBlock.Items) > 0 && !addNextToBlock {
log.Infof("Backing Up Item Block including %s %s/%s (%v items in block)", items[i].groupResource.String(), items[i].namespace, items[i].name, len(itemBlock.Items))
backedUpGRs := kb.backupItemBlock(*itemBlock)
backedUpGRs := kb.backupItemBlock(ctx, *itemBlock)
for _, backedUpGR := range backedUpGRs {
backedUpGroupResources[backedUpGR] = true
}
Expand Down Expand Up @@ -649,7 +652,7 @@ func (kb *kubernetesBackupper) executeItemBlockActions(
}
}

func (kb *kubernetesBackupper) backupItemBlock(itemBlock BackupItemBlock) []schema.GroupResource {
func (kb *kubernetesBackupper) backupItemBlock(ctx context.Context, itemBlock BackupItemBlock) []schema.GroupResource {
// find pods in ItemBlock
// filter pods based on whether they still need to be backed up
// this list will be used to run pre/post hooks
Expand All @@ -672,7 +675,7 @@ func (kb *kubernetesBackupper) backupItemBlock(itemBlock BackupItemBlock) []sche
}
}
}
postHookPods, failedPods, errs := kb.handleItemBlockHooks(itemBlock, preHookPods, hook.PhasePre)
postHookPods, failedPods, errs := kb.handleItemBlockPreHooks(itemBlock, preHookPods)
for i, pod := range failedPods {
itemBlock.Log.WithError(errs[i]).WithField("name", pod.Item.GetName()).Error("Error running pre hooks for pod")
// if pre hook fails, flag pod as backed-up and move on
Expand All @@ -692,10 +695,9 @@ func (kb *kubernetesBackupper) backupItemBlock(itemBlock BackupItemBlock) []sche
}
}

itemBlock.Log.Debug("Executing post hooks")
_, failedPods, errs = kb.handleItemBlockHooks(itemBlock, postHookPods, hook.PhasePost)
for i, pod := range failedPods {
itemBlock.Log.WithError(errs[i]).WithField("name", pod.Item.GetName()).Error("Error running post hooks for pod")
if len(postHookPods) > 0 {
itemBlock.Log.Debug("Executing post hooks")
kb.handleItemBlockPostHooks(ctx, itemBlock, postHookPods)
}

return grList
Expand All @@ -714,12 +716,12 @@ func (kb *kubernetesBackupper) itemMetadataAndKey(item itemblock.ItemBlockItem)
return metadata, key, nil
}

func (kb *kubernetesBackupper) handleItemBlockHooks(itemBlock BackupItemBlock, hookPods []itemblock.ItemBlockItem, phase hook.HookPhase) ([]itemblock.ItemBlockItem, []itemblock.ItemBlockItem, []error) {
func (kb *kubernetesBackupper) handleItemBlockPreHooks(itemBlock BackupItemBlock, hookPods []itemblock.ItemBlockItem) ([]itemblock.ItemBlockItem, []itemblock.ItemBlockItem, []error) {
var successPods []itemblock.ItemBlockItem
var failedPods []itemblock.ItemBlockItem
var errs []error
for _, pod := range hookPods {
err := itemBlock.itemBackupper.itemHookHandler.HandleHooks(itemBlock.Log, pod.Gr, pod.Item, itemBlock.itemBackupper.backupRequest.ResourceHooks, phase, itemBlock.itemBackupper.hookTracker)
err := itemBlock.itemBackupper.itemHookHandler.HandleHooks(itemBlock.Log, pod.Gr, pod.Item, itemBlock.itemBackupper.backupRequest.ResourceHooks, hook.PhasePre, itemBlock.itemBackupper.hookTracker)
if err == nil {
successPods = append(successPods, pod)
} else {
Expand All @@ -730,6 +732,99 @@ func (kb *kubernetesBackupper) handleItemBlockHooks(itemBlock BackupItemBlock, h
return successPods, failedPods, errs
}

func (kb *kubernetesBackupper) handleItemBlockPostHooks(ctx context.Context, itemBlock BackupItemBlock, hookPods []itemblock.ItemBlockItem) {
log := itemBlock.Log

// Handle post hooks for file system backup
// The hooks cannot execute until the PVBs to be processed
isFileSystemBackup := itemBlock.itemBackupper.backupRequest.Spec.DefaultVolumesToFsBackup
if isFileSystemBackup != nil && *isFileSystemBackup {
itemBlock.itemBackupper.hookTracker.AsyncItemBlocks.Add(1)
go func() {
defer itemBlock.itemBackupper.hookTracker.AsyncItemBlocks.Done()

if err := kb.waitUntilPVBsProcessed(ctx, log, itemBlock, hookPods); err != nil {
log.WithError(err).Error("failed to wait PVBs processed for the ItemBlock")
return
}

for _, pod := range hookPods {
if err := itemBlock.itemBackupper.itemHookHandler.HandleHooks(itemBlock.Log, pod.Gr, pod.Item, itemBlock.itemBackupper.backupRequest.ResourceHooks,
hook.PhasePost, itemBlock.itemBackupper.hookTracker); err != nil {
log.WithError(err).WithField("name", pod.Item.GetName()).Error("Error running post hooks for pod")
}
}
}()
return
}

// not file system backup
for _, pod := range hookPods {
if err := itemBlock.itemBackupper.itemHookHandler.HandleHooks(itemBlock.Log, pod.Gr, pod.Item, itemBlock.itemBackupper.backupRequest.ResourceHooks,
hook.PhasePost, itemBlock.itemBackupper.hookTracker); err != nil {
log.WithError(err).WithField("name", pod.Item.GetName()).Error("Error running post hooks for pod")
}
}
}

func (kb *kubernetesBackupper) waitUntilPVBsProcessed(ctx context.Context, log logrus.FieldLogger, itemBlock BackupItemBlock, pods []itemblock.ItemBlockItem) error {
requirement, err := labels.NewRequirement(velerov1api.BackupUIDLabel, selection.Equals, []string{string(itemBlock.itemBackupper.backupRequest.UID)})
if err != nil {
return errors.Wrapf(err, "failed to create label requirement")
}
options := &kbclient.ListOptions{
LabelSelector: labels.NewSelector().Add(*requirement),
}
pvbList := &velerov1api.PodVolumeBackupList{}
if err := kb.kbClient.List(context.Background(), pvbList, options); err != nil {
return errors.Wrap(err, "failed to list PVBs")
}

podMap := map[string]struct{}{}
for _, pod := range pods {
podMap[string(pod.Item.GetUID())] = struct{}{}
}

pvbMap := map[*velerov1api.PodVolumeBackup]bool{}
for _, pvb := range pvbList.Items {
if _, exist := podMap[string(pvb.Spec.Pod.UID)]; !exist {
continue
}

processed := false
if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseCompleted ||
pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed {
processed = true
}
pvbMap[&pvb] = processed
}

checkFunc := func(context.Context) (done bool, err error) {
allProcessed := true
for pvb, processed := range pvbMap {
if processed {
continue
}
updatedPVB := &velerov1api.PodVolumeBackup{}
if err := kb.kbClient.Get(ctx, kbclient.ObjectKeyFromObject(pvb), updatedPVB); err != nil {
allProcessed = false
log.Infof("failed to get PVB: %v", err)
continue
}
if updatedPVB.Status.Phase == velerov1api.PodVolumeBackupPhaseCompleted ||
updatedPVB.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed {
pvbMap[pvb] = true
continue
}
allProcessed = false
}

return allProcessed, nil
}

return wait.PollUntilContextCancel(ctx, 5*time.Second, false, checkFunc)
}

func (kb *kubernetesBackupper) backupItem(log logrus.FieldLogger, gr schema.GroupResource, itemBackupper *itemBackupper, unstructured *unstructured.Unstructured, preferredGVR schema.GroupVersionResource, itemBlock *BackupItemBlock) bool {
backedUpItem, _, err := itemBackupper.backupItem(log, unstructured, gr, preferredGVR, false, false, itemBlock)
if aggregate, ok := err.(kubeerrs.Aggregate); ok {
Expand Down

0 comments on commit a06ded4

Please sign in to comment.