Skip to content

Commit

Permalink
pvb
Browse files Browse the repository at this point in the history
Signed-off-by: Wenkai Yin(尹文开) <yinw@vmware.com>
  • Loading branch information
ywk253100 committed Mar 25, 2024
1 parent 24941b4 commit fb6d60d
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 49 deletions.
3 changes: 3 additions & 0 deletions pkg/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,9 @@ func (kb *kubernetesBackupper) BackupWithResolvers(log logrus.FieldLogger,
}
}

processedPVBs := itemBackupper.podVolumeBackupper.WaitAllPodVolumesProcessed(log)
backupRequest.PodVolumeBackups = append(backupRequest.PodVolumeBackups, processedPVBs...)

// do a final update on progress since we may have just added some CRDs and may not have updated
// for the last few processed items.
updated = backupRequest.Backup.DeepCopy()
Expand Down
1 change: 0 additions & 1 deletion pkg/backup/item_backupper.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,6 @@ func (ib *itemBackupper) backupItemInternal(logger logrus.FieldLogger, obj runti
// even if there are errors.
podVolumeBackups, podVolumePVCBackupSummary, errs := ib.backupPodVolumes(log, pod, pvbVolumes)

ib.backupRequest.PodVolumeBackups = append(ib.backupRequest.PodVolumeBackups, podVolumeBackups...)
backupErrs = append(backupErrs, errs...)

// Mark the volumes that has been processed by pod volume backup as Taken in the tracker.
Expand Down
92 changes: 44 additions & 48 deletions pkg/podvolume/backupper.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,19 @@ import (
type Backupper interface {
// BackupPodVolumes backs up all specified volumes in a pod.
BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.Pod, volumesToBackup []string, resPolicies *resourcepolicies.Policies, log logrus.FieldLogger) ([]*velerov1api.PodVolumeBackup, *PVCBackupSummary, []error)
WaitAllPodVolumesProcessed(log logrus.FieldLogger) []*velerov1api.PodVolumeBackup
}

type backupper struct {
ctx context.Context
repoLocker *repository.RepoLocker
repoEnsurer *repository.Ensurer
crClient ctrlclient.Client
uploaderType string

results map[string]chan *velerov1api.PodVolumeBackup
resultsLock sync.Mutex
ctx context.Context
repoLocker *repository.RepoLocker
repoEnsurer *repository.Ensurer
crClient ctrlclient.Client
uploaderType string
pvbInformer ctrlcache.Informer
handlerRegistration cache.ResourceEventHandlerRegistration
wg sync.WaitGroup
result []*velerov1api.PodVolumeBackup
}

type skippedPVC struct {
Expand Down Expand Up @@ -113,11 +115,12 @@ func newBackupper(
repoEnsurer: repoEnsurer,
crClient: crClient,
uploaderType: uploaderType,

results: make(map[string]chan *velerov1api.PodVolumeBackup),
pvbInformer: pvbInformer,
wg: sync.WaitGroup{},
result: []*velerov1api.PodVolumeBackup{},
}

_, _ = pvbInformer.AddEventHandler(
b.handlerRegistration, _ = pvbInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
UpdateFunc: func(_, obj interface{}) {
pvb := obj.(*velerov1api.PodVolumeBackup)
Expand All @@ -126,17 +129,13 @@ func newBackupper(
return
}

if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseCompleted || pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed {
b.resultsLock.Lock()
defer b.resultsLock.Unlock()

resChan, ok := b.results[resultsKey(pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name)]
if !ok {
log.Errorf("No results channel found for pod %s/%s to send pod volume backup %s/%s on", pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name, pvb.Namespace, pvb.Name)
return
}
resChan <- pvb
if pvb.Status.Phase != velerov1api.PodVolumeBackupPhaseCompleted &&
pvb.Status.Phase != velerov1api.PodVolumeBackupPhaseFailed {
return
}

b.wg.Done()
b.result = append(b.result, pvb)
},
},
)
Expand Down Expand Up @@ -217,12 +216,6 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.
b.repoLocker.Lock(repo.Name)
defer b.repoLocker.Unlock(repo.Name)

resultsChan := make(chan *velerov1api.PodVolumeBackup)

b.resultsLock.Lock()
b.results[resultsKey(pod.Namespace, pod.Name)] = resultsChan
b.resultsLock.Unlock()

var (
podVolumeBackups []*velerov1api.PodVolumeBackup
mountedPodVolumes = sets.Set[string]{}
Expand All @@ -243,7 +236,6 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.
repoIdentifier = repo.Spec.ResticIdentifier
}

var numVolumeSnapshots int
for _, volumeName := range volumesToBackup {
volume, ok := podVolumes[volumeName]
if !ok {
Expand Down Expand Up @@ -305,32 +297,36 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.
errs = append(errs, err)
continue
}
b.wg.Add(1)
podVolumeBackups = append(podVolumeBackups, volumeBackup)
pvcSummary.addBackedup(volumeName)
numVolumeSnapshots++
}

ForEachVolume:
for i, count := 0, numVolumeSnapshots; i < count; i++ {
select {
case <-b.ctx.Done():
errs = append(errs, errors.New("timed out waiting for all PodVolumeBackups to complete"))
break ForEachVolume
case res := <-resultsChan:
switch res.Status.Phase {
case velerov1api.PodVolumeBackupPhaseCompleted:
podVolumeBackups = append(podVolumeBackups, res)
case velerov1api.PodVolumeBackupPhaseFailed:
errs = append(errs, errors.Errorf("pod volume backup failed: %s", res.Status.Message))
podVolumeBackups = append(podVolumeBackups, res)
return podVolumeBackups, pvcSummary, errs
}

func (b *backupper) WaitAllPodVolumesProcessed(log logrus.FieldLogger) []*velerov1api.PodVolumeBackup {
defer b.pvbInformer.RemoveEventHandler(b.handlerRegistration)

done := make(chan struct{})
go func() {
defer close(done)
b.wg.Wait()
}()

var podVolumeBackups []*velerov1api.PodVolumeBackup
select {
case <-b.ctx.Done():
log.Error("timed out waiting for all PodVolumeBackups to complete")
case <-done:
for _, pvb := range b.result {
podVolumeBackups = append(podVolumeBackups, pvb)
if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed {
log.Errorf("pod volume backup failed: %s", pvb.Status.Message)
}
}
}

b.resultsLock.Lock()
delete(b.results, resultsKey(pod.Namespace, pod.Name))
b.resultsLock.Unlock()

return podVolumeBackups, pvcSummary, errs
return podVolumeBackups
}

func skipAllPodVolumes(pod *corev1api.Pod, volumesToBackup []string, err error, pvcSummary *PVCBackupSummary, log logrus.FieldLogger) {
Expand Down

0 comments on commit fb6d60d

Please sign in to comment.