Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node agent restart enhancement #7130

Merged
merged 1 commit into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelogs/unreleased/7130-qiuming-best
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Node agent restart enhancement
55 changes: 9 additions & 46 deletions pkg/cmd/cli/nodeagent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,13 +285,13 @@
}

dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
s.markDataUploadsCancel(dataUploadReconciler)
s.attemptDataUploadResume(dataUploadReconciler)

Check warning on line 288 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L288

Added line #L288 was not covered by tests
if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data upload controller")
}

dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, s.dataPathMgr, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
s.markDataDownloadsCancel(dataDownloadReconciler)
s.attemptDataDownloadResume(dataDownloadReconciler)

Check warning on line 294 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L294

Added line #L294 was not covered by tests
if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data download controller")
}
Expand Down Expand Up @@ -365,65 +365,28 @@
s.markInProgressPVRsFailed(client)
}

func (s *nodeAgentServer) markDataUploadsCancel(r *controller.DataUploadReconciler) {
func (s *nodeAgentServer) attemptDataUploadResume(r *controller.DataUploadReconciler) {

Check warning on line 368 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L368

Added line #L368 was not covered by tests
// the function is called before starting the controller manager, the embedded client isn't ready to use, so create a new one here
client, err := ctrlclient.New(s.mgr.GetConfig(), ctrlclient.Options{Scheme: s.mgr.GetScheme()})
if err != nil {
s.logger.WithError(errors.WithStack(err)).Error("failed to create client")
return
}
if dataUploads, err := r.FindDataUploads(s.ctx, client, s.namespace); err != nil {
s.logger.WithError(errors.WithStack(err)).Error("failed to find data uploads")
} else {
for i := range dataUploads {
du := dataUploads[i]
if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted ||
du.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared ||
du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress {
err = controller.UpdateDataUploadWithRetry(s.ctx, client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, s.logger.WithField("dataupload", du.Name),
func(dataUpload *velerov2alpha1api.DataUpload) {
dataUpload.Spec.Cancel = true
dataUpload.Status.Message = fmt.Sprintf("found a dataupload with status %q during the node-agent starting, mark it as cancel", du.Status.Phase)
})

if err != nil {
s.logger.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q cancel", du.GetName())
continue
}
s.logger.WithField("dataupload", du.GetName()).Warn(du.Status.Message)
}
}
if err := r.AttemptDataUploadResume(s.ctx, client, s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data upload resume")

Check warning on line 376 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L375-L376

Added lines #L375 - L376 were not covered by tests
}
}

func (s *nodeAgentServer) markDataDownloadsCancel(r *controller.DataDownloadReconciler) {
func (s *nodeAgentServer) attemptDataDownloadResume(r *controller.DataDownloadReconciler) {

Check warning on line 380 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L380

Added line #L380 was not covered by tests
// the function is called before starting the controller manager, the embedded client isn't ready to use, so create a new one here
client, err := ctrlclient.New(s.mgr.GetConfig(), ctrlclient.Options{Scheme: s.mgr.GetScheme()})
if err != nil {
s.logger.WithError(errors.WithStack(err)).Error("failed to create client")
return
}
if dataDownloads, err := r.FindDataDownloads(s.ctx, client, s.namespace); err != nil {
s.logger.WithError(errors.WithStack(err)).Error("failed to find data downloads")
} else {
for i := range dataDownloads {
dd := dataDownloads[i]
if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted ||
dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared ||
dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
err = controller.UpdateDataDownloadWithRetry(s.ctx, client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, s.logger.WithField("datadownload", dd.Name),
func(dataDownload *velerov2alpha1api.DataDownload) {
dataDownload.Spec.Cancel = true
dataDownload.Status.Message = fmt.Sprintf("found a datadownload with status %q during the node-agent starting, mark it as cancel", dd.Status.Phase)
})

if err != nil {
s.logger.WithError(errors.WithStack(err)).Errorf("failed to mark datadownload %q cancel", dd.GetName())
continue
}
s.logger.WithField("datadownload", dd.GetName()).Warn(dd.Status.Message)
}
}

if err := r.AttemptDataDownloadResume(s.ctx, client, s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data download resume")

Check warning on line 389 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L388-L389

Added lines #L388 - L389 were not covered by tests
}
}

Expand Down
82 changes: 78 additions & 4 deletions pkg/controller/data_download_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@
// to help clear up resources instead of clear them directly in case of some conflict with Expose action
if err := UpdateDataDownloadWithRetry(ctx, r.client, req.NamespacedName, log, func(dataDownload *velerov2alpha1api.DataDownload) {
dataDownload.Spec.Cancel = true
dataDownload.Status.Message = fmt.Sprintf("found a dataupload %s/%s is being deleted, mark it as cancel", dd.Namespace, dd.Name)
dataDownload.Status.Message = fmt.Sprintf("found a datadownload %s/%s is being deleted, mark it as cancel", dd.Namespace, dd.Name)
}); err != nil {
log.Errorf("failed to set cancel flag with error %s for %s/%s", err.Error(), dd.Namespace, dd.Name)
return ctrl.Result{}, err
Expand Down Expand Up @@ -192,7 +192,6 @@
return r.errorOut(ctx, dd, err, "error to expose snapshot", log)
}
}

log.Info("Restore is exposed")

// we need to get CR again for it may canceled by datadownload controller on other
Expand All @@ -205,7 +204,6 @@
}
return ctrl.Result{}, errors.Wrap(err, "getting datadownload")
}

