Skip to content

Commit

Permalink
feat: support redis(standalone replica) pitr (#8043)
Browse files Browse the repository at this point in the history
  • Loading branch information
Chiwency authored and wangyelei committed Sep 19, 2024
1 parent 7e4fdcc commit e220ccd
Show file tree
Hide file tree
Showing 12 changed files with 111 additions and 49 deletions.
6 changes: 6 additions & 0 deletions apis/dataprotection/v1alpha1/actionset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,12 @@ type RestoreActionSpec struct {
//
// +optional
PostReady []ActionSpec `json:"postReady,omitempty"`

// Determines if a base backup is required during restoration.
//
// +optional
// +kubebuilder:default=true
BaseBackupRequired *bool `json:"baseBackupRequired,omitempty"`
}

// ActionSpec defines an action that should be executed. Only one of the fields may be set.
Expand Down
5 changes: 5 additions & 0 deletions apis/dataprotection/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions config/crd/bases/dataprotection.kubeblocks.io_actionsets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,10 @@ spec:
restore:
description: Specifies the restore action.
properties:
baseBackupRequired:
default: true
description: Determines if a base backup is required during restoration.
type: boolean
postReady:
description: Specifies the actions that should be executed after
the data has been prepared and is ready for restoration.
Expand Down
56 changes: 24 additions & 32 deletions controllers/dataprotection/backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"k8s.io/utils/clock"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -656,6 +658,14 @@ func (r *BackupReconciler) checkIsCompletedDuringRunning(reqCtx intctrlutil.Requ
duration := backup.Status.CompletionTimestamp.Sub(backup.Status.StartTimestamp.Time).Round(time.Second)
backup.Status.Duration = &metav1.Duration{Duration: duration}
}

for i := range backup.Status.Actions {
act := &backup.Status.Actions[i]
act.Phase = dpv1alpha1.ActionPhaseCompleted
act.AvailableReplicas = pointer.Int32(int32(0))
act.CompletionTimestamp = backup.Status.CompletionTimestamp
}

return true, r.Client.Status().Patch(reqCtx.Ctx, backup, patch)
}

Expand Down Expand Up @@ -693,15 +703,6 @@ func (r *BackupReconciler) updateStatusIfFailed(
return intctrlutil.CheckedRequeueWithError(err, reqCtx.Log, "")
}

// deleteExternalJobs deletes the external jobs.
func (r *BackupReconciler) deleteExternalJobs(reqCtx intctrlutil.RequestCtx, backup *dpv1alpha1.Backup) error {
labels := dpbackup.BuildBackupWorkloadLabels(backup)
if err := deleteRelatedJobs(reqCtx, r.Client, backup.Namespace, labels); err != nil {
return err
}
return deleteRelatedJobs(reqCtx, r.Client, viper.GetString(constant.CfgKeyCtrlrMgrNS), labels)
}

