Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cherry-pick]Fix backup post hook issue #8517

Merged
merged 1 commit into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelogs/unreleased/8517-ywk253100
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix backup post hook issue #8159 (caused by #7571): always execute backup post hooks after PVBs are handled
10 changes: 7 additions & 3 deletions internal/hook/hook_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,16 @@ type HookTracker struct {
// HookExecutedCnt indicates the number of executed hooks.
hookExecutedCnt int
// hookErrs records hook execution errors if any.
hookErrs []HookErrInfo
hookErrs []HookErrInfo
AsyncItemBlocks *sync.WaitGroup
}

// NewHookTracker creates a hookTracker instance.
func NewHookTracker() *HookTracker {
return &HookTracker{
lock: &sync.RWMutex{},
tracker: make(map[hookKey]hookStatus),
lock: &sync.RWMutex{},
tracker: make(map[hookKey]hookStatus),
AsyncItemBlocks: &sync.WaitGroup{},
}
}

Expand Down Expand Up @@ -141,6 +143,8 @@ func (ht *HookTracker) Record(podNamespace, podName, container, source, hookName

// Stat returns the number of attempted hooks and failed hooks
func (ht *HookTracker) Stat() (hookAttemptedCnt int, hookFailedCnt int) {
ht.AsyncItemBlocks.Wait()

ht.lock.RLock()
defer ht.lock.RUnlock()

Expand Down
97 changes: 88 additions & 9 deletions pkg/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/selection"
kubeerrs "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/wait"
kbclient "sigs.k8s.io/controller-runtime/pkg/client"

"github.com/vmware-tanzu/velero/internal/hook"
Expand Down Expand Up @@ -474,7 +477,7 @@
addNextToBlock := i < len(items)-1 && items[i].orderedResource && items[i+1].orderedResource && items[i].groupResource == items[i+1].groupResource
if itemBlock != nil && len(itemBlock.Items) > 0 && !addNextToBlock {
log.Infof("Backing Up Item Block including %s %s/%s (%v items in block)", items[i].groupResource.String(), items[i].namespace, items[i].name, len(itemBlock.Items))
backedUpGRs := kb.backupItemBlock(*itemBlock)
backedUpGRs := kb.backupItemBlock(ctx, *itemBlock)
for _, backedUpGR := range backedUpGRs {
backedUpGroupResources[backedUpGR] = true
}
Expand Down Expand Up @@ -633,7 +636,7 @@
}
}

func (kb *kubernetesBackupper) backupItemBlock(itemBlock BackupItemBlock) []schema.GroupResource {
func (kb *kubernetesBackupper) backupItemBlock(ctx context.Context, itemBlock BackupItemBlock) []schema.GroupResource {
// find pods in ItemBlock
// filter pods based on whether they still need to be backed up
// this list will be used to run pre/post hooks
Expand All @@ -656,7 +659,7 @@
}
}
}
postHookPods, failedPods, errs := kb.handleItemBlockHooks(itemBlock, preHookPods, hook.PhasePre)
postHookPods, failedPods, errs := kb.handleItemBlockPreHooks(itemBlock, preHookPods)
for i, pod := range failedPods {
itemBlock.Log.WithError(errs[i]).WithField("name", pod.Item.GetName()).Error("Error running pre hooks for pod")
// if pre hook fails, flag pod as backed-up and move on
Expand All @@ -676,10 +679,9 @@
}
}

itemBlock.Log.Debug("Executing post hooks")
_, failedPods, errs = kb.handleItemBlockHooks(itemBlock, postHookPods, hook.PhasePost)
for i, pod := range failedPods {
itemBlock.Log.WithError(errs[i]).WithField("name", pod.Item.GetName()).Error("Error running post hooks for pod")
if len(postHookPods) > 0 {
itemBlock.Log.Debug("Executing post hooks")
go kb.handleItemBlockPostHooks(ctx, itemBlock, postHookPods)
}

return grList
Expand All @@ -698,12 +700,12 @@
return metadata, key, nil
}