// we need to clean up resources as resources created in Expose it may later than cancel action or prepare time
// and need to clean up resources again
if isDataDownloadInFinalState(dd) {
Expand Down Expand Up @@ -267,7 +265,6 @@
return r.errorOut(ctx, dd, err, "error to create data path", log)
}
}

// Update status to InProgress
original := dd.DeepCopy()
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseInProgress
Expand Down Expand Up @@ -576,6 +573,51 @@
return dataDownloads, nil
}

func (r *DataDownloadReconciler) findAcceptDataDownloadsByNodeLabel(ctx context.Context, cli client.Client, ns string) ([]velerov2alpha1api.DataDownload, error) {
dataDownloads := &velerov2alpha1api.DataDownloadList{}
if err := cli.List(ctx, dataDownloads, &client.ListOptions{Namespace: ns}); err != nil {
r.logger.WithError(errors.WithStack(err)).Error("failed to list datauploads")
return nil, errors.Wrapf(err, "failed to list datauploads")
}

Check warning on line 581 in pkg/controller/data_download_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/data_download_controller.go#L579-L581

Added lines #L579 - L581 were not covered by tests

var result []velerov2alpha1api.DataDownload
for _, dd := range dataDownloads.Items {
if dd.Status.Phase != velerov2alpha1api.DataDownloadPhaseAccepted {
continue
}
if dd.Labels[acceptNodeLabelKey] == r.nodeName {
result = append(result, dd)
}

Check warning on line 590 in pkg/controller/data_download_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/data_download_controller.go#L589-L590

Added lines #L589 - L590 were not covered by tests
}
return result, nil
}

// CancelAcceptedDataDownload will cancel the accepted data download
func (r *DataDownloadReconciler) CancelAcceptedDataDownload(ctx context.Context, cli client.Client, ns string) {
r.logger.Infof("Canceling accepted data for node %s", r.nodeName)
dataDownloads, err := r.findAcceptDataDownloadsByNodeLabel(ctx, cli, ns)
if err != nil {
r.logger.WithError(err).Error("failed to find data downloads")
return
}

Check warning on line 602 in pkg/controller/data_download_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/data_download_controller.go#L600-L602

Added lines #L600 - L602 were not covered by tests

for _, dd := range dataDownloads {
if dd.Spec.Cancel {
continue

Check warning on line 606 in pkg/controller/data_download_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/data_download_controller.go#L605-L606

Added lines #L605 - L606 were not covered by tests
}
err = UpdateDataDownloadWithRetry(ctx, cli, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name},
r.logger.WithField("dataupload", dd.Name), func(dataDownload *velerov2alpha1api.DataDownload) {
dataDownload.Spec.Cancel = true
dataDownload.Status.Message = fmt.Sprintf("found a datadownload with status %q during the node-agent starting, mark it as cancel", dd.Status.Phase)
})

Check warning on line 612 in pkg/controller/data_download_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/data_download_controller.go#L608-L612

Added lines #L608 - L612 were not covered by tests

r.logger.Warn(dd.Status.Message)
if err != nil {
r.logger.WithError(err).Errorf("failed to set cancel flag with error %s", err.Error())
}

Check warning on line 617 in pkg/controller/data_download_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/data_download_controller.go#L614-L617

Added lines #L614 - L617 were not covered by tests
}
}

