From 6ed31bfcb9c7e4c47021dabbf86672d4a3cbfd0b Mon Sep 17 00:00:00 2001 From: wency <57141858+Chiwency@users.noreply.github.com> Date: Tue, 27 Aug 2024 15:22:11 +0800 Subject: [PATCH] feat: support redis(standalone replica) pitr (#7998) --- .../v1alpha1/actionset_types.go | 6 ++ .../v1alpha1/zz_generated.deepcopy.go | 5 ++ ...taprotection.kubeblocks.io_actionsets.yaml | 4 ++ .../dataprotection/backup_controller.go | 56 ++++++++----------- .../dataprotection/restore_controller.go | 11 +++- controllers/dataprotection/utils.go | 39 ++++++++----- ...taprotection.kubeblocks.io_actionsets.yaml | 4 ++ docs/developer_docs/api-reference/backup.md | 12 ++++ pkg/dataprotection/backup/request.go | 2 +- pkg/dataprotection/backup/utils.go | 11 ++++ pkg/dataprotection/restore/manager.go | 6 ++ pkg/dataprotection/restore/utils.go | 4 ++ 12 files changed, 111 insertions(+), 49 deletions(-) diff --git a/apis/dataprotection/v1alpha1/actionset_types.go b/apis/dataprotection/v1alpha1/actionset_types.go index 8952f9cb35b..f693f38c715 100644 --- a/apis/dataprotection/v1alpha1/actionset_types.go +++ b/apis/dataprotection/v1alpha1/actionset_types.go @@ -156,6 +156,12 @@ type RestoreActionSpec struct { // // +optional PostReady []ActionSpec `json:"postReady,omitempty"` + + // Determines if a base backup is required during restoration. + // + // +optional + // +kubebuilder:default=true + BaseBackupRequired *bool `json:"baseBackupRequired,omitempty"` } // ActionSpec defines an action that should be executed. Only one of the fields may be set. diff --git a/apis/dataprotection/v1alpha1/zz_generated.deepcopy.go b/apis/dataprotection/v1alpha1/zz_generated.deepcopy.go index 1859a4923bb..e329cdb3bd8 100644 --- a/apis/dataprotection/v1alpha1/zz_generated.deepcopy.go +++ b/apis/dataprotection/v1alpha1/zz_generated.deepcopy.go @@ -1370,6 +1370,11 @@ func (in *RestoreActionSpec) DeepCopyInto(out *RestoreActionSpec) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.BaseBackupRequired != nil { + in, out := &in.BaseBackupRequired, &out.BaseBackupRequired + *out = new(bool) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RestoreActionSpec. diff --git a/config/crd/bases/dataprotection.kubeblocks.io_actionsets.yaml b/config/crd/bases/dataprotection.kubeblocks.io_actionsets.yaml index 2f8dd197cc4..40a85d26a67 100644 --- a/config/crd/bases/dataprotection.kubeblocks.io_actionsets.yaml +++ b/config/crd/bases/dataprotection.kubeblocks.io_actionsets.yaml @@ -460,6 +460,10 @@ spec: restore: description: Specifies the restore action. properties: + baseBackupRequired: + default: true + description: Determines if a base backup is required during restoration. + type: boolean postReady: description: Specifies the actions that should be executed after the data has been prepared and is ready for restoration. diff --git a/controllers/dataprotection/backup_controller.go b/controllers/dataprotection/backup_controller.go index 29df6d75c06..b24fab8443d 100644 --- a/controllers/dataprotection/backup_controller.go +++ b/controllers/dataprotection/backup_controller.go @@ -34,9 +34,11 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8sruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" "k8s.io/utils/clock" + "k8s.io/utils/pointer" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" @@ -656,6 +658,14 @@ func (r *BackupReconciler) checkIsCompletedDuringRunning(reqCtx intctrlutil.Requ duration := backup.Status.CompletionTimestamp.Sub(backup.Status.StartTimestamp.Time).Round(time.Second) backup.Status.Duration = &metav1.Duration{Duration: duration} } + + for i := range backup.Status.Actions { + act := &backup.Status.Actions[i] + act.Phase = dpv1alpha1.ActionPhaseCompleted + act.AvailableReplicas = pointer.Int32(int32(0)) + act.CompletionTimestamp = backup.Status.CompletionTimestamp + } + return true, r.Client.Status().Patch(reqCtx.Ctx, backup, patch) } @@ -693,15 +703,6 @@ func (r *BackupReconciler) updateStatusIfFailed( return intctrlutil.CheckedRequeueWithError(err, reqCtx.Log, "") } -// deleteExternalJobs deletes the external jobs. -func (r *BackupReconciler) deleteExternalJobs(reqCtx intctrlutil.RequestCtx, backup *dpv1alpha1.Backup) error { - labels := dpbackup.BuildBackupWorkloadLabels(backup) - if err := deleteRelatedJobs(reqCtx, r.Client, backup.Namespace, labels); err != nil { - return err - } - return deleteRelatedJobs(reqCtx, r.Client, viper.GetString(constant.CfgKeyCtrlrMgrNS), labels) -} - func (r *BackupReconciler) deleteVolumeSnapshots(reqCtx intctrlutil.RequestCtx, backup *dpv1alpha1.Backup) error { deleter := &dpbackup.Deleter{ @@ -711,34 +712,25 @@ func (r *BackupReconciler) deleteVolumeSnapshots(reqCtx intctrlutil.RequestCtx, return deleter.DeleteVolumeSnapshots(backup) } -// deleteExternalStatefulSet deletes the external statefulSet. -func (r *BackupReconciler) deleteExternalStatefulSet(reqCtx intctrlutil.RequestCtx, backup *dpv1alpha1.Backup) error { - key := client.ObjectKey{ - Namespace: backup.Namespace, - Name: backup.Name, - } - sts := &appsv1.StatefulSet{} - if err := r.Client.Get(reqCtx.Ctx, key, sts); err != nil { - return client.IgnoreNotFound(err) - } - - patch := client.MergeFrom(sts.DeepCopy()) - controllerutil.RemoveFinalizer(sts, dptypes.DataProtectionFinalizerName) - if err := r.Client.Patch(reqCtx.Ctx, sts, patch); err != nil { - return err - } - reqCtx.Log.V(1).Info("delete statefulSet", "statefulSet", sts) - return intctrlutil.BackgroundDeleteObject(r.Client, reqCtx.Ctx, sts) -} - // deleteExternalResources deletes the external workloads that execute backup. -// Currently, it only supports two types of workloads: job. +// Currently, it only supports two types of workloads: job, statefulSet func (r *BackupReconciler) deleteExternalResources( reqCtx intctrlutil.RequestCtx, backup *dpv1alpha1.Backup) error { - if err := r.deleteExternalJobs(reqCtx, backup); err != nil { + labels := dpbackup.BuildBackupWorkloadLabels(backup) + + // use map to avoid duplicate deletion of the same namespace. + namespaces := map[string]sets.Empty{ + backup.Namespace: {}, + viper.GetString(constant.CfgKeyCtrlrMgrNS): {}, + } + + // delete the external jobs. + if err := deleteRelatedObjectList(reqCtx, r.Client, &batchv1.JobList{}, namespaces, labels); err != nil { return err } - return r.deleteExternalStatefulSet(reqCtx, backup) + + // delete the external statefulSets. + return deleteRelatedObjectList(reqCtx, r.Client, &appsv1.StatefulSetList{}, namespaces, labels) } // PatchBackupObjectMeta patches backup object metaObject include cluster snapshot. diff --git a/controllers/dataprotection/restore_controller.go b/controllers/dataprotection/restore_controller.go index dbd1ddb78cb..16842c989e5 100644 --- a/controllers/dataprotection/restore_controller.go +++ b/controllers/dataprotection/restore_controller.go @@ -32,6 +32,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" @@ -126,10 +127,14 @@ func (r *RestoreReconciler) parseRestoreJob(ctx context.Context, object client.O func (r *RestoreReconciler) deleteExternalResources(reqCtx intctrlutil.RequestCtx, restore *dpv1alpha1.Restore) error { labels := map[string]string{dprestore.DataProtectionRestoreLabelKey: restore.Name} - if err := deleteRelatedJobs(reqCtx, r.Client, restore.Namespace, labels); err != nil { - return err + + // use map to avoid duplicate deletion of the same namespace. + namespaces := map[string]sets.Empty{ + restore.Namespace: {}, + viper.GetString(constant.CfgKeyCtrlrMgrNS): {}, } - return deleteRelatedJobs(reqCtx, r.Client, viper.GetString(constant.CfgKeyCtrlrMgrNS), labels) + + return deleteRelatedObjectList(reqCtx, r.Client, &batchv1.JobList{}, namespaces, labels) } func CheckBackupRepoForRestore(reqCtx intctrlutil.RequestCtx, cli client.Client, restore *dpv1alpha1.Restore) (string, error) { diff --git a/controllers/dataprotection/utils.go b/controllers/dataprotection/utils.go index f1c0e5493ad..ab576c9b021 100644 --- a/controllers/dataprotection/utils.go +++ b/controllers/dataprotection/utils.go @@ -27,12 +27,14 @@ import ( "strings" "sync" + appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -295,24 +297,35 @@ func getDefaultBackupRepo(ctx context.Context, cli client.Client) (*dpv1alpha1.B return defaultRepo, nil } -func deleteRelatedJobs(reqCtx intctrlutil.RequestCtx, cli client.Client, namespace string, labels map[string]string) error { - if labels == nil || namespace == "" { +type objectList interface { + *appsv1.StatefulSetList | *batchv1.JobList + client.ObjectList +} + +func deleteRelatedObjectList[T objectList](reqCtx intctrlutil.RequestCtx, cli client.Client, list T, namespaces map[string]sets.Empty, labels map[string]string) error { + if labels == nil || len(namespaces) == 0 { return nil } - jobs := &batchv1.JobList{} - if err := cli.List(reqCtx.Ctx, jobs, - client.MatchingLabels(labels)); err != nil { - return client.IgnoreNotFound(err) - } - for i := range jobs.Items { - job := &jobs.Items[i] - if err := dputils.RemoveDataProtectionFinalizer(reqCtx.Ctx, cli, job); err != nil { - return err + + for ns := range namespaces { + if err := cli.List(reqCtx.Ctx, list, client.InNamespace(ns), + client.MatchingLabels(labels)); err != nil { + return client.IgnoreNotFound(err) } - if err := intctrlutil.BackgroundDeleteObject(cli, reqCtx.Ctx, job); err != nil { - return err + objs := reflect.ValueOf(list).Elem().FieldByName("Items") + if !objs.IsZero() { + for i := 0; i < objs.Len(); i++ { + obj := objs.Index(i).Addr().Interface().(client.Object) + if err := dputils.RemoveDataProtectionFinalizer(reqCtx.Ctx, cli, obj); err != nil { + return err + } + if err := intctrlutil.BackgroundDeleteObject(cli, reqCtx.Ctx, obj); err != nil { + return err + } + } } } + return nil } diff --git a/deploy/helm/crds/dataprotection.kubeblocks.io_actionsets.yaml b/deploy/helm/crds/dataprotection.kubeblocks.io_actionsets.yaml index 2f8dd197cc4..40a85d26a67 100644 --- a/deploy/helm/crds/dataprotection.kubeblocks.io_actionsets.yaml +++ b/deploy/helm/crds/dataprotection.kubeblocks.io_actionsets.yaml @@ -460,6 +460,10 @@ spec: restore: description: Specifies the restore action. properties: + baseBackupRequired: + default: true + description: Determines if a base backup is required during restoration. + type: boolean postReady: description: Specifies the actions that should be executed after the data has been prepared and is ready for restoration. diff --git a/docs/developer_docs/api-reference/backup.md b/docs/developer_docs/api-reference/backup.md index 04f9d90127f..64337aa12bc 100644 --- a/docs/developer_docs/api-reference/backup.md +++ b/docs/developer_docs/api-reference/backup.md @@ -4370,6 +4370,18 @@ JobActionSpec

Specifies the actions that should be executed after the data has been prepared and is ready for restoration.

+ + +baseBackupRequired
+ +bool + + + +(Optional) +

Determines if a base backup is required during restoration.

+ +

RestoreActionStatus diff --git a/pkg/dataprotection/backup/request.go b/pkg/dataprotection/backup/request.go index a1444972e9c..47ed59fd25d 100644 --- a/pkg/dataprotection/backup/request.go +++ b/pkg/dataprotection/backup/request.go @@ -193,7 +193,7 @@ func (r *Request) buildBackupDataAction(targetPod *corev1.Pod, name string) (act Name: name, ObjectMeta: metav1.ObjectMeta{ Namespace: r.Namespace, - Name: r.Name, + Name: GenerateBackupStatefulSetName(r.Backup, r.Target.Name, BackupDataJobNamePrefix), Labels: BuildBackupWorkloadLabels(r.Backup), }, Replicas: pointer.Int32(int32(1)), diff --git a/pkg/dataprotection/backup/utils.go b/pkg/dataprotection/backup/utils.go index d17a097632c..bf61e24befa 100644 --- a/pkg/dataprotection/backup/utils.go +++ b/pkg/dataprotection/backup/utils.go @@ -156,6 +156,17 @@ func GenerateBackupJobName(backup *dpv1alpha1.Backup, prefix string) string { return name } +func GenerateBackupStatefulSetName(backup *dpv1alpha1.Backup, targetName, prefix string) string { + name := backup.Name + // for cluster mode with multiple targets, the statefulSet name should include the target name. + if targetName != "" { + name = fmt.Sprintf("%s-%s-%s", prefix, targetName, backup.Name) + } + // statefulSet name cannot exceed 52 characters for label name limit as the statefulset controller will + // add a 10-length suffix to the name to construct the label "controller-revision-hash": "-" + return strings.TrimSuffix(name[:min(len(name), 52)], "-") +} + func generateBaseCRNameByBackupSchedule(uniqueNameWithBackupSchedule, backupScheduleNS, method string) string { name := fmt.Sprintf("%s-%s", uniqueNameWithBackupSchedule, backupScheduleNS) if len(name) > 30 { diff --git a/pkg/dataprotection/restore/manager.go b/pkg/dataprotection/restore/manager.go index 035a69f8680..06544d15884 100644 --- a/pkg/dataprotection/restore/manager.go +++ b/pkg/dataprotection/restore/manager.go @@ -159,6 +159,12 @@ func (r *RestoreManager) BuildContinuousRestoreManager(reqCtx intctrlutil.Reques if err := checkRestoreTime(); err != nil { return err } + + if baseBackupRequired := continuousBackupSet.ActionSet.Spec.Restore.BaseBackupRequired; boolptr.IsSetToFalse(baseBackupRequired) { + r.SetBackupSets(continuousBackupSet) + return nil + } + fullBackupSet, err := r.getFullBackupActionSetForContinuous(reqCtx, cli, continuousBackup, metav1.NewTime(restoreTime)) if err != nil || fullBackupSet == nil { return err diff --git a/pkg/dataprotection/restore/utils.go b/pkg/dataprotection/restore/utils.go index e9059cf2da2..12ce4872440 100644 --- a/pkg/dataprotection/restore/utils.go +++ b/pkg/dataprotection/restore/utils.go @@ -306,6 +306,10 @@ func FormatRestoreTimeAndValidate(restoreTimeStr string, continuousBackup *dpv1a } restoreTimeStr = restoreTime.UTC().Format(time.RFC3339) // TODO: check with Recoverable time + + if continuousBackup.Status.TimeRange == nil || continuousBackup.Status.TimeRange.Start.IsZero() || continuousBackup.Status.TimeRange.End.IsZero() { + return restoreTimeStr, fmt.Errorf("invalid timeRange of the backup") + } if !isTimeInRange(restoreTime, continuousBackup.Status.TimeRange.Start.Time, continuousBackup.Status.TimeRange.End.Time) { return restoreTimeStr, fmt.Errorf("restore-to-time is out of time range, you can view the recoverable time: \n"+ "\tkbcli cluster describe %s -n %s", continuousBackup.Labels[constant.AppInstanceLabelKey], continuousBackup.Namespace)