diff --git a/pkg/backup/backup.go b/pkg/backup/backup.go index 67e20acb0e9..bd79eefed28 100644 --- a/pkg/backup/backup.go +++ b/pkg/backup/backup.go @@ -419,6 +419,9 @@ func (kb *kubernetesBackupper) BackupWithResolvers(log logrus.FieldLogger, } } + processedPVBs := itemBackupper.podVolumeBackupper.WaitAllPodVolumesProcessed(log) + backupRequest.PodVolumeBackups = append(backupRequest.PodVolumeBackups, processedPVBs...) + // do a final update on progress since we may have just added some CRDs and may not have updated // for the last few processed items. updated = backupRequest.Backup.DeepCopy() diff --git a/pkg/backup/item_backupper.go b/pkg/backup/item_backupper.go index 7f1d8268578..4b589fae318 100644 --- a/pkg/backup/item_backupper.go +++ b/pkg/backup/item_backupper.go @@ -270,7 +270,6 @@ func (ib *itemBackupper) backupItemInternal(logger logrus.FieldLogger, obj runti // even if there are errors. podVolumeBackups, podVolumePVCBackupSummary, errs := ib.backupPodVolumes(log, pod, pvbVolumes) - ib.backupRequest.PodVolumeBackups = append(ib.backupRequest.PodVolumeBackups, podVolumeBackups...) backupErrs = append(backupErrs, errs...) // Mark the volumes that has been processed by pod volume backup as Taken in the tracker. diff --git a/pkg/podvolume/backupper.go b/pkg/podvolume/backupper.go index c40a198d206..6929cfa64d7 100644 --- a/pkg/podvolume/backupper.go +++ b/pkg/podvolume/backupper.go @@ -45,17 +45,19 @@ import ( type Backupper interface { // BackupPodVolumes backs up all specified volumes in a pod. BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.Pod, volumesToBackup []string, resPolicies *resourcepolicies.Policies, log logrus.FieldLogger) ([]*velerov1api.PodVolumeBackup, *PVCBackupSummary, []error) + WaitAllPodVolumesProcessed(log logrus.FieldLogger) []*velerov1api.PodVolumeBackup } type backupper struct { - ctx context.Context - repoLocker *repository.RepoLocker - repoEnsurer *repository.Ensurer - crClient ctrlclient.Client - uploaderType string - - results map[string]chan *velerov1api.PodVolumeBackup - resultsLock sync.Mutex + ctx context.Context + repoLocker *repository.RepoLocker + repoEnsurer *repository.Ensurer + crClient ctrlclient.Client + uploaderType string + pvbInformer ctrlcache.Informer + handlerRegistration cache.ResourceEventHandlerRegistration + wg sync.WaitGroup + result []*velerov1api.PodVolumeBackup } type skippedPVC struct { @@ -113,11 +115,12 @@ func newBackupper( repoEnsurer: repoEnsurer, crClient: crClient, uploaderType: uploaderType, - - results: make(map[string]chan *velerov1api.PodVolumeBackup), + pvbInformer: pvbInformer, + wg: sync.WaitGroup{}, + result: []*velerov1api.PodVolumeBackup{}, } - _, _ = pvbInformer.AddEventHandler( + b.handlerRegistration, _ = pvbInformer.AddEventHandler( cache.ResourceEventHandlerFuncs{ UpdateFunc: func(_, obj interface{}) { pvb := obj.(*velerov1api.PodVolumeBackup) @@ -126,17 +129,13 @@ func newBackupper( return } - if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseCompleted || pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed { - b.resultsLock.Lock() - defer b.resultsLock.Unlock() - - resChan, ok := b.results[resultsKey(pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name)] - if !ok { - log.Errorf("No results channel found for pod %s/%s to send pod volume backup %s/%s on", pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name, pvb.Namespace, pvb.Name) - return - } - resChan <- pvb + if pvb.Status.Phase != velerov1api.PodVolumeBackupPhaseCompleted && + pvb.Status.Phase != velerov1api.PodVolumeBackupPhaseFailed { + return } + + b.wg.Done() + b.result = append(b.result, pvb) }, }, ) @@ -217,12 +216,6 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api. b.repoLocker.Lock(repo.Name) defer b.repoLocker.Unlock(repo.Name) - resultsChan := make(chan *velerov1api.PodVolumeBackup) - - b.resultsLock.Lock() - b.results[resultsKey(pod.Namespace, pod.Name)] = resultsChan - b.resultsLock.Unlock() - var ( podVolumeBackups []*velerov1api.PodVolumeBackup mountedPodVolumes = sets.Set[string]{} @@ -243,7 +236,6 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api. repoIdentifier = repo.Spec.ResticIdentifier } - var numVolumeSnapshots int for _, volumeName := range volumesToBackup { volume, ok := podVolumes[volumeName] if !ok { @@ -305,32 +297,36 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api. errs = append(errs, err) continue } + b.wg.Add(1) + podVolumeBackups = append(podVolumeBackups, volumeBackup) pvcSummary.addBackedup(volumeName) - numVolumeSnapshots++ } -ForEachVolume: - for i, count := 0, numVolumeSnapshots; i < count; i++ { - select { - case <-b.ctx.Done(): - errs = append(errs, errors.New("timed out waiting for all PodVolumeBackups to complete")) - break ForEachVolume - case res := <-resultsChan: - switch res.Status.Phase { - case velerov1api.PodVolumeBackupPhaseCompleted: - podVolumeBackups = append(podVolumeBackups, res) - case velerov1api.PodVolumeBackupPhaseFailed: - errs = append(errs, errors.Errorf("pod volume backup failed: %s", res.Status.Message)) - podVolumeBackups = append(podVolumeBackups, res) + return podVolumeBackups, pvcSummary, errs +} + +func (b *backupper) WaitAllPodVolumesProcessed(log logrus.FieldLogger) []*velerov1api.PodVolumeBackup { + defer b.pvbInformer.RemoveEventHandler(b.handlerRegistration) + + done := make(chan struct{}) + go func() { + defer close(done) + b.wg.Wait() + }() + + var podVolumeBackups []*velerov1api.PodVolumeBackup + select { + case <-b.ctx.Done(): + log.Error("timed out waiting for all PodVolumeBackups to complete") + case <-done: + for _, pvb := range b.result { + podVolumeBackups = append(podVolumeBackups, pvb) + if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed { + log.Errorf("pod volume backup failed: %s", pvb.Status.Message) } } } - - b.resultsLock.Lock() - delete(b.results, resultsKey(pod.Namespace, pod.Name)) - b.resultsLock.Unlock() - - return podVolumeBackups, pvcSummary, errs + return podVolumeBackups } func skipAllPodVolumes(pod *corev1api.Pod, volumesToBackup []string, err error, pvcSummary *PVCBackupSummary, log logrus.FieldLogger) {