func (r *DataDownloadReconciler) prepareDataDownload(ssb *velerov2alpha1api.DataDownload) {
ssb.Status.Phase = velerov2alpha1api.DataDownloadPhasePrepared
ssb.Status.Node = r.nodeName
Expand Down Expand Up @@ -749,3 +791,35 @@
return true, nil
})
}

func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context, cli client.Client, logger *logrus.Entry, ns string) error {
if dataDownloads, err := r.FindDataDownloads(ctx, cli, ns); err != nil {
return errors.Wrapf(err, "failed to find data downloads")
} else {
for i := range dataDownloads {
dd := dataDownloads[i]
if dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared {
// keep doing nothing let controller re-download the data
// the Prepared CR could be still handled by datadownload controller after node-agent restart
logger.WithField("datadownload", dd.GetName()).Debug("find a datadownload with status prepared")
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
err = UpdateDataDownloadWithRetry(ctx, cli, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, logger.WithField("datadownload", dd.Name),
func(dataDownload *velerov2alpha1api.DataDownload) {
dataDownload.Spec.Cancel = true
dataDownload.Status.Message = fmt.Sprintf("found a datadownload with status %q during the node-agent starting, mark it as cancel", dd.Status.Phase)
})

Check warning on line 810 in pkg/controller/data_download_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/data_download_controller.go#L806-L810

Added lines #L806 - L810 were not covered by tests

if err != nil {
logger.WithError(errors.WithStack(err)).Errorf("failed to mark datadownload %q into canceled", dd.GetName())
continue

Check warning on line 814 in pkg/controller/data_download_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/data_download_controller.go#L812-L814

Added lines #L812 - L814 were not covered by tests
}
logger.WithField("datadownload", dd.GetName()).Debug("mark datadownload into canceled")

Check warning on line 816 in pkg/controller/data_download_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/data_download_controller.go#L816

Added line #L816 was not covered by tests
}
}
}

//If the data download is in Accepted status, the expoded PVC may be not created
// so we need to mark the data download as canceled for it may not be recoverable
r.CancelAcceptedDataDownload(ctx, cli, ns)
return nil
}
114 changes: 113 additions & 1 deletion pkg/controller/data_download_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func dataDownloadBuilder() *builder.DataDownloadBuilder {
}

func initDataDownloadReconciler(objects []runtime.Object, needError ...bool) (*DataDownloadReconciler, error) {
var errs []error = make([]error, 5)
var errs []error = make([]error, 6)
for k, isError := range needError {
if k == 0 && isError {
errs[0] = fmt.Errorf("Get error")
Expand All @@ -81,6 +81,8 @@ func initDataDownloadReconciler(objects []runtime.Object, needError ...bool) (*D
errs[3] = fmt.Errorf("Patch error")
} else if k == 4 && isError {
errs[4] = apierrors.NewConflict(velerov2alpha1api.Resource("datadownload"), dataDownloadName, errors.New("conflict"))
} else if k == 5 && isError {
errs[5] = fmt.Errorf("List error")
}
}
return initDataDownloadReconcilerWithError(objects, errs...)
Expand Down Expand Up @@ -116,6 +118,8 @@ func initDataDownloadReconcilerWithError(objects []runtime.Object, needError ...
fakeClient.patchError = needError[3]
} else if k == 4 {
fakeClient.updateConflict = needError[4]
} else if k == 5 {
fakeClient.listError = needError[5]
}
}

Expand Down Expand Up @@ -939,3 +943,111 @@ func TestFindDataDownloads(t *testing.T) {
})
}
}

