diff --git a/changelogs/unreleased/8550-Lyndon-Li b/changelogs/unreleased/8550-Lyndon-Li new file mode 100644 index 0000000000..a09a32115d --- /dev/null +++ b/changelogs/unreleased/8550-Lyndon-Li @@ -0,0 +1 @@ +Fix issue #8044, allow users to ignore WaitForFirtConsumer volumes' requirement of waiting for pod schedule for restorePVC of data mover \ No newline at end of file diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go index 713d863593..d5e7193cc1 100644 --- a/pkg/cmd/cli/nodeagent/server.go +++ b/pkg/cmd/cli/nodeagent/server.go @@ -353,7 +353,13 @@ func (s *nodeAgentServer) run() { s.logger.WithError(err).Fatal("Unable to create the data upload controller") } - dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, podResources, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) + var restorePVCConfig nodeagent.RestorePVC + if s.dataPathConfigs != nil && s.dataPathConfigs.RestorePVCConfig != nil { + restorePVCConfig = *s.dataPathConfigs.RestorePVCConfig + s.logger.Infof("Using customized restorePVC config %v", restorePVCConfig) + } + + dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, restorePVCConfig, podResources, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil { s.logger.WithError(err).Fatal("Unable to create the data download controller") } diff --git a/pkg/controller/data_download_controller.go b/pkg/controller/data_download_controller.go index 347bcfed58..bfe7726042 100644 --- a/pkg/controller/data_download_controller.go +++ b/pkg/controller/data_download_controller.go @@ -64,13 +64,15 @@ type DataDownloadReconciler struct { restoreExposer exposer.GenericRestoreExposer nodeName string dataPathMgr *datapath.Manager + restorePVCConfig nodeagent.RestorePVC podResources v1.ResourceRequirements preparingTimeout time.Duration metrics *metrics.ServerMetrics } func NewDataDownloadReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager, - podResources v1.ResourceRequirements, nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler { + restorePVCConfig nodeagent.RestorePVC, podResources v1.ResourceRequirements, nodeName string, preparingTimeout time.Duration, + logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler { return &DataDownloadReconciler{ client: client, kubeClient: kubeClient, @@ -79,6 +81,7 @@ func NewDataDownloadReconciler(client client.Client, mgr manager.Manager, kubeCl Clock: &clock.RealClock{}, nodeName: nodeName, restoreExposer: exposer.NewGenericRestoreExposer(kubeClient, logger), + restorePVCConfig: restorePVCConfig, dataPathMgr: dataPathMgr, podResources: podResources, preparingTimeout: preparingTimeout, @@ -194,7 +197,14 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request // Expose() will trigger to create one pod whose volume is restored by a given volume snapshot, // but the pod maybe is not in the same node of the current controller, so we need to return it here. // And then only the controller who is in the same node could do the rest work. - err = r.restoreExposer.Expose(ctx, getDataDownloadOwnerObject(dd), dd.Spec.TargetVolume.PVC, dd.Spec.TargetVolume.Namespace, hostingPodLabels, r.podResources, dd.Spec.OperationTimeout.Duration) + err = r.restoreExposer.Expose(ctx, getDataDownloadOwnerObject(dd), exposer.GenericRestoreExposeParam{ + TargetPVCName: dd.Spec.TargetVolume.PVC, + SourceNamespace: dd.Spec.TargetVolume.Namespace, + HostingPodLabels: hostingPodLabels, + Resources: r.podResources, + ExposeTimeout: dd.Spec.OperationTimeout.Duration, + RestorePVCConfig: r.restorePVCConfig, + }) if err != nil { if err := r.client.Get(ctx, req.NamespacedName, dd); err != nil { if !apierrors.IsNotFound(err) { diff --git a/pkg/controller/data_download_controller_test.go b/pkg/controller/data_download_controller_test.go index 7bb224fa10..d3d9488958 100644 --- a/pkg/controller/data_download_controller_test.go +++ b/pkg/controller/data_download_controller_test.go @@ -50,6 +50,7 @@ import ( datapathmockes "github.com/vmware-tanzu/velero/pkg/datapath/mocks" "github.com/vmware-tanzu/velero/pkg/exposer" "github.com/vmware-tanzu/velero/pkg/metrics" + "github.com/vmware-tanzu/velero/pkg/nodeagent" velerotest "github.com/vmware-tanzu/velero/pkg/test" "github.com/vmware-tanzu/velero/pkg/uploader" @@ -140,7 +141,7 @@ func initDataDownloadReconcilerWithError(objects []runtime.Object, needError ... dataPathMgr := datapath.NewManager(1) - return NewDataDownloadReconciler(fakeClient, nil, fakeKubeClient, dataPathMgr, corev1.ResourceRequirements{}, "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil + return NewDataDownloadReconciler(fakeClient, nil, fakeKubeClient, dataPathMgr, nodeagent.RestorePVC{}, corev1.ResourceRequirements{}, "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil } func TestDataDownloadReconcile(t *testing.T) { @@ -959,7 +960,7 @@ func (dt *ddResumeTestHelper) resumeCancellableDataPath(_ *DataUploadReconciler, return dt.resumeErr } -func (dt *ddResumeTestHelper) Expose(context.Context, corev1.ObjectReference, string, string, map[string]string, corev1.ResourceRequirements, time.Duration) error { +func (dt *ddResumeTestHelper) Expose(context.Context, corev1.ObjectReference, exposer.GenericRestoreExposeParam) error { return nil } diff --git a/pkg/exposer/generic_restore.go b/pkg/exposer/generic_restore.go index 7a7df90385..cae7e28a14 100644 --- a/pkg/exposer/generic_restore.go +++ b/pkg/exposer/generic_restore.go @@ -35,10 +35,31 @@ import ( "github.com/vmware-tanzu/velero/pkg/util/kube" ) +// GenericRestoreExposeParam define the input param for Generic Restore Expose +type GenericRestoreExposeParam struct { + // TargetPVCName is the target volume name to be restored + TargetPVCName string + + // SourceNamespace is the original namespace of the volume that the snapshot is taken for + SourceNamespace string + + // HostingPodLabels is the labels that are going to apply to the hosting pod + HostingPodLabels map[string]string + + // Resources defines the resource requirements of the hosting pod + Resources corev1.ResourceRequirements + + // ExposeTimeout specifies the timeout for the entire expose process + ExposeTimeout time.Duration + + // RestorePVCConfig is the config for restorePVC (intermediate PVC) of generic restore + RestorePVCConfig nodeagent.RestorePVC +} + // GenericRestoreExposer is the interfaces for a generic restore exposer type GenericRestoreExposer interface { // Expose starts the process to a restore expose, the expose process may take long time - Expose(context.Context, corev1.ObjectReference, string, string, map[string]string, corev1.ResourceRequirements, time.Duration) error + Expose(context.Context, corev1.ObjectReference, GenericRestoreExposeParam) error // GetExposed polls the status of the expose. // If the expose is accessible by the current caller, it waits the expose ready and returns the expose result. @@ -74,25 +95,25 @@ type genericRestoreExposer struct { log logrus.FieldLogger } -func (e *genericRestoreExposer) Expose(ctx context.Context, ownerObject corev1.ObjectReference, targetPVCName string, sourceNamespace string, hostingPodLabels map[string]string, resources corev1.ResourceRequirements, timeout time.Duration) error { +func (e *genericRestoreExposer) Expose(ctx context.Context, ownerObject corev1.ObjectReference, param GenericRestoreExposeParam) error { curLog := e.log.WithFields(logrus.Fields{ "owner": ownerObject.Name, - "target PVC": targetPVCName, - "source namespace": sourceNamespace, + "target PVC": param.TargetPVCName, + "source namespace": param.SourceNamespace, }) - selectedNode, targetPVC, err := kube.WaitPVCConsumed(ctx, e.kubeClient.CoreV1(), targetPVCName, sourceNamespace, e.kubeClient.StorageV1(), timeout) + selectedNode, targetPVC, err := kube.WaitPVCConsumed(ctx, e.kubeClient.CoreV1(), param.TargetPVCName, param.SourceNamespace, e.kubeClient.StorageV1(), param.ExposeTimeout, param.RestorePVCConfig.IgnoreWaitForFirstConsumer) if err != nil { - return errors.Wrapf(err, "error to wait target PVC consumed, %s/%s", sourceNamespace, targetPVCName) + return errors.Wrapf(err, "error to wait target PVC consumed, %s/%s", param.SourceNamespace, param.TargetPVCName) } - curLog.WithField("target PVC", targetPVCName).WithField("selected node", selectedNode).Info("Target PVC is consumed") + curLog.WithField("target PVC", param.TargetPVCName).WithField("selected node", selectedNode).Info("Target PVC is consumed") if kube.IsPVCBound(targetPVC) { - return errors.Errorf("Target PVC %s/%s has already been bound, abort", sourceNamespace, targetPVCName) + return errors.Errorf("Target PVC %s/%s has already been bound, abort", param.SourceNamespace, param.TargetPVCName) } - restorePod, err := e.createRestorePod(ctx, ownerObject, targetPVC, timeout, hostingPodLabels, selectedNode, resources) + restorePod, err := e.createRestorePod(ctx, ownerObject, targetPVC, param.ExposeTimeout, param.HostingPodLabels, selectedNode, param.Resources) if err != nil { return errors.Wrapf(err, "error to create restore pod") } diff --git a/pkg/exposer/generic_restore_test.go b/pkg/exposer/generic_restore_test.go index d2d56ece73..338d58b52b 100644 --- a/pkg/exposer/generic_restore_test.go +++ b/pkg/exposer/generic_restore_test.go @@ -180,7 +180,12 @@ func TestRestoreExpose(t *testing.T) { } } - err := exposer.Expose(context.Background(), ownerObject, test.targetPVCName, test.sourceNamespace, map[string]string{}, corev1.ResourceRequirements{}, time.Millisecond) + err := exposer.Expose(context.Background(), ownerObject, GenericRestoreExposeParam{ + TargetPVCName: test.targetPVCName, + SourceNamespace: test.sourceNamespace, + HostingPodLabels: map[string]string{}, + Resources: corev1.ResourceRequirements{}, + ExposeTimeout: time.Millisecond}) assert.EqualError(t, err, test.err) }) } diff --git a/pkg/exposer/mocks/GenericRestoreExposer.go b/pkg/exposer/mocks/GenericRestoreExposer.go index 83a9789af6..fb8fc27906 100644 --- a/pkg/exposer/mocks/GenericRestoreExposer.go +++ b/pkg/exposer/mocks/GenericRestoreExposer.go @@ -44,17 +44,17 @@ func (_m *GenericRestoreExposer) DiagnoseExpose(_a0 context.Context, _a1 v1.Obje return r0 } -// Expose provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4, _a5, _a6 -func (_m *GenericRestoreExposer) Expose(_a0 context.Context, _a1 v1.ObjectReference, _a2 string, _a3 string, _a4 map[string]string, _a5 v1.ResourceRequirements, _a6 time.Duration) error { - ret := _m.Called(_a0, _a1, _a2, _a3, _a4, _a5, _a6) +// Expose provides a mock function with given fields: _a0, _a1, _a2 +func (_m *GenericRestoreExposer) Expose(_a0 context.Context, _a1 v1.ObjectReference, _a2 exposer.GenericRestoreExposeParam) error { + ret := _m.Called(_a0, _a1, _a2) if len(ret) == 0 { panic("no return value specified for Expose") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, v1.ObjectReference, string, string, map[string]string, v1.ResourceRequirements, time.Duration) error); ok { - r0 = rf(_a0, _a1, _a2, _a3, _a4, _a5, _a6) + if rf, ok := ret.Get(0).(func(context.Context, v1.ObjectReference, exposer.GenericRestoreExposeParam) error); ok { + r0 = rf(_a0, _a1, _a2) } else { r0 = ret.Error(0) } diff --git a/pkg/nodeagent/node_agent.go b/pkg/nodeagent/node_agent.go index 898ea1e018..8cf45d7a7f 100644 --- a/pkg/nodeagent/node_agent.go +++ b/pkg/nodeagent/node_agent.go @@ -78,6 +78,11 @@ type BackupPVC struct { SPCNoRelabeling bool `json:"spcNoRelabeling,omitempty"` } +type RestorePVC struct { + // IgnoreWaitForFirstConsumer indicates to ignore the schedule status of restorePod if the PVC is in WaitForFirstConsumer mode + IgnoreWaitForFirstConsumer bool `json:"ignoreWaitForFirstConsumer,omitempty"` +} + type Configs struct { // LoadConcurrency is the config for data path load concurrency per node. LoadConcurrency *LoadConcurrency `json:"loadConcurrency,omitempty"` @@ -88,6 +93,9 @@ type Configs struct { // BackupPVCConfig is the config for backupPVC (intermediate PVC) of snapshot data movement BackupPVCConfig map[string]BackupPVC `json:"backupPVC,omitempty"` + // RestoreVCConfig is the config for restorePVC (intermediate PVC) of generic restore + RestorePVCConfig *RestorePVC `json:"restorePVC,omitempty"` + // PodResources is the resource config for various types of pods launched by node-agent, i.e., data mover pods. PodResources *kube.PodResources `json:"podResources,omitempty"` } diff --git a/pkg/util/kube/pvc_pv.go b/pkg/util/kube/pvc_pv.go index bf7779aaa7..940f971e3e 100644 --- a/pkg/util/kube/pvc_pv.go +++ b/pkg/util/kube/pvc_pv.go @@ -270,7 +270,7 @@ func SetPVReclaimPolicy(ctx context.Context, pvGetter corev1client.CoreV1Interfa // nothing if the consuming doesn't affect the PV provision. // The latest PVC and the selected node will be returned. func WaitPVCConsumed(ctx context.Context, pvcGetter corev1client.CoreV1Interface, pvc string, namespace string, - storageClient storagev1.StorageV1Interface, timeout time.Duration) (string, *corev1api.PersistentVolumeClaim, error) { + storageClient storagev1.StorageV1Interface, timeout time.Duration, ignoreConsume bool) (string, *corev1api.PersistentVolumeClaim, error) { selectedNode := "" var updated *corev1api.PersistentVolumeClaim var storageClass *storagev1api.StorageClass @@ -281,18 +281,20 @@ func WaitPVCConsumed(ctx context.Context, pvcGetter corev1client.CoreV1Interface return false, errors.Wrapf(err, "error to get pvc %s/%s", namespace, pvc) } - if tmpPVC.Spec.StorageClassName != nil && storageClass == nil { - storageClass, err = storageClient.StorageClasses().Get(ctx, *tmpPVC.Spec.StorageClassName, metav1.GetOptions{}) - if err != nil { - return false, errors.Wrapf(err, "error to get storage class %s", *tmpPVC.Spec.StorageClassName) + if !ignoreConsume { + if tmpPVC.Spec.StorageClassName != nil && storageClass == nil { + storageClass, err = storageClient.StorageClasses().Get(ctx, *tmpPVC.Spec.StorageClassName, metav1.GetOptions{}) + if err != nil { + return false, errors.Wrapf(err, "error to get storage class %s", *tmpPVC.Spec.StorageClassName) + } } - } - if storageClass != nil { - if storageClass.VolumeBindingMode != nil && *storageClass.VolumeBindingMode == storagev1api.VolumeBindingWaitForFirstConsumer { - selectedNode = tmpPVC.Annotations[KubeAnnSelectedNode] - if selectedNode == "" { - return false, nil + if storageClass != nil { + if storageClass.VolumeBindingMode != nil && *storageClass.VolumeBindingMode == storagev1api.VolumeBindingWaitForFirstConsumer { + selectedNode = tmpPVC.Annotations[KubeAnnSelectedNode] + if selectedNode == "" { + return false, nil + } } } } diff --git a/pkg/util/kube/pvc_pv_test.go b/pkg/util/kube/pvc_pv_test.go index 52e01ee694..59e75ec26d 100644 --- a/pkg/util/kube/pvc_pv_test.go +++ b/pkg/util/kube/pvc_pv_test.go @@ -188,14 +188,15 @@ func TestWaitPVCConsumed(t *testing.T) { } tests := []struct { - name string - pvcName string - pvcNamespace string - kubeClientObj []runtime.Object - kubeReactors []reactor - expectedPVC *corev1api.PersistentVolumeClaim - selectedNode string - err string + name string + pvcName string + pvcNamespace string + kubeClientObj []runtime.Object + kubeReactors []reactor + expectedPVC *corev1api.PersistentVolumeClaim + selectedNode string + ignoreWaitForFirstConsumer bool + err string }{ { name: "get pvc error", @@ -212,6 +213,16 @@ func TestWaitPVCConsumed(t *testing.T) { }, expectedPVC: pvcObject, }, + { + name: "success when ignore wait for first consumer", + pvcName: "fake-pvc-2", + pvcNamespace: "fake-namespace", + ignoreWaitForFirstConsumer: true, + kubeClientObj: []runtime.Object{ + pvcObjectWithSC, + }, + expectedPVC: pvcObjectWithSC, + }, { name: "get sc fail", pvcName: "fake-pvc-2", @@ -274,7 +285,7 @@ func TestWaitPVCConsumed(t *testing.T) { var kubeClient kubernetes.Interface = fakeKubeClient - selectedNode, pvc, err := WaitPVCConsumed(context.Background(), kubeClient.CoreV1(), test.pvcName, test.pvcNamespace, kubeClient.StorageV1(), time.Millisecond) + selectedNode, pvc, err := WaitPVCConsumed(context.Background(), kubeClient.CoreV1(), test.pvcName, test.pvcNamespace, kubeClient.StorageV1(), time.Millisecond, test.ignoreWaitForFirstConsumer) if err != nil { assert.EqualError(t, err, test.err)