Skip to content

Commit

Permalink
issue 8044: generic restore - allow to ignore WaitForFirstConsumer
Browse files Browse the repository at this point in the history
Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
  • Loading branch information
Lyndon-Li committed Dec 24, 2024
1 parent 703a726 commit 5b1f3bd
Show file tree
Hide file tree
Showing 10 changed files with 105 additions and 40 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/8550-Lyndon-Li
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix issue #8044, allow users to ignore WaitForFirtConsumer volumes' requirement of waiting for pod schedule for restorePVC of data mover
8 changes: 7 additions & 1 deletion pkg/cmd/cli/nodeagent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,13 @@ func (s *nodeAgentServer) run() {
s.logger.WithError(err).Fatal("Unable to create the data upload controller")
}

dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, podResources, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
var restorePVCConfig nodeagent.RestorePVC
if s.dataPathConfigs != nil && s.dataPathConfigs.RestorePVCConfig != nil {
restorePVCConfig = *s.dataPathConfigs.RestorePVCConfig
s.logger.Infof("Using customized restorePVC config %v", restorePVCConfig)
}

Check warning on line 360 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L356-L360

Added lines #L356 - L360 were not covered by tests

dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, restorePVCConfig, podResources, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)

Check warning on line 362 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L362

Added line #L362 was not covered by tests
if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data download controller")
}
Expand Down
14 changes: 12 additions & 2 deletions pkg/controller/data_download_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,15 @@ type DataDownloadReconciler struct {
restoreExposer exposer.GenericRestoreExposer
nodeName string
dataPathMgr *datapath.Manager
restorePVCConfig nodeagent.RestorePVC
podResources v1.ResourceRequirements
preparingTimeout time.Duration
metrics *metrics.ServerMetrics
}

func NewDataDownloadReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager,
podResources v1.ResourceRequirements, nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler {
restorePVCConfig nodeagent.RestorePVC, podResources v1.ResourceRequirements, nodeName string, preparingTimeout time.Duration,
logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler {
return &DataDownloadReconciler{
client: client,
kubeClient: kubeClient,
Expand All @@ -79,6 +81,7 @@ func NewDataDownloadReconciler(client client.Client, mgr manager.Manager, kubeCl
Clock: &clock.RealClock{},
nodeName: nodeName,
restoreExposer: exposer.NewGenericRestoreExposer(kubeClient, logger),
restorePVCConfig: restorePVCConfig,
dataPathMgr: dataPathMgr,
podResources: podResources,
preparingTimeout: preparingTimeout,
Expand Down Expand Up @@ -194,7 +197,14 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
// Expose() will trigger to create one pod whose volume is restored by a given volume snapshot,
// but the pod maybe is not in the same node of the current controller, so we need to return it here.
// And then only the controller who is in the same node could do the rest work.
err = r.restoreExposer.Expose(ctx, getDataDownloadOwnerObject(dd), dd.Spec.TargetVolume.PVC, dd.Spec.TargetVolume.Namespace, hostingPodLabels, r.podResources, dd.Spec.OperationTimeout.Duration)
err = r.restoreExposer.Expose(ctx, getDataDownloadOwnerObject(dd), exposer.GenericRestoreExposeParam{
TargetPVCName: dd.Spec.TargetVolume.PVC,
SourceNamespace: dd.Spec.TargetVolume.Namespace,
HostingPodLabels: hostingPodLabels,
Resources: r.podResources,
ExposeTimeout: dd.Spec.OperationTimeout.Duration,
RestorePVCConfig: r.restorePVCConfig,
})
if err != nil {
if err := r.client.Get(ctx, req.NamespacedName, dd); err != nil {
if !apierrors.IsNotFound(err) {
Expand Down
5 changes: 3 additions & 2 deletions pkg/controller/data_download_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
datapathmockes "github.com/vmware-tanzu/velero/pkg/datapath/mocks"
"github.com/vmware-tanzu/velero/pkg/exposer"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/nodeagent"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
"github.com/vmware-tanzu/velero/pkg/uploader"

Expand Down Expand Up @@ -140,7 +141,7 @@ func initDataDownloadReconcilerWithError(objects []runtime.Object, needError ...

dataPathMgr := datapath.NewManager(1)

return NewDataDownloadReconciler(fakeClient, nil, fakeKubeClient, dataPathMgr, corev1.ResourceRequirements{}, "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
return NewDataDownloadReconciler(fakeClient, nil, fakeKubeClient, dataPathMgr, nodeagent.RestorePVC{}, corev1.ResourceRequirements{}, "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
}

func TestDataDownloadReconcile(t *testing.T) {
Expand Down Expand Up @@ -959,7 +960,7 @@ func (dt *ddResumeTestHelper) resumeCancellableDataPath(_ *DataUploadReconciler,
return dt.resumeErr
}

func (dt *ddResumeTestHelper) Expose(context.Context, corev1.ObjectReference, string, string, map[string]string, corev1.ResourceRequirements, time.Duration) error {
func (dt *ddResumeTestHelper) Expose(context.Context, corev1.ObjectReference, exposer.GenericRestoreExposeParam) error {
return nil
}

Expand Down
39 changes: 30 additions & 9 deletions pkg/exposer/generic_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,31 @@ import (
"github.com/vmware-tanzu/velero/pkg/util/kube"
)

// GenericRestoreExposeParam define the input param for Generic Restore Expose
type GenericRestoreExposeParam struct {
// TargetPVCName is the target volume name to be restored
TargetPVCName string

// SourceNamespace is the original namespace of the volume that the snapshot is taken for
SourceNamespace string

// HostingPodLabels is the labels that are going to apply to the hosting pod
HostingPodLabels map[string]string

// Resources defines the resource requirements of the hosting pod
Resources corev1.ResourceRequirements

// ExposeTimeout specifies the timeout for the entire expose process
ExposeTimeout time.Duration

// RestorePVCConfig is the config for restorePVC (intermediate PVC) of generic restore
RestorePVCConfig nodeagent.RestorePVC
}

// GenericRestoreExposer is the interfaces for a generic restore exposer
type GenericRestoreExposer interface {
// Expose starts the process to a restore expose, the expose process may take long time
Expose(context.Context, corev1.ObjectReference, string, string, map[string]string, corev1.ResourceRequirements, time.Duration) error
Expose(context.Context, corev1.ObjectReference, GenericRestoreExposeParam) error

// GetExposed polls the status of the expose.
// If the expose is accessible by the current caller, it waits the expose ready and returns the expose result.
Expand Down Expand Up @@ -74,25 +95,25 @@ type genericRestoreExposer struct {
log logrus.FieldLogger
}

func (e *genericRestoreExposer) Expose(ctx context.Context, ownerObject corev1.ObjectReference, targetPVCName string, sourceNamespace string, hostingPodLabels map[string]string, resources corev1.ResourceRequirements, timeout time.Duration) error {
func (e *genericRestoreExposer) Expose(ctx context.Context, ownerObject corev1.ObjectReference, param GenericRestoreExposeParam) error {
curLog := e.log.WithFields(logrus.Fields{
"owner": ownerObject.Name,
"target PVC": targetPVCName,
"source namespace": sourceNamespace,
"target PVC": param.TargetPVCName,
"source namespace": param.SourceNamespace,
})

selectedNode, targetPVC, err := kube.WaitPVCConsumed(ctx, e.kubeClient.CoreV1(), targetPVCName, sourceNamespace, e.kubeClient.StorageV1(), timeout)
selectedNode, targetPVC, err := kube.WaitPVCConsumed(ctx, e.kubeClient.CoreV1(), param.TargetPVCName, param.SourceNamespace, e.kubeClient.StorageV1(), param.ExposeTimeout, param.RestorePVCConfig.IgnoreWaitForFirstConsumer)
if err != nil {
return errors.Wrapf(err, "error to wait target PVC consumed, %s/%s", sourceNamespace, targetPVCName)
return errors.Wrapf(err, "error to wait target PVC consumed, %s/%s", param.SourceNamespace, param.TargetPVCName)
}

curLog.WithField("target PVC", targetPVCName).WithField("selected node", selectedNode).Info("Target PVC is consumed")
curLog.WithField("target PVC", param.TargetPVCName).WithField("selected node", selectedNode).Info("Target PVC is consumed")

if kube.IsPVCBound(targetPVC) {
return errors.Errorf("Target PVC %s/%s has already been bound, abort", sourceNamespace, targetPVCName)
return errors.Errorf("Target PVC %s/%s has already been bound, abort", param.SourceNamespace, param.TargetPVCName)
}

restorePod, err := e.createRestorePod(ctx, ownerObject, targetPVC, timeout, hostingPodLabels, selectedNode, resources)
restorePod, err := e.createRestorePod(ctx, ownerObject, targetPVC, param.ExposeTimeout, param.HostingPodLabels, selectedNode, param.Resources)
if err != nil {
return errors.Wrapf(err, "error to create restore pod")
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/exposer/generic_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,12 @@ func TestRestoreExpose(t *testing.T) {
}
}

err := exposer.Expose(context.Background(), ownerObject, test.targetPVCName, test.sourceNamespace, map[string]string{}, corev1.ResourceRequirements{}, time.Millisecond)
err := exposer.Expose(context.Background(), ownerObject, GenericRestoreExposeParam{
TargetPVCName: test.targetPVCName,
SourceNamespace: test.sourceNamespace,
HostingPodLabels: map[string]string{},
Resources: corev1.ResourceRequirements{},
ExposeTimeout: time.Millisecond})
assert.EqualError(t, err, test.err)
})
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/exposer/mocks/GenericRestoreExposer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions pkg/nodeagent/node_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ type BackupPVC struct {
SPCNoRelabeling bool `json:"spcNoRelabeling,omitempty"`
}

type RestorePVC struct {
// IgnoreWaitForFirstConsumer indicates to ignore the schedule status of restorePod if the PVC is in WaitForFirstConsumer mode
IgnoreWaitForFirstConsumer bool `json:"ignoreWaitForFirstConsumer,omitempty"`
}

type Configs struct {
// LoadConcurrency is the config for data path load concurrency per node.
LoadConcurrency *LoadConcurrency `json:"loadConcurrency,omitempty"`
Expand All @@ -88,6 +93,9 @@ type Configs struct {
// BackupPVCConfig is the config for backupPVC (intermediate PVC) of snapshot data movement
BackupPVCConfig map[string]BackupPVC `json:"backupPVC,omitempty"`

// RestoreVCConfig is the config for restorePVC (intermediate PVC) of generic restore
RestorePVCConfig *RestorePVC `json:"restorePVC,omitempty"`

// PodResources is the resource config for various types of pods launched by node-agent, i.e., data mover pods.
PodResources *kube.PodResources `json:"podResources,omitempty"`
}
Expand Down
24 changes: 13 additions & 11 deletions pkg/util/kube/pvc_pv.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func SetPVReclaimPolicy(ctx context.Context, pvGetter corev1client.CoreV1Interfa
// nothing if the consuming doesn't affect the PV provision.
// The latest PVC and the selected node will be returned.
func WaitPVCConsumed(ctx context.Context, pvcGetter corev1client.CoreV1Interface, pvc string, namespace string,
storageClient storagev1.StorageV1Interface, timeout time.Duration) (string, *corev1api.PersistentVolumeClaim, error) {
storageClient storagev1.StorageV1Interface, timeout time.Duration, ignoreConsume bool) (string, *corev1api.PersistentVolumeClaim, error) {
selectedNode := ""
var updated *corev1api.PersistentVolumeClaim
var storageClass *storagev1api.StorageClass
Expand All @@ -281,18 +281,20 @@ func WaitPVCConsumed(ctx context.Context, pvcGetter corev1client.CoreV1Interface
return false, errors.Wrapf(err, "error to get pvc %s/%s", namespace, pvc)
}

if tmpPVC.Spec.StorageClassName != nil && storageClass == nil {
storageClass, err = storageClient.StorageClasses().Get(ctx, *tmpPVC.Spec.StorageClassName, metav1.GetOptions{})
if err != nil {
return false, errors.Wrapf(err, "error to get storage class %s", *tmpPVC.Spec.StorageClassName)
if !ignoreConsume {
if tmpPVC.Spec.StorageClassName != nil && storageClass == nil {
storageClass, err = storageClient.StorageClasses().Get(ctx, *tmpPVC.Spec.StorageClassName, metav1.GetOptions{})
if err != nil {
return false, errors.Wrapf(err, "error to get storage class %s", *tmpPVC.Spec.StorageClassName)
}
}
}

if storageClass != nil {
if storageClass.VolumeBindingMode != nil && *storageClass.VolumeBindingMode == storagev1api.VolumeBindingWaitForFirstConsumer {
selectedNode = tmpPVC.Annotations[KubeAnnSelectedNode]
if selectedNode == "" {
return false, nil
if storageClass != nil {
if storageClass.VolumeBindingMode != nil && *storageClass.VolumeBindingMode == storagev1api.VolumeBindingWaitForFirstConsumer {
selectedNode = tmpPVC.Annotations[KubeAnnSelectedNode]
if selectedNode == "" {
return false, nil
}
}
}
}
Expand Down
29 changes: 20 additions & 9 deletions pkg/util/kube/pvc_pv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,14 +188,15 @@ func TestWaitPVCConsumed(t *testing.T) {
}

tests := []struct {
name string
pvcName string
pvcNamespace string
kubeClientObj []runtime.Object
kubeReactors []reactor
expectedPVC *corev1api.PersistentVolumeClaim
selectedNode string
err string
name string
pvcName string
pvcNamespace string
kubeClientObj []runtime.Object
kubeReactors []reactor
expectedPVC *corev1api.PersistentVolumeClaim
selectedNode string
ignoreWaitForFirstConsumer bool
err string
}{
{
name: "get pvc error",
Expand All @@ -212,6 +213,16 @@ func TestWaitPVCConsumed(t *testing.T) {
},
expectedPVC: pvcObject,
},
{
name: "success when ignore wait for first consumer",
pvcName: "fake-pvc-2",
pvcNamespace: "fake-namespace",
ignoreWaitForFirstConsumer: true,
kubeClientObj: []runtime.Object{
pvcObjectWithSC,
},
expectedPVC: pvcObjectWithSC,
},
{
name: "get sc fail",
pvcName: "fake-pvc-2",
Expand Down Expand Up @@ -274,7 +285,7 @@ func TestWaitPVCConsumed(t *testing.T) {

var kubeClient kubernetes.Interface = fakeKubeClient

selectedNode, pvc, err := WaitPVCConsumed(context.Background(), kubeClient.CoreV1(), test.pvcName, test.pvcNamespace, kubeClient.StorageV1(), time.Millisecond)
selectedNode, pvc, err := WaitPVCConsumed(context.Background(), kubeClient.CoreV1(), test.pvcName, test.pvcNamespace, kubeClient.StorageV1(), time.Millisecond, test.ignoreWaitForFirstConsumer)

if err != nil {
assert.EqualError(t, err, test.err)
Expand Down

0 comments on commit 5b1f3bd

Please sign in to comment.