Skip to content

Commit

Permalink
feat: add dataprotection isolation deployment (#4074)
Browse files Browse the repository at this point in the history
  • Loading branch information
dengshaojiang committed Jul 4, 2023
1 parent aff530f commit 6d32009
Show file tree
Hide file tree
Showing 14 changed files with 446 additions and 388 deletions.
354 changes: 194 additions & 160 deletions cmd/manager/main.go

Large diffs are not rendered by default.

11 changes: 1 addition & 10 deletions controllers/apps/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
"context"
"time"

snapshotv1beta1 "github.com/kubernetes-csi/external-snapshotter/client/v3/apis/volumesnapshot/v1beta1"
snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
"github.com/spf13/viper"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
Expand All @@ -34,7 +32,6 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -220,13 +217,7 @@ func (r *ClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
Owns(&dataprotectionv1alpha1.Backup{}).
Owns(&batchv1.Job{}).
Watches(&source.Kind{Type: &corev1.Pod{}}, handler.EnqueueRequestsFromMapFunc(r.filterClusterPods))
if viper.GetBool("VOLUMESNAPSHOT") {
if intctrlutil.InVolumeSnapshotV1Beta1() {
b.Owns(&snapshotv1beta1.VolumeSnapshot{}, builder.Predicates{})
} else {
b.Owns(&snapshotv1.VolumeSnapshot{}, builder.Predicates{})
}
}

return b.Complete(r)
}

Expand Down
5 changes: 1 addition & 4 deletions controllers/apps/cluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,9 +542,7 @@ var _ = Describe("Cluster Controller", func() {
Name: backupKey.Name,
Namespace: backupKey.Namespace,
Labels: map[string]string{
constant.KBManagedByKey: "cluster",
constant.AppInstanceLabelKey: clusterKey.Name,
constant.KBAppComponentLabelKey: comp.Name,
constant.DataProtectionLabelBackupNameKey: backupKey.Name,
}},
Spec: snapshotv1.VolumeSnapshotSpec{
Source: snapshotv1.VolumeSnapshotSource{
Expand Down Expand Up @@ -610,7 +608,6 @@ var _ = Describe("Cluster Controller", func() {
constant.AppInstanceLabelKey: clusterKey.Name,
constant.KBAppComponentLabelKey: comp.Name,
}, client.InNamespace(clusterKey.Namespace))).Should(HaveLen(0))
Eventually(testapps.CheckObjExists(&testCtx, backupKey, &snapshotv1.VolumeSnapshot{}, false)).Should(Succeed())

if !viper.GetBool("VOLUMESNAPSHOT") && len(viper.GetString(constant.CfgKeyBackupPVCName)) > 0 {
By("Checking restore job cleanup")
Expand Down
120 changes: 25 additions & 95 deletions controllers/apps/components/base_stateful_hscale.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ package components

import (
"fmt"
"strings"

snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
"github.com/spf13/viper"
Expand Down Expand Up @@ -272,18 +271,8 @@ func (d *snapshotDataClone) backup() ([]client.Object, error) {

backupPolicyTemplate := &appsv1alpha1.BackupPolicyTemplate{}
err := d.cli.Get(d.reqCtx.Ctx, client.ObjectKey{Name: backupPolicyTplName}, backupPolicyTemplate)
if err != nil && !errors.IsNotFound(err) {
return nil, err
}
// no backuppolicytemplate, then try native volumesnapshot
if err != nil {
pvcName := strings.Join([]string{d.backupVCT().Name, d.stsObj.Name, "0"}, "-")
snapshot, err := builder.BuildVolumeSnapshot(d.key, pvcName, d.stsObj)
if err != nil {
return nil, err
}
d.reqCtx.Eventf(d.cluster, corev1.EventTypeNormal, "VolumeSnapshotCreate", "Create volumesnapshot/%s", d.key.Name)
return []client.Object{snapshot}, nil
return nil, err
}

// if there is backuppolicytemplate created by provider
Expand All @@ -304,49 +293,26 @@ func (d *snapshotDataClone) backup() ([]client.Object, error) {
}

func (d *snapshotDataClone) checkBackupStatus() (backupStatus, error) {
hasBackupPolicyTemplate := true
backupPolicyTplName := d.component.HorizontalScalePolicy.BackupPolicyTemplateName
backupPolicyTemplate := &appsv1alpha1.BackupPolicyTemplate{}
err := d.cli.Get(d.reqCtx.Ctx, client.ObjectKey{Name: backupPolicyTplName}, backupPolicyTemplate)
if err != nil && !errors.IsNotFound(err) {
if err != nil {
return backupStatusFailed, err
}
if errors.IsNotFound(err) {
hasBackupPolicyTemplate = false
}
// if no backuppolicytemplate, do not check backup
if hasBackupPolicyTemplate {
backup := dataprotectionv1alpha1.Backup{}
if err := d.cli.Get(d.reqCtx.Ctx, d.key, &backup); err != nil {
if errors.IsNotFound(err) {
return backupStatusNotCreated, nil
} else {
return backupStatusFailed, err
}
}
if backup.Status.Phase == dataprotectionv1alpha1.BackupFailed {
return backupStatusFailed, intctrlutil.NewErrorf(intctrlutil.ErrorTypeBackupFailed, "backup for horizontalScaling failed: %s",
backup.Status.FailureReason)
}
if backup.Status.Phase != dataprotectionv1alpha1.BackupCompleted {
return backupStatusProcessing, nil
}
} else {
vsExists, err := d.isVolumeSnapshotExists()
if err != nil {
return backupStatusFailed, err
}
if !vsExists {
backup := dataprotectionv1alpha1.Backup{}
if err := d.cli.Get(d.reqCtx.Ctx, d.key, &backup); err != nil {
if errors.IsNotFound(err) {
return backupStatusNotCreated, nil
}
// volumesnapshot exists, check if it is ready for use.
ready, err := d.isVolumeSnapshotReadyToUse()
if err != nil {
} else {
return backupStatusFailed, err
}
if !ready {
return backupStatusProcessing, nil
}
}
if backup.Status.Phase == dataprotectionv1alpha1.BackupFailed {
return backupStatusFailed, intctrlutil.NewErrorf(intctrlutil.ErrorTypeBackupFailed, "backup for horizontalScaling failed: %s",
backup.Status.FailureReason)
}
if backup.Status.Phase != dataprotectionv1alpha1.BackupCompleted {
return backupStatusProcessing, nil
}
return backupStatusReadyToUse, nil
}
Expand Down Expand Up @@ -377,43 +343,19 @@ func (d *snapshotDataClone) checkRestoreStatus(pvcKey types.NamespacedName) (bac
return backupStatusReadyToUse, nil
}

// check snapshot existence
func (d *snapshotDataClone) isVolumeSnapshotExists() (bool, error) {
ml := d.getBackupMatchingLabels()
vsList := snapshotv1.VolumeSnapshotList{}
compatClient := intctrlutil.VolumeSnapshotCompatClient{ReadonlyClient: d.cli, Ctx: d.reqCtx.Ctx}
if err := compatClient.List(&vsList, ml); err != nil {
return false, client.IgnoreNotFound(err)
}
for _, vs := range vsList.Items {
// when do h-scale very shortly after last h-scale,
// the last volume snapshot could not be deleted completely
if vs.DeletionTimestamp.IsZero() {
return true, nil
}
}
return false, nil
}

// check snapshot ready to use
func (d *snapshotDataClone) isVolumeSnapshotReadyToUse() (bool, error) {
ml := d.getBackupMatchingLabels()
vsList := snapshotv1.VolumeSnapshotList{}
func (d *snapshotDataClone) listVolumeSnapshotByLabels(vsList *snapshotv1.VolumeSnapshotList, ml client.MatchingLabels) error {
compatClient := intctrlutil.VolumeSnapshotCompatClient{ReadonlyClient: d.cli, Ctx: d.reqCtx.Ctx}
if err := compatClient.List(&vsList, ml); err != nil {
return false, client.IgnoreNotFound(err)
}
if len(vsList.Items) == 0 || vsList.Items[0].Status == nil {
return false, nil
}
status := vsList.Items[0].Status
if status.Error != nil {
return false, fmt.Errorf("VolumeSnapshot/" + vsList.Items[0].Name + ": " + *status.Error.Message)
}
if status.ReadyToUse == nil {
return false, nil
// get vs from backup.
backupList := dataprotectionv1alpha1.BackupList{}
if err := d.cli.List(d.reqCtx.Ctx, &backupList, ml); err != nil {
return err
} else if len(backupList.Items) == 0 {
// ignore not found
return nil
}
return *status.ReadyToUse, nil
return compatClient.List(vsList, client.MatchingLabels{
constant.DataProtectionLabelBackupNameKey: backupList.Items[0].Name,
})
}

func (d *snapshotDataClone) checkedCreatePVCFromSnapshot(pvcKey types.NamespacedName,
Expand All @@ -426,8 +368,7 @@ func (d *snapshotDataClone) checkedCreatePVCFromSnapshot(pvcKey types.Namespaced
}
ml := d.getBackupMatchingLabels()
vsList := snapshotv1.VolumeSnapshotList{}
compatClient := intctrlutil.VolumeSnapshotCompatClient{ReadonlyClient: d.cli, Ctx: d.reqCtx.Ctx}
if err := compatClient.List(&vsList, ml); err != nil {
if err = d.listVolumeSnapshotByLabels(&vsList, ml); err != nil {
return nil, err
}
if len(vsList.Items) == 0 {
Expand Down Expand Up @@ -466,17 +407,6 @@ func (d *snapshotDataClone) deleteSnapshot() ([]client.Object, error) {
if len(objs) > 0 {
d.reqCtx.Recorder.Eventf(d.cluster, corev1.EventTypeNormal, "BackupJobDelete", "Delete backupJob/%s", d.key.Name)
}
// delete volumesnapshot separately since backup may not exist if backuppolicytemplate not configured
compatClient := intctrlutil.VolumeSnapshotCompatClient{ReadonlyClient: d.cli, Ctx: d.reqCtx.Ctx}
vs := &snapshotv1.VolumeSnapshot{}
err = compatClient.Get(d.key, vs)
if err != nil && !errors.IsNotFound(err) {
return nil, err
}
if err == nil {
objs = append(objs, vs)
d.reqCtx.Recorder.Eventf(d.cluster, corev1.EventTypeNormal, "VolumeSnapshotDelete", "Delete volumeSnapshot/%s", d.key.Name)
}

return objs, nil
}
Expand Down
48 changes: 33 additions & 15 deletions controllers/apps/opsrequest_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ import (
"fmt"
"time"

snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
"github.com/spf13/viper"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand All @@ -36,6 +36,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
dataprotectionv1alpha1 "github.com/apecloud/kubeblocks/apis/dataprotection/v1alpha1"
opsutil "github.com/apecloud/kubeblocks/controllers/apps/operations/util"
"github.com/apecloud/kubeblocks/internal/constant"
intctrlutil "github.com/apecloud/kubeblocks/internal/generics"
Expand Down Expand Up @@ -359,11 +360,13 @@ var _ = Describe("OpsRequest Controller", func() {

By("set component to horizontal with snapshot policy and create a cluster")
viper.Set("VOLUMESNAPSHOT", true)
Expect(testapps.GetAndChangeObj(&testCtx, client.ObjectKeyFromObject(clusterDefObj),
func(clusterDef *appsv1alpha1.ClusterDefinition) {
clusterDef.Spec.ComponentDefs[0].HorizontalScalePolicy =
&appsv1alpha1.HorizontalScalePolicy{Type: appsv1alpha1.HScaleDataClonePolicyCloneVolume}
})()).ShouldNot(HaveOccurred())
if clusterDefObj.Spec.ComponentDefs[0].HorizontalScalePolicy == nil {
Expect(testapps.GetAndChangeObj(&testCtx, client.ObjectKeyFromObject(clusterDefObj),
func(clusterDef *appsv1alpha1.ClusterDefinition) {
clusterDef.Spec.ComponentDefs[0].HorizontalScalePolicy =
&appsv1alpha1.HorizontalScalePolicy{Type: appsv1alpha1.HScaleDataClonePolicyCloneVolume}
})()).ShouldNot(HaveOccurred())
}
pvcSpec := testapps.NewPVCSpec("1Gi")
clusterObj = testapps.NewClusterFactory(testCtx.DefaultNamespace, clusterNamePrefix,
clusterDefObj.Name, clusterVersionObj.Name).WithRandomName().
Expand Down Expand Up @@ -484,19 +487,34 @@ var _ = Describe("OpsRequest Controller", func() {
g.Expect(cluster.Status.Phase).Should(Equal(appsv1alpha1.SpecReconcilingClusterPhase))
})).Should(Succeed())

By("mock VolumeSnapshot status is ready, component phase should change to Updating when component is horizontally scaling.")
snapshotKey := types.NamespacedName{Name: fmt.Sprintf("%s-%s-scaling",
By("mock backup status is ready, component phase should change to Updating when component is horizontally scaling.")
backupKey := types.NamespacedName{Name: fmt.Sprintf("%s-%s-scaling",
clusterKey.Name, mysqlCompName), Namespace: testCtx.DefaultNamespace}
volumeSnapshot := &snapshotv1.VolumeSnapshot{}
Expect(k8sClient.Get(testCtx.Ctx, snapshotKey, volumeSnapshot)).Should(Succeed())
readyToUse := true
volumeSnapshot.Status = &snapshotv1.VolumeSnapshotStatus{ReadyToUse: &readyToUse}
Expect(k8sClient.Status().Update(testCtx.Ctx, volumeSnapshot)).Should(Succeed())
backup := &dataprotectionv1alpha1.Backup{}
Expect(k8sClient.Get(testCtx.Ctx, backupKey, backup)).Should(Succeed())
backup.Status.Phase = dataprotectionv1alpha1.BackupCompleted
Expect(k8sClient.Status().Update(testCtx.Ctx, backup)).Should(Succeed())
Eventually(testapps.CheckObj(&testCtx, clusterKey, func(g Gomega, cluster *appsv1alpha1.Cluster) {
g.Expect(cluster.Status.Components[mysqlCompName].Phase).Should(Equal(appsv1alpha1.SpecReconcilingClusterCompPhase))
g.Expect(cluster.Status.Phase).Should(Equal(appsv1alpha1.SpecReconcilingClusterPhase))
})).Should(Succeed())

By("mock create volumesnapshot, which should done by backup controller")
vs := &snapshotv1.VolumeSnapshot{}
vs.Name = backupKey.Name
vs.Namespace = backupKey.Namespace
vs.Labels = map[string]string{
constant.DataProtectionLabelBackupNameKey: backupKey.Name,
}
pvcName := ""
vs.Spec = snapshotv1.VolumeSnapshotSpec{
Source: snapshotv1.VolumeSnapshotSource{
PersistentVolumeClaimName: &pvcName,
},
}
Expect(k8sClient.Create(testCtx.Ctx, vs)).Should(Succeed())
Eventually(testapps.CheckObjExists(&testCtx, backupKey, vs, true)).Should(Succeed())

By("check the underlying workload been updated")
Eventually(testapps.CheckObj(&testCtx, client.ObjectKeyFromObject(componentWorkload()),
func(g Gomega, sts *appsv1.StatefulSet) {
Expand All @@ -521,8 +539,8 @@ var _ = Describe("OpsRequest Controller", func() {
})()).Should(Succeed())
}

By("check the volumesnapshot created for scaling has been deleted")
Eventually(testapps.CheckObjExists(&testCtx, snapshotKey, volumeSnapshot, false)).Should(Succeed())
By("check the backup created for scaling has been deleted")
Eventually(testapps.CheckObjExists(&testCtx, backupKey, backup, false)).Should(Succeed())

By("mock component workload is running and expect cluster and component are running")
mockCompRunning(replicas)
Expand Down
Loading

0 comments on commit 6d32009

Please sign in to comment.