Skip to content

Commit

Permalink
Node agent restart enhancement
Browse files Browse the repository at this point in the history
Signed-off-by: Ming Qiu <mqiu@vmware.com>
  • Loading branch information
qiuming-best committed Nov 28, 2023
1 parent e58a780 commit 17e3354
Show file tree
Hide file tree
Showing 6 changed files with 401 additions and 56 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/7130-qiuming-best
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Node agent restart enhancement
55 changes: 9 additions & 46 deletions pkg/cmd/cli/nodeagent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,13 +285,13 @@ func (s *nodeAgentServer) run() {
}

dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
s.markDataUploadsCancel(dataUploadReconciler)
s.attemptDataUploadResume(dataUploadReconciler)

Check warning on line 288 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L288

Added line #L288 was not covered by tests
if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data upload controller")
}

dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, s.dataPathMgr, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
s.markDataDownloadsCancel(dataDownloadReconciler)
s.attemptDataDownloadResume(dataDownloadReconciler)

Check warning on line 294 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L294

Added line #L294 was not covered by tests
if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data download controller")
}
Expand Down Expand Up @@ -365,65 +365,28 @@ func (s *nodeAgentServer) markInProgressCRsFailed() {
s.markInProgressPVRsFailed(client)
}

func (s *nodeAgentServer) markDataUploadsCancel(r *controller.DataUploadReconciler) {
func (s *nodeAgentServer) attemptDataUploadResume(r *controller.DataUploadReconciler) {

Check warning on line 368 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L368

Added line #L368 was not covered by tests
// the function is called before starting the controller manager, the embedded client isn't ready to use, so create a new one here
client, err := ctrlclient.New(s.mgr.GetConfig(), ctrlclient.Options{Scheme: s.mgr.GetScheme()})
if err != nil {
s.logger.WithError(errors.WithStack(err)).Error("failed to create client")
return
}
if dataUploads, err := r.FindDataUploads(s.ctx, client, s.namespace); err != nil {
s.logger.WithError(errors.WithStack(err)).Error("failed to find data uploads")
} else {
for i := range dataUploads {
du := dataUploads[i]
if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted ||
du.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared ||
du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress {
err = controller.UpdateDataUploadWithRetry(s.ctx, client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, s.logger.WithField("dataupload", du.Name),
func(dataUpload *velerov2alpha1api.DataUpload) {
dataUpload.Spec.Cancel = true
dataUpload.Status.Message = fmt.Sprintf("found a dataupload with status %q during the node-agent starting, mark it as cancel", du.Status.Phase)
})

if err != nil {
s.logger.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q cancel", du.GetName())
continue
}
s.logger.WithField("dataupload", du.GetName()).Warn(du.Status.Message)
}
}
if err := r.AttemptDataUploadResume(s.ctx, client, s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data upload resume")

Check warning on line 376 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L375-L376

Added lines #L375 - L376 were not covered by tests
}
}

func (s *nodeAgentServer) markDataDownloadsCancel(r *controller.DataDownloadReconciler) {
func (s *nodeAgentServer) attemptDataDownloadResume(r *controller.DataDownloadReconciler) {

Check warning on line 380 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L380

Added line #L380 was not covered by tests
// the function is called before starting the controller manager, the embedded client isn't ready to use, so create a new one here
client, err := ctrlclient.New(s.mgr.GetConfig(), ctrlclient.Options{Scheme: s.mgr.GetScheme()})
if err != nil {
s.logger.WithError(errors.WithStack(err)).Error("failed to create client")
return
}
if dataDownloads, err := r.FindDataDownloads(s.ctx, client, s.namespace); err != nil {
s.logger.WithError(errors.WithStack(err)).Error("failed to find data downloads")
} else {
for i := range dataDownloads {
dd := dataDownloads[i]
if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted ||
dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared ||
dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
err = controller.UpdateDataDownloadWithRetry(s.ctx, client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, s.logger.WithField("datadownload", dd.Name),
func(dataDownload *velerov2alpha1api.DataDownload) {
dataDownload.Spec.Cancel = true
dataDownload.Status.Message = fmt.Sprintf("found a datadownload with status %q during the node-agent starting, mark it as cancel", dd.Status.Phase)
})

if err != nil {
s.logger.WithError(errors.WithStack(err)).Errorf("failed to mark datadownload %q cancel", dd.GetName())
continue
}
s.logger.WithField("datadownload", dd.GetName()).Warn(dd.Status.Message)
}
}

if err := r.AttemptDataDownloadResume(s.ctx, client, s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data download resume")

Check warning on line 389 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L388-L389

Added lines #L388 - L389 were not covered by tests
}
}

Expand Down
82 changes: 78 additions & 4 deletions pkg/controller/data_download_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
// to help clear up resources instead of clear them directly in case of some conflict with Expose action
if err := UpdateDataDownloadWithRetry(ctx, r.client, req.NamespacedName, log, func(dataDownload *velerov2alpha1api.DataDownload) {
dataDownload.Spec.Cancel = true
dataDownload.Status.Message = fmt.Sprintf("found a dataupload %s/%s is being deleted, mark it as cancel", dd.Namespace, dd.Name)
dataDownload.Status.Message = fmt.Sprintf("found a datadownload %s/%s is being deleted, mark it as cancel", dd.Namespace, dd.Name)
}); err != nil {
log.Errorf("failed to set cancel flag with error %s for %s/%s", err.Error(), dd.Namespace, dd.Name)
return ctrl.Result{}, err
Expand Down Expand Up @@ -192,7 +192,6 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
return r.errorOut(ctx, dd, err, "error to expose snapshot", log)
}
}

log.Info("Restore is exposed")

// we need to get CR again for it may canceled by datadownload controller on other
Expand All @@ -205,7 +204,6 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
}
return ctrl.Result{}, errors.Wrap(err, "getting datadownload")
}