func TestAttemptDataDownloadResume(t *testing.T) {
tests := []struct {
name string
dataUploads []velerov2alpha1api.DataDownload
du *velerov2alpha1api.DataDownload
pod *corev1.Pod
needErrs []bool
acceptedDataDownloads []string
prepareddDataDownloads []string
cancelledDataDownloads []string
expectedError bool
}{
// Test case 1: Process Accepted DataDownload
{
name: "AcceptedDataDownload",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Volumes(&corev1.Volume{Name: dataDownloadName}).NodeName("node-1").Labels(map[string]string{
velerov1api.DataDownloadLabel: dataDownloadName,
}).Result(),
du: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(),
acceptedDataDownloads: []string{dataDownloadName},
expectedError: false,
},
// Test case 2: Cancel an Accepted DataDownload
{
name: "CancelAcceptedDataDownload",
du: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(),
},
// Test case 3: Process Accepted Prepared DataDownload
{
name: "PreparedDataDownload",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Volumes(&corev1.Volume{Name: dataDownloadName}).NodeName("node-1").Labels(map[string]string{
velerov1api.DataDownloadLabel: dataDownloadName,
}).Result(),
du: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(),
prepareddDataDownloads: []string{dataDownloadName},
},
// Test case 4: Process Accepted InProgress DataDownload
{
name: "InProgressDataDownload",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Volumes(&corev1.Volume{Name: dataDownloadName}).NodeName("node-1").Labels(map[string]string{
velerov1api.DataDownloadLabel: dataDownloadName,
}).Result(),
du: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(),
prepareddDataDownloads: []string{dataDownloadName},
},
// Test case 5: get resume error
{
name: "ResumeError",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Volumes(&corev1.Volume{Name: dataDownloadName}).NodeName("node-1").Labels(map[string]string{
velerov1api.DataDownloadLabel: dataDownloadName,
}).Result(),
needErrs: []bool{false, false, false, false, false, true},
du: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(),
expectedError: true,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx := context.TODO()
r, err := initDataDownloadReconciler(nil, test.needErrs...)
r.nodeName = "node-1"
require.NoError(t, err)
defer func() {
r.client.Delete(ctx, test.du, &kbclient.DeleteOptions{})
if test.pod != nil {
r.client.Delete(ctx, test.pod, &kbclient.DeleteOptions{})
}
}()

assert.NoError(t, r.client.Create(ctx, test.du))
if test.pod != nil {
assert.NoError(t, r.client.Create(ctx, test.pod))
}
// Run the test
err = r.AttemptDataDownloadResume(ctx, r.client, r.logger.WithField("name", test.name), test.du.Namespace)

if test.expectedError {
assert.Error(t, err)
} else {
assert.NoError(t, err)

// Verify DataDownload marked as Cancelled
for _, duName := range test.cancelledDataDownloads {
dataUpload := &velerov2alpha1api.DataDownload{}
err := r.client.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: duName}, dataUpload)
require.NoError(t, err)
assert.Equal(t, velerov2alpha1api.DataDownloadPhaseCanceled, dataUpload.Status.Phase)
}
// Verify DataDownload marked as Accepted
for _, duName := range test.acceptedDataDownloads {
dataUpload := &velerov2alpha1api.DataDownload{}
err := r.client.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: duName}, dataUpload)
require.NoError(t, err)
assert.Equal(t, velerov2alpha1api.DataDownloadPhaseAccepted, dataUpload.Status.Phase)
}
// Verify DataDownload marked as Prepared
for _, duName := range test.prepareddDataDownloads {
dataUpload := &velerov2alpha1api.DataDownload{}
err := r.client.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: duName}, dataUpload)
require.NoError(t, err)
assert.Equal(t, velerov2alpha1api.DataDownloadPhasePrepared, dataUpload.Status.Phase)
}
}
})
}
}
Loading
Loading