data mover restore for Windows
Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
Lyndon-Li committed Jan 9, 2025
1 parent be5f56a commit 9651df2
Showing 18 changed files with 233 additions and 93 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/8594-Lyndon-Li
@@ -0,0 +1 @@
Data mover restore for Windows
3 changes: 3 additions & 0 deletions config/crd/v2alpha1/bases/velero.io_datadownloads.yaml
@@ -92,6 +92,9 @@ spec:
DataMover specifies the data mover to be used by the backup.
If DataMover is "" or "velero", the built-in data mover will be used.
type: string
nodeOS:
description: NodeOS is OS of the node where the DataDownload is processed.
type: string
operationTimeout:
description: |-
OperationTimeout specifies the time used to wait internal operations,
6 changes: 5 additions & 1 deletion config/crd/v2alpha1/bases/velero.io_datauploads.yaml
@@ -144,7 +144,8 @@ spec:
description: DataUploadStatus is the current status of a DataUpload.
properties:
acceptedByNode:
description: Node is name of the node where the DataUpload is prepared.
description: AcceptedByNode is name of the node where the DataUpload
is prepared.
type: string
acceptedTimestamp:
description: |-
@@ -175,6 +176,9 @@
node:
description: Node is name of the node where the DataUpload is processed.
type: string
nodeOS:
description: NodeOS is OS of the node where the DataUpload is processed.
type: string
path:
description: Path is the full path of the snapshot volume being backed
up.
4 changes: 2 additions & 2 deletions config/crd/v2alpha1/crds/crds.go

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions pkg/apis/velero/v2alpha1/data_download_types.go
@@ -54,6 +54,10 @@ type DataDownloadSpec struct {
// OperationTimeout specifies the time used to wait internal operations,
// before returning error as timeout.
OperationTimeout metav1.Duration `json:"operationTimeout"`

// NodeOS is OS of the node where the DataDownload is processed.
// +optional
NodeOS NodeOS `json:"nodeOS,omitempty"`
}

// TargetVolumeSpec is the specification for a target PVC.
19 changes: 18 additions & 1 deletion pkg/apis/velero/v2alpha1/data_upload_types.go
@@ -96,6 +96,14 @@ const (
DataUploadPhaseFailed DataUploadPhase = "Failed"
)

type NodeOS string

const (
NodeOSLinux NodeOS = "linux"
NodeOSWindows NodeOS = "windows"
NodeOSAuto NodeOS = "auto"
)

// DataUploadStatus is the current status of a DataUpload.
type DataUploadStatus struct {
// Phase is the current state of the DataUpload.
@@ -144,7 +152,12 @@ type DataUploadStatus struct {
// Node is name of the node where the DataUpload is processed.
// +optional
Node string `json:"node,omitempty"`
// Node is name of the node where the DataUpload is prepared.

// NodeOS is OS of the node where the DataUpload is processed.
// +optional
NodeOS NodeOS `json:"nodeOS,omitempty"`

// AcceptedByNode is name of the node where the DataUpload is prepared.
// +optional
AcceptedByNode string `json:"acceptedByNode,omitempty"`

@@ -221,4 +234,8 @@ type DataUploadResult struct {
// +optional
// +nullable
DataMoverResult *map[string]string `json:"dataMoverResult,omitempty"`

// NodeOS is OS of the node where the DataUpload is processed.
// +optional
NodeOS NodeOS `json:"nodeOS,omitempty"`
}
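
Taken together, NodeOS now appears in three places: as a typed enum, on the DataUpload status (the OS the backup actually ran on), and on the DataUploadResult that the restore flow consumes. A minimal sketch of the intended flow — resolveRestoreNodeOS is a hypothetical helper, not part of this commit; its empty-value fallback to linux mirrors the datadownload controller's own handling later in this diff:

import (
	velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
)

// resolveRestoreNodeOS (hypothetical) decides which OS the DataDownload should
// request, given the NodeOS recorded in the backup's DataUploadResult.
func resolveRestoreNodeOS(result velerov2alpha1api.DataUploadResult) velerov2alpha1api.NodeOS {
	if result.NodeOS == "" {
		// Backups taken before this field existed can only have run on Linux.
		return velerov2alpha1api.NodeOSLinux
	}
	return result.NodeOS
}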
6 changes: 6 additions & 0 deletions pkg/builder/data_download_builder.go
@@ -148,6 +148,12 @@ func (d *DataDownloadBuilder) Node(node string) *DataDownloadBuilder {
return d
}

// NodeOS sets the DataDownload's Node OS.
func (d *DataDownloadBuilder) NodeOS(nodeOS velerov2alpha1api.NodeOS) *DataDownloadBuilder {
d.object.Spec.NodeOS = nodeOS
return d
}

// AcceptedByNode sets the DataDownload's AcceptedByNode.
func (d *DataDownloadBuilder) AcceptedByNode(node string) *DataDownloadBuilder {
d.object.Status.AcceptedByNode = node
6 changes: 6 additions & 0 deletions pkg/builder/data_upload_builder.go
@@ -151,6 +151,12 @@ func (d *DataUploadBuilder) Node(node string) *DataUploadBuilder {
return d
}

// NodeOS sets the DataUpload's Node OS.
func (d *DataUploadBuilder) NodeOS(nodeOS velerov2alpha1api.NodeOS) *DataUploadBuilder {
d.object.Status.NodeOS = nodeOS
return d
}

// AcceptedByNode sets the DataUpload's AcceptedByNode.
func (d *DataUploadBuilder) AcceptedByNode(node string) *DataUploadBuilder {
d.object.Status.AcceptedByNode = node
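
Note the asymmetry between the two setters, which mirrors the API change: DataDownloadBuilder.NodeOS writes Spec.NodeOS (the OS the restore must run on, chosen up front), while DataUploadBuilder.NodeOS writes Status.NodeOS (the OS the backup was observed to run on). A usage sketch, assuming the builder package's existing ForDataDownload/ForDataUpload constructors:

dd := builder.ForDataDownload("velero", "dd-1").
	NodeOS(velerov2alpha1api.NodeOSWindows). // restore must be processed on a Windows node
	Result()

du := builder.ForDataUpload("velero", "du-1").
	NodeOS(velerov2alpha1api.NodeOSWindows). // records where the backup ran
	Result()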
19 changes: 18 additions & 1 deletion pkg/cmd/cli/datamover/restore.go
@@ -160,7 +160,24 @@ func newdataMoverRestore(logger logrus.FieldLogger, factory client.Factory, conf
return nil, errors.Wrap(err, "error to create client")
}

cache, err := ctlcache.New(clientConfig, cacheOption)
var cache ctlcache.Cache
retry := 10
for {
cache, err = ctlcache.New(clientConfig, cacheOption)
if err == nil {
break
}

retry--
if retry == 0 {
break
}

logger.WithError(err).Warn("Failed to create client cache, need retry")

time.Sleep(time.Second)
}

if err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to create client cache")
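
The new loop retries cache creation up to 10 times, one second apart, and only then fails; presumably this papers over transient failures while the data mover pod starts up (the commit does not state the motivation). The same pattern, factored into a reusable helper — a sketch only:

import (
	"time"

	"github.com/sirupsen/logrus"
	ctlcache "sigs.k8s.io/controller-runtime/pkg/cache"
)

// newCacheWithRetry is a sketch of the loop above: up to attempts tries, one
// second apart, returning the last error if every attempt fails.
func newCacheWithRetry(attempts int, logger logrus.FieldLogger, newFn func() (ctlcache.Cache, error)) (ctlcache.Cache, error) {
	var cache ctlcache.Cache
	var err error
	for i := 0; i < attempts; i++ {
		if cache, err = newFn(); err == nil {
			return cache, nil
		}
		logger.WithError(err).Warn("Failed to create client cache, need retry")
		if i < attempts-1 {
			time.Sleep(time.Second)
		}
	}
	return nil, err
}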
59 changes: 41 additions & 18 deletions pkg/controller/data_download_controller.go
@@ -183,28 +183,15 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
return ctrl.Result{}, nil
}

hostingPodLabels := map[string]string{velerov1api.DataDownloadLabel: dd.Name}
for _, k := range util.ThirdPartyLabels {
if v, err := nodeagent.GetLabelValue(ctx, r.kubeClient, dd.Namespace, k, kube.NodeOSLinux); err != nil {
if err != nodeagent.ErrNodeAgentLabelNotFound {
log.WithError(err).Warnf("Failed to check node-agent label, skip adding host pod label %s", k)
}
} else {
hostingPodLabels[k] = v
}
exposeParam, err := r.setupExposeParam(dd)
if err != nil {
return r.errorOut(ctx, dd, err, "failed to set exposer parameters", log)
}

// Expose() will trigger to create one pod whose volume is restored by a given volume snapshot,
// but the pod maybe is not in the same node of the current controller, so we need to return it here.
// And then only the controller who is in the same node could do the rest work.
err = r.restoreExposer.Expose(ctx, getDataDownloadOwnerObject(dd), exposer.GenericRestoreExposeParam{
TargetPVCName: dd.Spec.TargetVolume.PVC,
SourceNamespace: dd.Spec.TargetVolume.Namespace,
HostingPodLabels: hostingPodLabels,
Resources: r.podResources,
ExposeTimeout: dd.Spec.OperationTimeout.Duration,
RestorePVCConfig: r.restorePVCConfig,
})
err = r.restoreExposer.Expose(ctx, getDataDownloadOwnerObject(dd), exposeParam)
if err != nil {
if err := r.client.Get(ctx, req.NamespacedName, dd); err != nil {
if !apierrors.IsNotFound(err) {
@@ -243,7 +230,7 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
log.Debugf("Data download is been canceled %s in Phase %s", dd.GetName(), dd.Status.Phase)
r.tryCancelAcceptedDataDownload(ctx, dd, "")
} else if peekErr := r.restoreExposer.PeekExposed(ctx, getDataDownloadOwnerObject(dd)); peekErr != nil {
r.tryCancelAcceptedDataDownload(ctx, dd, fmt.Sprintf("found a dataupload %s/%s with expose error: %s. mark it as cancel", dd.Namespace, dd.Name, peekErr))
r.tryCancelAcceptedDataDownload(ctx, dd, fmt.Sprintf("found a datadownload %s/%s with expose error: %s. mark it as cancel", dd.Namespace, dd.Name, peekErr))
log.Errorf("Cancel dd %s/%s because of expose error %s", dd.Namespace, dd.Name, peekErr)
} else if dd.Status.AcceptedTimestamp != nil {
if time.Since(dd.Status.AcceptedTimestamp.Time) >= r.preparingTimeout {
@@ -737,6 +724,42 @@ func (r *DataDownloadReconciler) closeDataPath(ctx context.Context, ddName strin
r.dataPathMgr.RemoveAsyncBR(ddName)
}

func (r *DataDownloadReconciler) setupExposeParam(dd *velerov2alpha1api.DataDownload) (exposer.GenericRestoreExposeParam, error) {
log := r.logger.WithField("datadownload", dd.Name)

nodeOS := string(dd.Spec.NodeOS)
if nodeOS == "" {
log.Info("nodeOS is empty in DD, fallback to linux")
nodeOS = kube.NodeOSLinux
}

if err := kube.HasNodeWithOS(context.Background(), nodeOS, r.kubeClient.CoreV1()); err != nil {
return exposer.GenericRestoreExposeParam{}, errors.Wrapf(err, "no appropriate node to run datadownload %s/%s", dd.Namespace, dd.Name)
}

hostingPodLabels := map[string]string{velerov1api.DataDownloadLabel: dd.Name}
for _, k := range util.ThirdPartyLabels {
if v, err := nodeagent.GetLabelValue(context.Background(), r.kubeClient, dd.Namespace, k, nodeOS); err != nil {
if err != nodeagent.ErrNodeAgentLabelNotFound {
log.WithError(err).Warnf("Failed to check node-agent label, skip adding host pod label %s", k)
}
} else {
hostingPodLabels[k] = v
}
}

return exposer.GenericRestoreExposeParam{
TargetPVCName: dd.Spec.TargetVolume.PVC,
TargetNamespace: dd.Spec.TargetVolume.Namespace,
HostingPodLabels: hostingPodLabels,
Resources: r.podResources,
OperationTimeout: dd.Spec.OperationTimeout.Duration,
ExposeTimeout: r.preparingTimeout,
NodeOS: nodeOS,
RestorePVCConfig: r.restorePVCConfig,
}, nil
}

func getDataDownloadOwnerObject(dd *velerov2alpha1api.DataDownload) v1.ObjectReference {
return v1.ObjectReference{
Kind: dd.Kind,
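
setupExposeParam now fails fast, before exposing anything, when the cluster has no node of the requested OS, instead of leaving the restore pod permanently unschedulable. kube.HasNodeWithOS itself is not shown in this diff; a plausible sketch of such a check, assuming the standard kubernetes.io/os node label:

import (
	"context"

	"github.com/pkg/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
)

// hasNodeWithOS (assumed shape of kube.HasNodeWithOS) errors out unless at
// least one node in the cluster carries the requested OS label.
func hasNodeWithOS(ctx context.Context, os string, client corev1client.CoreV1Interface) error {
	nodes, err := client.Nodes().List(ctx, metav1.ListOptions{LabelSelector: "kubernetes.io/os=" + os})
	if err != nil {
		return errors.Wrapf(err, "error listing nodes with OS %s", os)
	}
	if len(nodes.Items) == 0 {
		return errors.Errorf("no node with OS %s in the cluster", os)
	}
	return nil
}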
17 changes: 13 additions & 4 deletions pkg/controller/data_download_controller_test.go
@@ -53,6 +53,7 @@ import (
"github.com/vmware-tanzu/velero/pkg/nodeagent"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/kube"

exposermockes "github.com/vmware-tanzu/velero/pkg/exposer/mocks"
)
@@ -67,7 +68,7 @@ func dataDownloadBuilder() *builder.DataDownloadBuilder {
PV: "test-pv",
PVC: "test-pvc",
Namespace: "test-ns",
})
}).NodeOS(velerov2alpha1api.NodeOS("linux"))
}

func initDataDownloadReconciler(objects []runtime.Object, needError ...bool) (*DataDownloadReconciler, error) {
@@ -167,6 +168,8 @@ func TestDataDownloadReconcile(t *testing.T) {
},
}

node := builder.ForNode("fake-node").Labels(map[string]string{kube.NodeOSLabel: kube.NodeOSLinux}).Result()

tests := []struct {
name string
dd *velerov2alpha1api.DataDownload
@@ -326,9 +329,15 @@
},
{
name: "Restore is exposed",
dd: dataDownloadBuilder().Result(),
dd: dataDownloadBuilder().NodeOS(velerov2alpha1api.NodeOSLinux).Result(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
},
{
name: "Expeced node doesn't exist",

Check failure on line 336 in pkg/controller/data_download_controller_test.go

View workflow job for this annotation

GitHub Actions / Run Codespell

Expeced ==> Expected

Check failure on line 336 in pkg/controller/data_download_controller_test.go

View workflow job for this annotation

GitHub Actions / Run Linter Check

`Expeced` is a misspelling of `Expected` (misspell)
dd: dataDownloadBuilder().NodeOS(velerov2alpha1api.NodeOSWindows).Result(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
expectedStatusMsg: "no appropriate node to run datadownload",
},
{
name: "Get empty restore exposer",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(),
@@ -388,9 +397,9 @@

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var objs []runtime.Object
objs := []runtime.Object{daemonSet, node}
if test.targetPVC != nil {
objs = []runtime.Object{test.targetPVC, daemonSet}
objs = append(objs, test.targetPVC)
}
r, err := initDataDownloadReconciler(objs, test.needErrs...)
require.NoError(t, err)
19 changes: 17 additions & 2 deletions pkg/controller/data_upload_controller.go
@@ -285,6 +285,17 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, nil
}

var nodeOS velerov2alpha1api.NodeOS
if res.ByPod.NodeOS == nil {
nodeOS = velerov2alpha1api.NodeOSAuto
} else if *res.ByPod.NodeOS == "linux" {
nodeOS = velerov2alpha1api.NodeOSLinux
} else if *res.ByPod.NodeOS == "windows" {
nodeOS = velerov2alpha1api.NodeOSWindows
} else {
return r.errorOut(ctx, du, errors.Errorf("invalid node OS %s", *res.ByPod.NodeOS), "invalid expose result", log)
}

log.Info("Exposed snapshot is ready and creating data path routine")

// Need to first create file system BR and get data path instance then update data upload status
@@ -317,6 +328,7 @@
original := du.DeepCopy()
du.Status.Phase = velerov2alpha1api.DataUploadPhaseInProgress
du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
du.Status.NodeOS = nodeOS
if err := r.client.Patch(ctx, du, client.MergeFrom(original)); err != nil {
log.WithError(err).Warnf("Failed to update dataupload %s to InProgress, will data path close and retry", du.Name)

@@ -792,6 +804,8 @@ func (r *DataUploadReconciler) closeDataPath(ctx context.Context, duName string)
}

func (r *DataUploadReconciler) setupExposeParam(du *velerov2alpha1api.DataUpload) (interface{}, error) {
log := r.logger.WithField("dataupload", du.Name)

if du.Spec.SnapshotType == velerov2alpha1api.SnapshotTypeCSI {
pvc := &corev1.PersistentVolumeClaim{}
err := r.client.Get(context.Background(), types.NamespacedName{
@@ -803,7 +817,7 @@
return nil, errors.Wrapf(err, "failed to get PVC %s/%s", du.Spec.SourceNamespace, du.Spec.SourcePVC)
}

nodeOS, err := kube.GetPVCAttachingNodeOS(pvc, r.kubeClient.CoreV1(), r.kubeClient.StorageV1(), r.logger)
nodeOS, err := kube.GetPVCAttachingNodeOS(pvc, r.kubeClient.CoreV1(), r.kubeClient.StorageV1(), log)
if err != nil {
return nil, errors.Wrapf(err, "failed to get attaching node OS for PVC %s/%s", du.Spec.SourceNamespace, du.Spec.SourcePVC)
}
@@ -821,7 +835,7 @@ func (r *DataUploadReconciler) setupExposeParam(du *velerov2alpha1api.DataUpload
for _, k := range util.ThirdPartyLabels {
if v, err := nodeagent.GetLabelValue(context.Background(), r.kubeClient, du.Namespace, k, nodeOS); err != nil {
if err != nodeagent.ErrNodeAgentLabelNotFound {
r.logger.WithError(err).Warnf("Failed to check node-agent label, skip adding host pod label %s", k)
log.WithError(err).Warnf("Failed to check node-agent label, skip adding host pod label %s", k)
}
} else {
hostingPodLabels[k] = v
@@ -843,6 +857,7 @@ func (r *DataUploadReconciler) setupExposeParam(du *velerov2alpha1api.DataUpload
NodeOS: nodeOS,
}, nil
}

return nil, nil
}

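
Reconcile validates the OS string reported by the exposer before recording it on the DataUpload status: a nil value means the exposer could not tell (auto), linux and windows map to the typed constants, and anything else fails the upload. The same mapping as a standalone sketch, equivalent to the if/else chain above:

// toNodeOS mirrors the validation in Reconcile above.
func toNodeOS(reported *string) (velerov2alpha1api.NodeOS, error) {
	switch {
	case reported == nil:
		return velerov2alpha1api.NodeOSAuto, nil
	case *reported == "linux":
		return velerov2alpha1api.NodeOSLinux, nil
	case *reported == "windows":
		return velerov2alpha1api.NodeOSWindows, nil
	default:
		return "", errors.Errorf("invalid node OS %s", *reported)
	}
}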
6 changes: 6 additions & 0 deletions pkg/exposer/csi_snapshot.go
@@ -281,10 +281,16 @@ func (e *csiSnapshotExposer) GetExposed(ctx context.Context, ownerObject corev1.

curLog.WithField("pod", pod.Name).Infof("Backup volume is found in pod at index %v", i)

var nodeOS *string
if os, found := pod.Spec.NodeSelector[kube.NodeOSLabel]; found {
nodeOS = &os
}

return &ExposeResult{ByPod: ExposeByPod{
HostingPod: pod,
HostingContainer: containerName,
VolumeName: volumeName,
NodeOS: nodeOS,
}}, nil
}