// resources created in Expose may be created later than the cancel action or the prepare time,
// so we need to clean up the resources again here
if isDataDownloadInFinalState(dd) {
Expand Down Expand Up @@ -267,7 +265,6 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
return r.errorOut(ctx, dd, err, "error to create data path", log)
}
}

// Update status to InProgress
original := dd.DeepCopy()
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseInProgress
Expand Down Expand Up @@ -576,6 +573,51 @@ func (r *DataDownloadReconciler) FindDataDownloads(ctx context.Context, cli clie
return dataDownloads, nil
}

func (r *DataDownloadReconciler) findAcceptDataDownloadsByNodeLable(ctx context.Context, cli client.Client, ns string) ([]velerov2alpha1api.DataDownload, error) {
dataDownloads := &velerov2alpha1api.DataDownloadList{}
if err := cli.List(ctx, dataDownloads, &client.ListOptions{Namespace: ns}); err != nil {
r.logger.WithError(errors.WithStack(err)).Error("failed to list datauploads")
return nil, errors.Wrapf(err, "failed to list datauploads")
}

Check warning on line 581 in pkg/controller/data_download_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/data_download_controller.go#L579-L581

Added lines #L579 - L581 were not covered by tests

var result []velerov2alpha1api.DataDownload
for _, dd := range dataDownloads.Items {
if dd.Status.Phase != velerov2alpha1api.DataDownloadPhaseAccepted {
continue
}
if dd.Labels[acceptNodeLabelKey] == r.nodeName {
result = append(result, dd)
}

Check warning on line 590 in pkg/controller/data_download_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/data_download_controller.go#L589-L590

Added lines #L589 - L590 were not covered by tests
}
return result, nil
}

// CancelAcceptedDataDownload will cancel the accepted data download
func (r *DataDownloadReconciler) CancelAcceptedDataDownload(ctx context.Context, cli client.Client, ns string) {
r.logger.Infof("Canceling accepted data for node %s", r.nodeName)
dataDownloads, err := r.findAcceptDataDownloadsByNodeLable(ctx, cli, ns)
if err != nil {
r.logger.WithError(err).Error("failed to find data downloads")
return
}

Check warning on line 602 in pkg/controller/data_download_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/data_download_controller.go#L600-L602

Added lines #L600 - L602 were not covered by tests

for _, dd := range dataDownloads {
if dd.Spec.Cancel {
continue

Check warning on line 606 in pkg/controller/data_download_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/data_download_controller.go#L605-L606

Added lines #L605 - L606 were not covered by tests
}
err = UpdateDataDownloadWithRetry(ctx, cli, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name},
r.logger.WithField("dataupload", dd.Name), func(dataDownload *velerov2alpha1api.DataDownload) {
dataDownload.Spec.Cancel = true
dataDownload.Status.Message = fmt.Sprintf("found a datadownload with status %q during the node-agent starting, mark it as cancel", dd.Status.Phase)
})

Check warning on line 612 in pkg/controller/data_download_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/data_download_controller.go#L608-L612

Added lines #L608 - L612 were not covered by tests

r.logger.Warn(dd.Status.Message)
if err != nil {
r.logger.WithError(err).Errorf("failed to set cancel flag with error %s", err.Error())
}

Check warning on line 617 in pkg/controller/data_download_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/data_download_controller.go#L614-L617

Added lines #L614 - L617 were not covered by tests
}
}

