diff --git a/controllers/volumereplicationgroup_controller.go b/controllers/volumereplicationgroup_controller.go index c17bea73a..ffb1b6212 100644 --- a/controllers/volumereplicationgroup_controller.go +++ b/controllers/volumereplicationgroup_controller.go @@ -911,7 +911,6 @@ func (v *VRGInstance) relocate(result *ctrl.Result, s3StoreAccessors []s3StoreAc ); clusterDataProtected != nil && (clusterDataProtected.Status != metav1.ConditionTrue || clusterDataProtected.ObservedGeneration != vrg.Generation) { v.kubeObjectsProtectSecondary(result, s3StoreAccessors) - v.vrgObjectProtect(result, s3StoreAccessors) } } diff --git a/controllers/vrg_kubeobjects.go b/controllers/vrg_kubeobjects.go index df1d4b24c..177f6ca0c 100644 --- a/controllers/vrg_kubeobjects.go +++ b/controllers/vrg_kubeobjects.go @@ -273,6 +273,7 @@ func (v *VRGInstance) kubeObjectsCaptureStartOrResume( v.kubeObjectsCaptureComplete( result, + s3StoreAccessors, captureStartConditionally, captureNumber, veleroNamespaceName, @@ -360,17 +361,59 @@ func (v *VRGInstance) kubeObjectsCaptureDeleteAndLog( func (v *VRGInstance) kubeObjectsCaptureComplete( result *ctrl.Result, + s3StoreAccessors []s3StoreAccessor, captureStartConditionally captureStartConditionally, captureNumber int64, veleroNamespaceName string, interval time.Duration, labels map[string]string, startTime metav1.Time, annotations map[string]string, ) { vrg := v.instance - status := &vrg.Status.KubeObjectProtection + captureToRecoverFromIdentifier := &vrg.Status.KubeObjectProtection.CaptureToRecoverFrom + + startGeneration, err := strconv.ParseInt( + annotations[vrgGenerationKey], vrgGenerationNumberBase, vrgGenerationNumberBitCount) + if err != nil { + v.log.Error(err, "Kube objects capture generation string to int64 conversion error") + } + + captureToRecoverFromIdentifierCurrent := *captureToRecoverFromIdentifier + *captureToRecoverFromIdentifier = &ramen.KubeObjectsCaptureIdentifier{ + Number: captureNumber, + StartTime: startTime, + StartGeneration: startGeneration, + } + + v.vrgObjectProtectThrottled( + result, + s3StoreAccessors, + func() { + v.kubeObjectsCaptureIdentifierUpdateComplete( + result, + captureStartConditionally, + *captureToRecoverFromIdentifier, + veleroNamespaceName, + interval, + labels, + ) + }, + func() { + *captureToRecoverFromIdentifier = captureToRecoverFromIdentifierCurrent + }, + ) +} +func (v *VRGInstance) kubeObjectsCaptureIdentifierUpdateComplete( + result *ctrl.Result, + captureStartConditionally captureStartConditionally, + captureToRecoverFromIdentifier *ramen.KubeObjectsCaptureIdentifier, + veleroNamespaceName string, + interval time.Duration, + labels map[string]string, +) { if err := v.reconciler.kubeObjects.ProtectRequestsDelete( v.ctx, v.reconciler.Client, veleroNamespaceName, labels, ); err != nil { - v.log.Error(err, "Kube objects capture requests delete error", "number", captureNumber) + v.log.Error(err, "Kube objects capture requests delete error", + "number", captureToRecoverFromIdentifier.Number) v.kubeObjectsCaptureFailed(err.Error()) result.Requeue = true @@ -378,22 +421,13 @@ func (v *VRGInstance) kubeObjectsCaptureComplete( return } - startGeneration, err := strconv.ParseInt( - annotations[vrgGenerationKey], vrgGenerationNumberBase, vrgGenerationNumberBitCount) - if err != nil { - v.log.Error(err, "Kube objects capture generation string to int64 conversion error") - } - v.kubeObjectsCaptureStatus(metav1.ConditionTrue, VRGConditionReasonUploaded, clusterDataProtectedTrueMessage) - status.CaptureToRecoverFrom = &ramen.KubeObjectsCaptureIdentifier{ - Number: captureNumber, StartTime: startTime, - StartGeneration: startGeneration, - } - captureStartTimeSince := time.Since(startTime.Time) - v.log.Info("Kube objects captured", "recovery point", status.CaptureToRecoverFrom, "duration", captureStartTimeSince) + captureStartTimeSince := time.Since(captureToRecoverFromIdentifier.StartTime.Time) + v.log.Info("Kube objects captured", "recovery point", captureToRecoverFromIdentifier, + "duration", captureStartTimeSince) captureStartConditionally( - v, result, startGeneration, captureStartTimeSince, interval, + v, result, captureToRecoverFromIdentifier.StartGeneration, captureStartTimeSince, interval, func() { v.log.Info("Kube objects capture schedule to run immediately") delaySetMinimum(result) diff --git a/controllers/vrg_vrgobject.go b/controllers/vrg_vrgobject.go index d6261293f..6f3378f40 100644 --- a/controllers/vrg_vrgobject.go +++ b/controllers/vrg_vrgobject.go @@ -15,21 +15,11 @@ import ( var vrgLastUploadTime = map[string]metav1.Time{} -//nolint:gomnd func (v *VRGInstance) vrgObjectProtect(result *ctrl.Result, s3StoreAccessors []s3StoreAccessor) { - vrg := v.instance - eventReporter := v.reconciler.eventRecorder log := v.log - key := vrg.Namespace + "/" + vrg.Name - - if lastUploadTime, ok := vrgLastUploadTime[key]; ok { - kubeObjectCaptureInterval := ramen.KubeObjectProtectionCaptureIntervalDefault - if vrg.Spec.KubeObjectProtection != nil { - kubeObjectCaptureInterval = kubeObjectsCaptureInterval(vrg.Spec.KubeObjectProtection) - } - // The maxVRGProtectionInterval is half the interval of the kubeObjectsCaptureInterval - maxVRGProtectionInterval := kubeObjectCaptureInterval / 2 + if lastUploadTime, ok := vrgLastUploadTime[v.namespacedName]; ok { + const maxVRGProtectionInterval = time.Minute // Throttle VRG protection if this call is more recent than maxVRGProtectionInterval. if shouldThrottleVRGProtection(lastUploadTime, maxVRGProtectionInterval) { @@ -39,6 +29,16 @@ func (v *VRGInstance) vrgObjectProtect(result *ctrl.Result, s3StoreAccessors []s } } + v.vrgObjectProtectThrottled(result, s3StoreAccessors, func() {}, func() {}) +} + +func (v *VRGInstance) vrgObjectProtectThrottled(result *ctrl.Result, s3StoreAccessors []s3StoreAccessor, + success, failure func(), +) { + vrg := v.instance + eventReporter := v.reconciler.eventRecorder + log := v.log + for _, s3StoreAccessor := range s3StoreAccessors { log1 := log.WithValues("profile", s3StoreAccessor.S3ProfileName) @@ -54,15 +54,19 @@ func (v *VRGInstance) vrgObjectProtect(result *ctrl.Result, s3StoreAccessors []s v.vrgObjectProtected = newVRGClusterDataUnprotectedCondition(vrg.Generation, message) result.Requeue = true + failure() + return } log1.Info("VRG Kube object protected") - vrgLastUploadTime[key] = metav1.Now() + vrgLastUploadTime[v.namespacedName] = metav1.Now() v.vrgObjectProtected = newVRGClusterDataProtectedCondition(vrg.Generation, clusterDataProtectedTrueMessage) } + + success() } func shouldThrottleVRGProtection(lastUploadTime metav1.Time, maxVRGProtectionTime time.Duration) bool {