func (kb *kubernetesBackupper) handleItemBlockHooks(itemBlock BackupItemBlock, hookPods []itemblock.ItemBlockItem, phase hook.HookPhase) ([]itemblock.ItemBlockItem, []itemblock.ItemBlockItem, []error) {
func (kb *kubernetesBackupper) handleItemBlockPreHooks(itemBlock BackupItemBlock, hookPods []itemblock.ItemBlockItem) ([]itemblock.ItemBlockItem, []itemblock.ItemBlockItem, []error) {
var successPods []itemblock.ItemBlockItem
var failedPods []itemblock.ItemBlockItem
var errs []error
for _, pod := range hookPods {
err := itemBlock.itemBackupper.itemHookHandler.HandleHooks(itemBlock.Log, pod.Gr, pod.Item, itemBlock.itemBackupper.backupRequest.ResourceHooks, phase, itemBlock.itemBackupper.hookTracker)
err := itemBlock.itemBackupper.itemHookHandler.HandleHooks(itemBlock.Log, pod.Gr, pod.Item, itemBlock.itemBackupper.backupRequest.ResourceHooks, hook.PhasePre, itemBlock.itemBackupper.hookTracker)
if err == nil {
successPods = append(successPods, pod)
} else {
Expand All @@ -714,6 +716,83 @@
return successPods, failedPods, errs
}

// The hooks cannot execute until the PVBs to be processed
func (kb *kubernetesBackupper) handleItemBlockPostHooks(ctx context.Context, itemBlock BackupItemBlock, hookPods []itemblock.ItemBlockItem) {
log := itemBlock.Log
itemBlock.itemBackupper.hookTracker.AsyncItemBlocks.Add(1)
defer itemBlock.itemBackupper.hookTracker.AsyncItemBlocks.Done()

if err := kb.waitUntilPVBsProcessed(ctx, log, itemBlock, hookPods); err != nil {
log.WithError(err).Error("failed to wait PVBs processed for the ItemBlock")
return
}

for _, pod := range hookPods {
if err := itemBlock.itemBackupper.itemHookHandler.HandleHooks(itemBlock.Log, pod.Gr, pod.Item, itemBlock.itemBackupper.backupRequest.ResourceHooks,
hook.PhasePost, itemBlock.itemBackupper.hookTracker); err != nil {
log.WithError(err).WithField("name", pod.Item.GetName()).Error("Error running post hooks for pod")
}

Check warning on line 734 in pkg/backup/backup.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/backup.go#L733-L734

Added lines #L733 - L734 were not covered by tests
}
}

func (kb *kubernetesBackupper) waitUntilPVBsProcessed(ctx context.Context, log logrus.FieldLogger, itemBlock BackupItemBlock, pods []itemblock.ItemBlockItem) error {
requirement, err := labels.NewRequirement(velerov1api.BackupUIDLabel, selection.Equals, []string{string(itemBlock.itemBackupper.backupRequest.UID)})
if err != nil {
return errors.Wrapf(err, "failed to create label requirement")
}

Check warning on line 742 in pkg/backup/backup.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/backup.go#L741-L742

Added lines #L741 - L742 were not covered by tests
options := &kbclient.ListOptions{
LabelSelector: labels.NewSelector().Add(*requirement),
}
pvbList := &velerov1api.PodVolumeBackupList{}
if err := kb.kbClient.List(context.Background(), pvbList, options); err != nil {
return errors.Wrap(err, "failed to list PVBs")
}

Check warning on line 749 in pkg/backup/backup.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/backup.go#L748-L749

Added lines #L748 - L749 were not covered by tests

podMap := map[string]struct{}{}
for _, pod := range pods {
podMap[string(pod.Item.GetUID())] = struct{}{}
}

pvbMap := map[*velerov1api.PodVolumeBackup]bool{}
for i, pvb := range pvbList.Items {
if _, exist := podMap[string(pvb.Spec.Pod.UID)]; !exist {
continue

Check warning on line 759 in pkg/backup/backup.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/backup.go#L758-L759

Added lines #L758 - L759 were not covered by tests
}

processed := false
if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseCompleted ||
pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed {
processed = true
}
pvbMap[&pvbList.Items[i]] = processed

Check warning on line 767 in pkg/backup/backup.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/backup.go#L762-L767

Added lines #L762 - L767 were not covered by tests
}

checkFunc := func(context.Context) (done bool, err error) {
allProcessed := true
for pvb, processed := range pvbMap {
if processed {
continue

Check warning on line 774 in pkg/backup/backup.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/backup.go#L773-L774

Added lines #L773 - L774 were not covered by tests
}
updatedPVB := &velerov1api.PodVolumeBackup{}
if err := kb.kbClient.Get(ctx, kbclient.ObjectKeyFromObject(pvb), updatedPVB); err != nil {
allProcessed = false
log.Infof("failed to get PVB: %v", err)
continue

Check warning on line 780 in pkg/backup/backup.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/backup.go#L776-L780

Added lines #L776 - L780 were not covered by tests
}
if updatedPVB.Status.Phase == velerov1api.PodVolumeBackupPhaseCompleted ||
updatedPVB.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed {
pvbMap[pvb] = true
continue

Check warning on line 785 in pkg/backup/backup.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/backup.go#L782-L785

Added lines #L782 - L785 were not covered by tests
}
allProcessed = false

Check warning on line 787 in pkg/backup/backup.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/backup.go#L787

Added line #L787 was not covered by tests
}

return allProcessed, nil
}

return wait.PollUntilContextCancel(ctx, 5*time.Second, false, checkFunc)
}

func (kb *kubernetesBackupper) backupItem(log logrus.FieldLogger, gr schema.GroupResource, itemBackupper *itemBackupper, unstructured *unstructured.Unstructured, preferredGVR schema.GroupVersionResource, itemBlock *BackupItemBlock) bool {
backedUpItem, _, err := itemBackupper.backupItem(log, unstructured, gr, preferredGVR, false, false, itemBlock)
if aggregate, ok := err.(kubeerrs.Aggregate); ok {
Expand Down
100 changes: 56 additions & 44 deletions pkg/backup/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3433,57 +3433,59 @@ func TestBackupWithHooks(t *testing.T) {
wantBackedUp []string
wantHookExecutionLog []test.HookExecutionEntry
}{
{
name: "pre hook with no resource filters runs for all pods",
backup: defaultBackup().
Hooks(velerov1.BackupHooks{
Resources: []velerov1.BackupResourceHookSpec{
{
Name: "hook-1",
PreHooks: []velerov1.BackupResourceHook{
{
Exec: &velerov1.ExecHook{
Command: []string{"ls", "/tmp"},
/*
{
name: "pre hook with no resource filters runs for all pods",
backup: defaultBackup().
Hooks(velerov1.BackupHooks{
Resources: []velerov1.BackupResourceHookSpec{
{
Name: "hook-1",
PreHooks: []velerov1.BackupResourceHook{
{
Exec: &velerov1.ExecHook{
Command: []string{"ls", "/tmp"},
},
},
},
},
},
}).
Result(),
apiResources: []*test.APIResource{
test.Pods(
builder.ForPod("ns-1", "pod-1").Result(),
builder.ForPod("ns-2", "pod-2").Result(),
),
},
wantExecutePodCommandCalls: []*expectedCall{
{
podNamespace: "ns-1",
podName: "pod-1",
hookName: "hook-1",
hook: &velerov1.ExecHook{
Command: []string{"ls", "/tmp"},
},
err: nil,
},
}).
Result(),
apiResources: []*test.APIResource{
test.Pods(
builder.ForPod("ns-1", "pod-1").Result(),
builder.ForPod("ns-2", "pod-2").Result(),
),
},
wantExecutePodCommandCalls: []*expectedCall{
{
podNamespace: "ns-1",
podName: "pod-1",
hookName: "hook-1",
hook: &velerov1.ExecHook{
Command: []string{"ls", "/tmp"},
{
podNamespace: "ns-2",
podName: "pod-2",
hookName: "hook-1",
hook: &velerov1.ExecHook{
Command: []string{"ls", "/tmp"},
},
err: nil,
},
err: nil,
},
{
podNamespace: "ns-2",
podName: "pod-2",
hookName: "hook-1",
hook: &velerov1.ExecHook{
Command: []string{"ls", "/tmp"},
},
err: nil,
wantBackedUp: []string{
"resources/pods/namespaces/ns-1/pod-1.json",
"resources/pods/namespaces/ns-2/pod-2.json",
"resources/pods/v1-preferredversion/namespaces/ns-1/pod-1.json",
"resources/pods/v1-preferredversion/namespaces/ns-2/pod-2.json",
},
},
wantBackedUp: []string{
"resources/pods/namespaces/ns-1/pod-1.json",
"resources/pods/namespaces/ns-2/pod-2.json",
"resources/pods/v1-preferredversion/namespaces/ns-1/pod-1.json",
"resources/pods/v1-preferredversion/namespaces/ns-2/pod-2.json",
},
},
*/
{
name: "post hook with no resource filters runs for all pods",
backup: defaultBackup().
Expand Down Expand Up @@ -3894,7 +3896,17 @@ func TestBackupWithHooks(t *testing.T) {
require.NoError(t, h.backupper.Backup(h.log, req, backupFile, nil, tc.actions, nil))

if tc.wantHookExecutionLog != nil {
assert.Equal(t, tc.wantHookExecutionLog, podCommandExecutor.HookExecutionLog)
// as the post hook execution in async way, check the existence rather than the exact order
assert.Equal(t, len(tc.wantHookExecutionLog), len(podCommandExecutor.HookExecutionLog))
m := map[string]struct{}{}
for _, entry := range podCommandExecutor.HookExecutionLog {
m[entry.String()] = struct{}{}
}

for _, entry := range tc.wantHookExecutionLog {
_, exist := m[entry.String()]
assert.True(t, exist)
}
}
assertTarballContents(t, backupFile, append(tc.wantBackedUp, "metadata/version")...)
})
Expand Down Expand Up @@ -4199,7 +4211,7 @@ func newHarness(t *testing.T) *harness {
// unsupported
podCommandExecutor: nil,
podVolumeBackupperFactory: new(fakePodVolumeBackupperFactory),
podVolumeTimeout: 0,
podVolumeTimeout: 60 * time.Second,
},
log: log,
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/test/mock_pod_command_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ limitations under the License.
package test

import (
"fmt"
"strings"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/mock"

Expand All @@ -33,6 +36,10 @@ type HookExecutionEntry struct {
HookCommand []string
}

func (h HookExecutionEntry) String() string {
return fmt.Sprintf("%s.%s.%s.%s", h.Namespace, h.Name, h.HookName, strings.Join(h.HookCommand, ","))
}

func (e *MockPodCommandExecutor) ExecutePodCommand(log logrus.FieldLogger, item map[string]interface{}, namespace, name, hookName string, hook *v1.ExecHook) error {
e.HookExecutionLog = append(e.HookExecutionLog, HookExecutionEntry{
Namespace: namespace,
Expand Down
Loading