func (r *DataDownloadReconciler) prepareDataDownload(ssb *velerov2alpha1api.DataDownload) {
ssb.Status.Phase = velerov2alpha1api.DataDownloadPhasePrepared
ssb.Status.Node = r.nodeName
Expand Down Expand Up @@ -749,3 +791,35 @@ func UpdateDataDownloadWithRetry(ctx context.Context, client client.Client, name
return true, nil
})
}

func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context, cli client.Client, logger *logrus.Entry, ns string) error {
if dataDownloads, err := r.FindDataDownloads(ctx, cli, ns); err != nil {
return errors.Wrapf(err, "failed to find data downloads")
} else {
for i := range dataDownloads {
dd := dataDownloads[i]
if dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared {
// keep doing nothing let controller re-download the data
// the Prepared CR could be still handled by datadownload controller after node-agent restart
logger.WithField("datadownload", dd.GetName()).Debug("find a datadownload with status prepared")
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
err = UpdateDataDownloadWithRetry(ctx, cli, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, logger.WithField("datadownload", dd.Name),
func(dataDownload *velerov2alpha1api.DataDownload) {
dataDownload.Spec.Cancel = true
dataDownload.Status.Message = fmt.Sprintf("found a datadownload with status %q during the node-agent starting, mark it as cancel", dd.Status.Phase)
})

Check warning on line 810 in pkg/controller/data_download_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/data_download_controller.go#L806-L810

Added lines #L806 - L810 were not covered by tests

if err != nil {
logger.WithError(errors.WithStack(err)).Errorf("failed to mark datadownload %q into canceled", dd.GetName())
continue

Check warning on line 814 in pkg/controller/data_download_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/data_download_controller.go#L812-L814

Added lines #L812 - L814 were not covered by tests
}
logger.WithField("datadownload", dd.GetName()).Debug("mark datadownload into canceled")

Check warning on line 816 in pkg/controller/data_download_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/data_download_controller.go#L816

Added line #L816 was not covered by tests
}
}
}

//If the data download is in Accepted status, the expoded PVC may be not created
// so we need to mark the data download as canceled for it may not be recoverable
r.CancelAcceptedDataDownload(ctx, cli, ns)
return nil
}
114 changes: 113 additions & 1 deletion pkg/controller/data_download_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func dataDownloadBuilder() *builder.DataDownloadBuilder {
}

func initDataDownloadReconciler(objects []runtime.Object, needError ...bool) (*DataDownloadReconciler, error) {
var errs []error = make([]error, 5)
var errs []error = make([]error, 6)
for k, isError := range needError {
if k == 0 && isError {
errs[0] = fmt.Errorf("Get error")
Expand All @@ -81,6 +81,8 @@ func initDataDownloadReconciler(objects []runtime.Object, needError ...bool) (*D
errs[3] = fmt.Errorf("Patch error")
} else if k == 4 && isError {
errs[4] = apierrors.NewConflict(velerov2alpha1api.Resource("datadownload"), dataDownloadName, errors.New("conflict"))
} else if k == 5 && isError {
errs[5] = fmt.Errorf("List error")
}
}
return initDataDownloadReconcilerWithError(objects, errs...)
Expand Down Expand Up @@ -116,6 +118,8 @@ func initDataDownloadReconcilerWithError(objects []runtime.Object, needError ...
fakeClient.patchError = needError[3]
} else if k == 4 {
fakeClient.updateConflict = needError[4]
} else if k == 5 {
fakeClient.listError = needError[5]
}
}

Expand Down Expand Up @@ -939,3 +943,111 @@ func TestFindDataDownloads(t *testing.T) {
})
}
}

