Skip to content

Commit

Permalink
VRG: Handle Kube objects recovery point pointer update errors
Browse files Browse the repository at this point in the history
Signed-off-by: hatfieldbrian <bhatfiel@redhat.com>
  • Loading branch information
hatfieldbrian authored and raghavendra-talur committed Oct 23, 2023
1 parent f6afc6f commit 5776a50
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 29 deletions.
1 change: 0 additions & 1 deletion controllers/volumereplicationgroup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -911,7 +911,6 @@ func (v *VRGInstance) relocate(result *ctrl.Result, s3StoreAccessors []s3StoreAc
); clusterDataProtected != nil && (clusterDataProtected.Status != metav1.ConditionTrue ||
clusterDataProtected.ObservedGeneration != vrg.Generation) {
v.kubeObjectsProtectSecondary(result, s3StoreAccessors)
v.vrgObjectProtect(result, s3StoreAccessors)
}
}

Expand Down
64 changes: 49 additions & 15 deletions controllers/vrg_kubeobjects.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ func (v *VRGInstance) kubeObjectsCaptureStartOrResume(

v.kubeObjectsCaptureComplete(
result,
s3StoreAccessors,
captureStartConditionally,
captureNumber,
veleroNamespaceName,
Expand Down Expand Up @@ -360,40 +361,73 @@ func (v *VRGInstance) kubeObjectsCaptureDeleteAndLog(

func (v *VRGInstance) kubeObjectsCaptureComplete(
result *ctrl.Result,
s3StoreAccessors []s3StoreAccessor,
captureStartConditionally captureStartConditionally,
captureNumber int64, veleroNamespaceName string, interval time.Duration,
labels map[string]string, startTime metav1.Time, annotations map[string]string,
) {
vrg := v.instance
status := &vrg.Status.KubeObjectProtection
captureToRecoverFromIdentifier := &vrg.Status.KubeObjectProtection.CaptureToRecoverFrom

startGeneration, err := strconv.ParseInt(
annotations[vrgGenerationKey], vrgGenerationNumberBase, vrgGenerationNumberBitCount)
if err != nil {
v.log.Error(err, "Kube objects capture generation string to int64 conversion error")
}

captureToRecoverFromIdentifierCurrent := *captureToRecoverFromIdentifier
*captureToRecoverFromIdentifier = &ramen.KubeObjectsCaptureIdentifier{
Number: captureNumber,
StartTime: startTime,
StartGeneration: startGeneration,
}

v.vrgObjectProtectThrottled(
result,
s3StoreAccessors,
func() {
v.kubeObjectsCaptureIdentifierUpdateComplete(
result,
captureStartConditionally,
*captureToRecoverFromIdentifier,
veleroNamespaceName,
interval,
labels,
)
},
func() {
*captureToRecoverFromIdentifier = captureToRecoverFromIdentifierCurrent
},
)
}

func (v *VRGInstance) kubeObjectsCaptureIdentifierUpdateComplete(
result *ctrl.Result,
captureStartConditionally captureStartConditionally,
captureToRecoverFromIdentifier *ramen.KubeObjectsCaptureIdentifier,
veleroNamespaceName string,
interval time.Duration,
labels map[string]string,
) {
if err := v.reconciler.kubeObjects.ProtectRequestsDelete(
v.ctx, v.reconciler.Client, veleroNamespaceName, labels,
); err != nil {
v.log.Error(err, "Kube objects capture requests delete error", "number", captureNumber)
v.log.Error(err, "Kube objects capture requests delete error",
"number", captureToRecoverFromIdentifier.Number)
v.kubeObjectsCaptureFailed(err.Error())

result.Requeue = true

return
}

startGeneration, err := strconv.ParseInt(
annotations[vrgGenerationKey], vrgGenerationNumberBase, vrgGenerationNumberBitCount)
if err != nil {
v.log.Error(err, "Kube objects capture generation string to int64 conversion error")
}

v.kubeObjectsCaptureStatus(metav1.ConditionTrue, VRGConditionReasonUploaded, clusterDataProtectedTrueMessage)

status.CaptureToRecoverFrom = &ramen.KubeObjectsCaptureIdentifier{
Number: captureNumber, StartTime: startTime,
StartGeneration: startGeneration,
}
captureStartTimeSince := time.Since(startTime.Time)
v.log.Info("Kube objects captured", "recovery point", status.CaptureToRecoverFrom, "duration", captureStartTimeSince)
captureStartTimeSince := time.Since(captureToRecoverFromIdentifier.StartTime.Time)
v.log.Info("Kube objects captured", "recovery point", captureToRecoverFromIdentifier,
"duration", captureStartTimeSince)
captureStartConditionally(
v, result, startGeneration, captureStartTimeSince, interval,
v, result, captureToRecoverFromIdentifier.StartGeneration, captureStartTimeSince, interval,
func() {
v.log.Info("Kube objects capture schedule to run immediately")
delaySetMinimum(result)
Expand Down
30 changes: 17 additions & 13 deletions controllers/vrg_vrgobject.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,11 @@ import (

var vrgLastUploadTime = map[string]metav1.Time{}

//nolint:gomnd
func (v *VRGInstance) vrgObjectProtect(result *ctrl.Result, s3StoreAccessors []s3StoreAccessor) {
vrg := v.instance
eventReporter := v.reconciler.eventRecorder
log := v.log
key := vrg.Namespace + "/" + vrg.Name

if lastUploadTime, ok := vrgLastUploadTime[key]; ok {
kubeObjectCaptureInterval := ramen.KubeObjectProtectionCaptureIntervalDefault
if vrg.Spec.KubeObjectProtection != nil {
kubeObjectCaptureInterval = kubeObjectsCaptureInterval(vrg.Spec.KubeObjectProtection)
}

// The maxVRGProtectionInterval is half the interval of the kubeObjectsCaptureInterval
maxVRGProtectionInterval := kubeObjectCaptureInterval / 2
if lastUploadTime, ok := vrgLastUploadTime[v.namespacedName]; ok {
const maxVRGProtectionInterval = time.Minute

// Throttle VRG protection if this call is more recent than maxVRGProtectionInterval.
if shouldThrottleVRGProtection(lastUploadTime, maxVRGProtectionInterval) {
Expand All @@ -39,6 +29,16 @@ func (v *VRGInstance) vrgObjectProtect(result *ctrl.Result, s3StoreAccessors []s
}
}

v.vrgObjectProtectThrottled(result, s3StoreAccessors, func() {}, func() {})
}

func (v *VRGInstance) vrgObjectProtectThrottled(result *ctrl.Result, s3StoreAccessors []s3StoreAccessor,
success, failure func(),
) {
vrg := v.instance
eventReporter := v.reconciler.eventRecorder
log := v.log

for _, s3StoreAccessor := range s3StoreAccessors {
log1 := log.WithValues("profile", s3StoreAccessor.S3ProfileName)

Expand All @@ -54,15 +54,19 @@ func (v *VRGInstance) vrgObjectProtect(result *ctrl.Result, s3StoreAccessors []s
v.vrgObjectProtected = newVRGClusterDataUnprotectedCondition(vrg.Generation, message)
result.Requeue = true

failure()

return
}

log1.Info("VRG Kube object protected")

vrgLastUploadTime[key] = metav1.Now()
vrgLastUploadTime[v.namespacedName] = metav1.Now()

v.vrgObjectProtected = newVRGClusterDataProtectedCondition(vrg.Generation, clusterDataProtectedTrueMessage)
}

success()
}

func shouldThrottleVRGProtection(lastUploadTime metav1.Time, maxVRGProtectionTime time.Duration) bool {
Expand Down

0 comments on commit 5776a50

Please sign in to comment.