func (r *BackupReconciler) deleteVolumeSnapshots(reqCtx intctrlutil.RequestCtx,
backup *dpv1alpha1.Backup) error {
deleter := &dpbackup.Deleter{
Expand All @@ -711,34 +712,25 @@ func (r *BackupReconciler) deleteVolumeSnapshots(reqCtx intctrlutil.RequestCtx,
return deleter.DeleteVolumeSnapshots(backup)
}

// deleteExternalStatefulSet deletes the external statefulSet.
func (r *BackupReconciler) deleteExternalStatefulSet(reqCtx intctrlutil.RequestCtx, backup *dpv1alpha1.Backup) error {
key := client.ObjectKey{
Namespace: backup.Namespace,
Name: backup.Name,
}
sts := &appsv1.StatefulSet{}
if err := r.Client.Get(reqCtx.Ctx, key, sts); err != nil {
return client.IgnoreNotFound(err)
}

patch := client.MergeFrom(sts.DeepCopy())
controllerutil.RemoveFinalizer(sts, dptypes.DataProtectionFinalizerName)
if err := r.Client.Patch(reqCtx.Ctx, sts, patch); err != nil {
return err
}
reqCtx.Log.V(1).Info("delete statefulSet", "statefulSet", sts)
return intctrlutil.BackgroundDeleteObject(r.Client, reqCtx.Ctx, sts)
}

// deleteExternalResources deletes the external workloads that execute backup.
// Currently, it only supports two types of workloads: job.
// Currently, it only supports two types of workloads: job, statefulSet
func (r *BackupReconciler) deleteExternalResources(
reqCtx intctrlutil.RequestCtx, backup *dpv1alpha1.Backup) error {
if err := r.deleteExternalJobs(reqCtx, backup); err != nil {
labels := dpbackup.BuildBackupWorkloadLabels(backup)

// use map to avoid duplicate deletion of the same namespace.
namespaces := map[string]sets.Empty{
backup.Namespace: {},
viper.GetString(constant.CfgKeyCtrlrMgrNS): {},
}

// delete the external jobs.
if err := deleteRelatedObjectList(reqCtx, r.Client, &batchv1.JobList{}, namespaces, labels); err != nil {
return err
}
return r.deleteExternalStatefulSet(reqCtx, backup)

// delete the external statefulSets.
return deleteRelatedObjectList(reqCtx, r.Client, &appsv1.StatefulSetList{}, namespaces, labels)
}

// PatchBackupObjectMeta patches backup object metaObject include cluster snapshot.
Expand Down
11 changes: 8 additions & 3 deletions controllers/dataprotection/restore_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -126,10 +127,14 @@ func (r *RestoreReconciler) parseRestoreJob(ctx context.Context, object client.O

func (r *RestoreReconciler) deleteExternalResources(reqCtx intctrlutil.RequestCtx, restore *dpv1alpha1.Restore) error {
labels := map[string]string{dprestore.DataProtectionRestoreLabelKey: restore.Name}
if err := deleteRelatedJobs(reqCtx, r.Client, restore.Namespace, labels); err != nil {
return err

// use map to avoid duplicate deletion of the same namespace.
namespaces := map[string]sets.Empty{
restore.Namespace: {},
viper.GetString(constant.CfgKeyCtrlrMgrNS): {},
}
return deleteRelatedJobs(reqCtx, r.Client, viper.GetString(constant.CfgKeyCtrlrMgrNS), labels)

return deleteRelatedObjectList(reqCtx, r.Client, &batchv1.JobList{}, namespaces, labels)
}

func CheckBackupRepoForRestore(reqCtx intctrlutil.RequestCtx, cli client.Client, restore *dpv1alpha1.Restore) (string, error) {
Expand Down
39 changes: 26 additions & 13 deletions controllers/dataprotection/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ import (
"strings"
"sync"

appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -295,24 +297,35 @@ func getDefaultBackupRepo(ctx context.Context, cli client.Client) (*dpv1alpha1.B
return defaultRepo, nil
}

func deleteRelatedJobs(reqCtx intctrlutil.RequestCtx, cli client.Client, namespace string, labels map[string]string) error {
if labels == nil || namespace == "" {
type objectList interface {
*appsv1.StatefulSetList | *batchv1.JobList
client.ObjectList
}

func deleteRelatedObjectList[T objectList](reqCtx intctrlutil.RequestCtx, cli client.Client, list T, namespaces map[string]sets.Empty, labels map[string]string) error {
if labels == nil || len(namespaces) == 0 {
return nil
}
jobs := &batchv1.JobList{}
if err := cli.List(reqCtx.Ctx, jobs,
client.MatchingLabels(labels)); err != nil {
return client.IgnoreNotFound(err)
}
for i := range jobs.Items {
job := &jobs.Items[i]
if err := dputils.RemoveDataProtectionFinalizer(reqCtx.Ctx, cli, job); err != nil {
return err

for ns := range namespaces {
if err := cli.List(reqCtx.Ctx, list, client.InNamespace(ns),
client.MatchingLabels(labels)); err != nil {
return client.IgnoreNotFound(err)
}
if err := intctrlutil.BackgroundDeleteObject(cli, reqCtx.Ctx, job); err != nil {
return err
objs := reflect.ValueOf(list).Elem().FieldByName("Items")
if !objs.IsZero() {
for i := 0; i < objs.Len(); i++ {
obj := objs.Index(i).Addr().Interface().(client.Object)
if err := dputils.RemoveDataProtectionFinalizer(reqCtx.Ctx, cli, obj); err != nil {
return err
}
if err := intctrlutil.BackgroundDeleteObject(cli, reqCtx.Ctx, obj); err != nil {
return err
}
}
}
}

return nil
}

Expand Down
4 changes: 4 additions & 0 deletions deploy/helm/crds/dataprotection.kubeblocks.io_actionsets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,10 @@ spec:
restore:
description: Specifies the restore action.
properties:
baseBackupRequired:
default: true
description: Determines if a base backup is required during restoration.
type: boolean
postReady:
description: Specifies the actions that should be executed after
the data has been prepared and is ready for restoration.
Expand Down
12 changes: 12 additions & 0 deletions docs/developer_docs/api-reference/backup.md
Original file line number Diff line number Diff line change
Expand Up @@ -4370,6 +4370,18 @@ JobActionSpec
<p>Specifies the actions that should be executed after the data has been prepared and is ready for restoration.</p>
</td>
</tr>
<tr>
<td>
<code>baseBackupRequired</code><br/>
<em>
bool
</em>
</td>
<td>
<em>(Optional)</em>
<p>Determines if a base backup is required during restoration.</p>
</td>
</tr>
</tbody>
</table>
<h3 id="dataprotection.kubeblocks.io/v1alpha1.RestoreActionStatus">RestoreActionStatus
Expand Down
2 changes: 1 addition & 1 deletion pkg/dataprotection/backup/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (r *Request) buildBackupDataAction(targetPod *corev1.Pod, name string) (act
Name: name,
ObjectMeta: metav1.ObjectMeta{
Namespace: r.Namespace,
Name: r.Name,
Name: GenerateBackupStatefulSetName(r.Backup, r.Target.Name, BackupDataJobNamePrefix),
Labels: BuildBackupWorkloadLabels(r.Backup),
},
Replicas: pointer.Int32(int32(1)),
Expand Down
11 changes: 11 additions & 0 deletions pkg/dataprotection/backup/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,17 @@ func GenerateBackupJobName(backup *dpv1alpha1.Backup, prefix string) string {
return name
}

func GenerateBackupStatefulSetName(backup *dpv1alpha1.Backup, targetName, prefix string) string {
name := backup.Name
// for cluster mode with multiple targets, the statefulSet name should include the target name.
if targetName != "" {
name = fmt.Sprintf("%s-%s-%s", prefix, targetName, backup.Name)
}
// statefulSet name cannot exceed 52 characters for label name limit as the statefulset controller will
// add a 10-length suffix to the name to construct the label "controller-revision-hash": "<statefulset_name>-<hash>"
return strings.TrimSuffix(name[:min(len(name), 52)], "-")
}

func generateBaseCRNameByBackupSchedule(uniqueNameWithBackupSchedule, backupScheduleNS, method string) string {
name := fmt.Sprintf("%s-%s", uniqueNameWithBackupSchedule, backupScheduleNS)
if len(name) > 30 {
Expand Down
6 changes: 6 additions & 0 deletions pkg/dataprotection/restore/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,12 @@ func (r *RestoreManager) BuildContinuousRestoreManager(reqCtx intctrlutil.Reques
if err := checkRestoreTime(); err != nil {
return err
}

if baseBackupRequired := continuousBackupSet.ActionSet.Spec.Restore.BaseBackupRequired; boolptr.IsSetToFalse(baseBackupRequired) {
r.SetBackupSets(continuousBackupSet)
return nil
}

fullBackupSet, err := r.getFullBackupActionSetForContinuous(reqCtx, cli, continuousBackup, metav1.NewTime(restoreTime))
if err != nil || fullBackupSet == nil {
return err
Expand Down
4 changes: 4 additions & 0 deletions pkg/dataprotection/restore/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,10 @@ func FormatRestoreTimeAndValidate(restoreTimeStr string, continuousBackup *dpv1a
}
restoreTimeStr = restoreTime.UTC().Format(time.RFC3339)
// TODO: check with Recoverable time

if continuousBackup.Status.TimeRange == nil || continuousBackup.Status.TimeRange.Start.IsZero() || continuousBackup.Status.TimeRange.End.IsZero() {
return restoreTimeStr, fmt.Errorf("invalid timeRange of the backup")
}
if !isTimeInRange(restoreTime, continuousBackup.Status.TimeRange.Start.Time, continuousBackup.Status.TimeRange.End.Time) {
return restoreTimeStr, fmt.Errorf("restore-to-time is out of time range, you can view the recoverable time: \n"+
"\tkbcli cluster describe %s -n %s", continuousBackup.Labels[constant.AppInstanceLabelKey], continuousBackup.Namespace)
Expand Down

0 comments on commit e220ccd

Please sign in to comment.