// TestAttemptDataDownloadResume creates a single DataDownload (and optionally a
// pod) through the reconciler's client, runs AttemptDataDownloadResume, and
// then checks either that an error was returned or that the named
// DataDownloads are in the expected phase.
func TestAttemptDataDownloadResume(t *testing.T) {
	tests := []struct {
		name                   string
		du                     *velerov2alpha1api.DataDownload
		pod                    *corev1.Pod
		needErrs               []bool
		acceptedDataDownloads  []string
		prepareddDataDownloads []string
		cancelledDataDownloads []string
		expectedError          bool
	}{
		// Test case 1: Process Accepted DataDownload
		{
			name: "AcceptedDataDownload",
			pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Volumes(&corev1.Volume{Name: dataDownloadName}).NodeName("node-1").Labels(map[string]string{
				velerov1api.DataDownloadLabel: dataDownloadName,
			}).Result(),
			du:                    dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(),
			acceptedDataDownloads: []string{dataDownloadName},
			expectedError:         false,
		},
		// Test case 2: Cancel an Accepted DataDownload
		{
			name: "CancelAcceptedDataDownload",
			du:   dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(),
		},
		// Test case 3: Process Accepted Prepared DataDownload
		{
			name: "PreparedDataDownload",
			pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Volumes(&corev1.Volume{Name: dataDownloadName}).NodeName("node-1").Labels(map[string]string{
				velerov1api.DataDownloadLabel: dataDownloadName,
			}).Result(),
			du:                     dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(),
			prepareddDataDownloads: []string{dataDownloadName},
		},
		// Test case 4: Process Accepted InProgress DataDownload
		// NOTE(review): the case is named InProgress but the object is built with
		// DataDownloadPhasePrepared, so it duplicates case 3 and the InProgress
		// branch is not exercised here — confirm the intended phase and expectation.
		{
			name: "InProgressDataDownload",
			pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Volumes(&corev1.Volume{Name: dataDownloadName}).NodeName("node-1").Labels(map[string]string{
				velerov1api.DataDownloadLabel: dataDownloadName,
			}).Result(),
			du:                     dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(),
			prepareddDataDownloads: []string{dataDownloadName},
		},
		// Test case 5: get resume error
		{
			name: "ResumeError",
			pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Volumes(&corev1.Volume{Name: dataDownloadName}).NodeName("node-1").Labels(map[string]string{
				velerov1api.DataDownloadLabel: dataDownloadName,
			}).Result(),
			// index 5 enables the "List error" injected by initDataDownloadReconciler
			needErrs:      []bool{false, false, false, false, false, true},
			du:            dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(),
			expectedError: true,
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			ctx := context.TODO()
			r, err := initDataDownloadReconciler(nil, test.needErrs...)
			// Check the constructor error before touching r, so a setup failure is
			// reported as a test failure instead of dereferencing r first.
			require.NoError(t, err)
			r.nodeName = "node-1"

			// Best-effort cleanup; deletion errors are intentionally ignored.
			defer func() {
				r.client.Delete(ctx, test.du, &kbclient.DeleteOptions{})
				if test.pod != nil {
					r.client.Delete(ctx, test.pod, &kbclient.DeleteOptions{})
				}
			}()

			assert.NoError(t, r.client.Create(ctx, test.du))
			if test.pod != nil {
				assert.NoError(t, r.client.Create(ctx, test.pod))
			}

			// Run the test
			err = r.AttemptDataDownloadResume(ctx, r.client, r.logger.WithField("name", test.name), test.du.Namespace)

			if test.expectedError {
				assert.Error(t, err)
				return
			}
			assert.NoError(t, err)

			// verifyPhase fetches each named DataDownload from the "velero"
			// namespace and asserts that its status phase equals wantPhase.
			verifyPhase := func(names []string, wantPhase interface{}) {
				for _, ddName := range names {
					got := &velerov2alpha1api.DataDownload{}
					getErr := r.client.Get(ctx, types.NamespacedName{Namespace: "velero", Name: ddName}, got)
					require.NoError(t, getErr)
					assert.Equal(t, wantPhase, got.Status.Phase)
				}
			}

			// Verify DataDownload marked as Cancelled
			verifyPhase(test.cancelledDataDownloads, velerov2alpha1api.DataDownloadPhaseCanceled)
			// Verify DataDownload marked as Accepted
			verifyPhase(test.acceptedDataDownloads, velerov2alpha1api.DataDownloadPhaseAccepted)
			// Verify DataDownload marked as Prepared
			verifyPhase(test.prepareddDataDownloads, velerov2alpha1api.DataDownloadPhasePrepared)
		})
	}
}
Loading

0 comments on commit 17e3354

Please sign in to comment.