From 294d16a4a10a7351c69493fd0181c26deed5a39f Mon Sep 17 00:00:00 2001 From: Ming Qiu Date: Wed, 15 Nov 2023 08:52:57 +0000 Subject: [PATCH] Node agent restart enhancement Signed-off-by: Ming Qiu --- changelogs/unreleased/7130-qiuming-best | 1 + pkg/cmd/cli/nodeagent/server.go | 55 ++------ pkg/controller/data_download_controller.go | 94 ++++++++++++- .../data_download_controller_test.go | 114 +++++++++++++++- pkg/controller/data_upload_controller.go | 91 ++++++++++++- pkg/controller/data_upload_controller_test.go | 126 +++++++++++++++++- pkg/exposer/csi_snapshot.go | 3 + pkg/exposer/csi_snapshot_test.go | 22 +++ pkg/exposer/generic_restore.go | 3 + 9 files changed, 453 insertions(+), 56 deletions(-) create mode 100644 changelogs/unreleased/7130-qiuming-best diff --git a/changelogs/unreleased/7130-qiuming-best b/changelogs/unreleased/7130-qiuming-best new file mode 100644 index 00000000000..f6f6c6f74fc --- /dev/null +++ b/changelogs/unreleased/7130-qiuming-best @@ -0,0 +1 @@ +Node agent restart enhancement diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go index 835b899c364..53c45fb8108 100644 --- a/pkg/cmd/cli/nodeagent/server.go +++ b/pkg/cmd/cli/nodeagent/server.go @@ -285,13 +285,13 @@ func (s *nodeAgentServer) run() { } dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) - s.markDataUploadsCancel(dataUploadReconciler) + s.attemptDataUploadResume(dataUploadReconciler) if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil { s.logger.WithError(err).Fatal("Unable to create the data upload controller") } dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, s.dataPathMgr, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) 
- s.markDataDownloadsCancel(dataDownloadReconciler) + s.attemptDataDownloadResume(dataDownloadReconciler) if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil { s.logger.WithError(err).Fatal("Unable to create the data download controller") } @@ -365,65 +365,28 @@ func (s *nodeAgentServer) markInProgressCRsFailed() { s.markInProgressPVRsFailed(client) } -func (s *nodeAgentServer) markDataUploadsCancel(r *controller.DataUploadReconciler) { +func (s *nodeAgentServer) attemptDataUploadResume(r *controller.DataUploadReconciler) { // the function is called before starting the controller manager, the embedded client isn't ready to use, so create a new one here client, err := ctrlclient.New(s.mgr.GetConfig(), ctrlclient.Options{Scheme: s.mgr.GetScheme()}) if err != nil { s.logger.WithError(errors.WithStack(err)).Error("failed to create client") return } - if dataUploads, err := r.FindDataUploads(s.ctx, client, s.namespace); err != nil { - s.logger.WithError(errors.WithStack(err)).Error("failed to find data uploads") - } else { - for i := range dataUploads { - du := dataUploads[i] - if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted || - du.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared || - du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress { - err = controller.UpdateDataUploadWithRetry(s.ctx, client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, s.logger.WithField("dataupload", du.Name), - func(dataUpload *velerov2alpha1api.DataUpload) { - dataUpload.Spec.Cancel = true - dataUpload.Status.Message = fmt.Sprintf("found a dataupload with status %q during the node-agent starting, mark it as cancel", du.Status.Phase) - }) - - if err != nil { - s.logger.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q cancel", du.GetName()) - continue - } - s.logger.WithField("dataupload", du.GetName()).Warn(du.Status.Message) - } - } + if err := r.AttemptDataUploadResume(s.ctx, client, 
s.logger.WithField("node", s.nodeName), s.namespace); err != nil { + s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data upload resume") } } -func (s *nodeAgentServer) markDataDownloadsCancel(r *controller.DataDownloadReconciler) { +func (s *nodeAgentServer) attemptDataDownloadResume(r *controller.DataDownloadReconciler) { // the function is called before starting the controller manager, the embedded client isn't ready to use, so create a new one here client, err := ctrlclient.New(s.mgr.GetConfig(), ctrlclient.Options{Scheme: s.mgr.GetScheme()}) if err != nil { s.logger.WithError(errors.WithStack(err)).Error("failed to create client") return } - if dataDownloads, err := r.FindDataDownloads(s.ctx, client, s.namespace); err != nil { - s.logger.WithError(errors.WithStack(err)).Error("failed to find data downloads") - } else { - for i := range dataDownloads { - dd := dataDownloads[i] - if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted || - dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared || - dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress { - err = controller.UpdateDataDownloadWithRetry(s.ctx, client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, s.logger.WithField("datadownload", dd.Name), - func(dataDownload *velerov2alpha1api.DataDownload) { - dataDownload.Spec.Cancel = true - dataDownload.Status.Message = fmt.Sprintf("found a datadownload with status %q during the node-agent starting, mark it as cancel", dd.Status.Phase) - }) - - if err != nil { - s.logger.WithError(errors.WithStack(err)).Errorf("failed to mark datadownload %q cancel", dd.GetName()) - continue - } - s.logger.WithField("datadownload", dd.GetName()).Warn(dd.Status.Message) - } - } + + if err := r.AttemptDataDownloadResume(s.ctx, client, s.logger.WithField("node", s.nodeName), s.namespace); err != nil { + s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data download resume") } } diff --git 
a/pkg/controller/data_download_controller.go b/pkg/controller/data_download_controller.go index bf4299ea4ff..ebafd406443 100644 --- a/pkg/controller/data_download_controller.go +++ b/pkg/controller/data_download_controller.go @@ -140,7 +140,7 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request // to help clear up resources instead of clear them directly in case of some conflict with Expose action if err := UpdateDataDownloadWithRetry(ctx, r.client, req.NamespacedName, log, func(dataDownload *velerov2alpha1api.DataDownload) { dataDownload.Spec.Cancel = true - dataDownload.Status.Message = fmt.Sprintf("found a dataupload %s/%s is being deleted, mark it as cancel", dd.Namespace, dd.Name) + dataDownload.Status.Message = fmt.Sprintf("found a datadownload %s/%s is being deleted, mark it as cancel", dd.Namespace, dd.Name) }); err != nil { log.Errorf("failed to set cancel flag with error %s for %s/%s", err.Error(), dd.Namespace, dd.Name) return ctrl.Result{}, err @@ -192,7 +192,6 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request return r.errorOut(ctx, dd, err, "error to expose snapshot", log) } } - log.Info("Restore is exposed") // we need to get CR again for it may canceled by datadownload controller on other @@ -205,7 +204,6 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request } return ctrl.Result{}, errors.Wrap(err, "getting datadownload") } - // we need to clean up resources as resources created in Expose it may later than cancel action or prepare time // and need to clean up resources again if isDataDownloadInFinalState(dd) { @@ -267,7 +265,6 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request return r.errorOut(ctx, dd, err, "error to create data path", log) } } - // Update status to InProgress original := dd.DeepCopy() dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseInProgress @@ -576,6 +573,53 @@ func (r *DataDownloadReconciler) 
FindDataDownloads(ctx context.Context, cli clie return dataDownloads, nil } +func (r *DataDownloadReconciler) findAcceptDataDownloadsByNodeLable(ctx context.Context, cli client.Client, ns string) ([]velerov2alpha1api.DataDownload, error) { + dataDownloads := &velerov2alpha1api.DataDownloadList{} + if err := cli.List(ctx, dataDownloads, &client.ListOptions{Namespace: ns}); err != nil { + r.logger.WithError(errors.WithStack(err)).Error("failed to list datadownloads") + return nil, errors.Wrapf(err, "failed to list datadownloads") + } + + var result []velerov2alpha1api.DataDownload + for _, dd := range dataDownloads.Items { + if dd.Status.Phase != velerov2alpha1api.DataDownloadPhaseAccepted { + continue + } + if dd.Labels[acceptNodeLabelKey] == r.nodeName { + result = append(result, dd) + } + } + return result, nil +} + +// CancelAcceptedDataDownload marks this node's Accepted data downloads as cancel, skipping every name listed in acceptedDatauploads +func (r *DataDownloadReconciler) CancelAcceptedDataDownload(ctx context.Context, cli client.Client, ns string, acceptedDatauploads []string) { + r.logger.Infof("Canceling accepted data for node %s", r.nodeName) + dataDownloads, err := r.findAcceptDataDownloadsByNodeLable(ctx, cli, ns) + if err != nil { + r.logger.WithError(err).Error("failed to find data downloads") + return + } + +nextDataDownload: + for _, dd := range dataDownloads { + for _, accepted := range acceptedDatauploads { + if dd.Name == accepted { + continue nextDataDownload + } + } + err = UpdateDataDownloadWithRetry(ctx, cli, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, + r.logger.WithField("datadownload", dd.Name), func(dataDownload *velerov2alpha1api.DataDownload) { + dataDownload.Spec.Cancel = true + dataDownload.Status.Message = fmt.Sprintf("found a datadownload with status %q during the node-agent starting, mark it as cancel", dd.Status.Phase) + }) + if err != nil { + r.logger.WithError(err).Errorf("failed to set cancel flag with error %s", err.Error()) + } + r.logger.Warn(dd.Status.Message) + } +} + func (r *DataDownloadReconciler) 
prepareDataDownload(ssb *velerov2alpha1api.DataDownload) { ssb.Status.Phase = velerov2alpha1api.DataDownloadPhasePrepared ssb.Status.Node = r.nodeName @@ -749,3 +793,45 @@ func UpdateDataDownloadWithRetry(ctx context.Context, client client.Client, name return true, nil }) } + +func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context, cli client.Client, logger *logrus.Entry, ns string) error { + // Data downloads that still have an exposing pod may be resumable after a node-agent restart: + // the exposed PVC still exists, so the data can be downloaded again + var acceptedDataDownloads []string + if dataDownloads, err := r.FindDataDownloads(ctx, cli, ns); err != nil { + return errors.Wrapf(err, "failed to find data downloads") + } else { + for i := range dataDownloads { + dd := dataDownloads[i] + if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted { + // the exposed pod exists; record the name, it is passed to CancelAcceptedDataDownload below + // since the exposed PVC still exists, the data can be downloaded again + acceptedDataDownloads = append(acceptedDataDownloads, dd.Name) + } + + if dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared { + // nothing to do here, let the controller re-download the data + // a Prepared CR can still be handled by the datadownload controller after node-agent restart + logger.WithField("datadownload", dd.GetName()).Debug("find a datadownload with status prepared") + } else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress { + err = UpdateDataDownloadWithRetry(ctx, cli, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, logger.WithField("datadownload", dd.Name), + func(dataDownload *velerov2alpha1api.DataDownload) { + // move the datadownload back to `Prepared`, and the datadownload controller will handle it + // if the exposed pod is not running well, the datadownload controller can still handle that abnormal situation + dataDownload.Status.Phase = 
velerov2alpha1api.DataDownloadPhasePrepared + }) + + if err != nil { + logger.WithError(errors.WithStack(err)).Errorf("failed to mark datadownload %q into prepared", dd.GetName()) + continue + } + logger.WithField("datadownload", dd.GetName()).Debug("mark datadownload into prepared") + } + } + } + + // If the data download is in Accepted status, the exposed PVC may not be created yet, + // so we need to mark the data download as canceled, for it may not be recoverable + r.CancelAcceptedDataDownload(ctx, cli, ns, acceptedDataDownloads) + return nil +} diff --git a/pkg/controller/data_download_controller_test.go b/pkg/controller/data_download_controller_test.go index de9fa7516aa..a9e57fd63f0 100644 --- a/pkg/controller/data_download_controller_test.go +++ b/pkg/controller/data_download_controller_test.go @@ -69,7 +69,7 @@ func dataDownloadBuilder() *builder.DataDownloadBuilder { } func initDataDownloadReconciler(objects []runtime.Object, needError ...bool) (*DataDownloadReconciler, error) { - var errs []error = make([]error, 5) + var errs []error = make([]error, 6) for k, isError := range needError { if k == 0 && isError { errs[0] = fmt.Errorf("Get error") @@ -81,6 +81,8 @@ func initDataDownloadReconciler(objects []runtime.Object, needError ...bool) (*D errs[3] = fmt.Errorf("Patch error") } else if k == 4 && isError { errs[4] = apierrors.NewConflict(velerov2alpha1api.Resource("datadownload"), dataDownloadName, errors.New("conflict")) + } else if k == 5 && isError { + errs[5] = fmt.Errorf("List error") } } return initDataDownloadReconcilerWithError(objects, errs...) @@ -116,6 +118,8 @@ func initDataDownloadReconcilerWithError(objects []runtime.Object, needError ... 
fakeClient.patchError = needError[3] } else if k == 4 { fakeClient.updateConflict = needError[4] + } else if k == 5 { + fakeClient.listError = needError[5] } } @@ -939,3 +943,111 @@ func TestFindDataDownloads(t *testing.T) { }) } } + +func TestAttemptDataDownloadResume(t *testing.T) { + tests := []struct { + name string + dataUploads []velerov2alpha1api.DataDownload + du *velerov2alpha1api.DataDownload + pod *corev1.Pod + needErrs []bool + acceptedDataDownloads []string + prepareddDataDownloads []string + cancelledDataDownloads []string + expectedError bool + }{ + // Test case 1: Process Accepted DataDownload + /*{ + name: "AcceptedDataDownload", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Volumes(&corev1.Volume{Name: dataDownloadName}).NodeName("node-1").Labels(map[string]string{ + velerov1api.DataDownloadLabel: dataDownloadName, + }).Result(), + du: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(), + acceptedDataDownloads: []string{dataDownloadName}, + expectedError: false, + }, + // Test case 2: Cancel an Accepted DataDownload + { + name: "CancelAcceptedDataDownload", + du: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(), + }, + // Test case 3: Process Accepted Prepared DataDownload + { + name: "PreparedDataDownload", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Volumes(&corev1.Volume{Name: dataDownloadName}).NodeName("node-1").Labels(map[string]string{ + velerov1api.DataDownloadLabel: dataDownloadName, + }).Result(), + du: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), + prepareddDataDownloads: []string{dataDownloadName}, + }, + // Test case 4: Process Accepted InProgress DataDownload + { + name: "InProgressDataDownload", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Volumes(&corev1.Volume{Name: dataDownloadName}).NodeName("node-1").Labels(map[string]string{ + 
velerov1api.DataDownloadLabel: dataDownloadName, + }).Result(), + du: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), + prepareddDataDownloads: []string{dataDownloadName}, + },*/ + // Test case 5: get resume error + { + name: "ResumeError", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Volumes(&corev1.Volume{Name: dataDownloadName}).NodeName("node-1").Labels(map[string]string{ + velerov1api.DataDownloadLabel: dataDownloadName, + }).Result(), + needErrs: []bool{false, false, false, false, false, true}, + du: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), + expectedError: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ctx := context.TODO() + r, err := initDataDownloadReconciler(nil, test.needErrs...) + r.nodeName = "node-1" + require.NoError(t, err) + defer func() { + r.client.Delete(ctx, test.du, &kbclient.DeleteOptions{}) + if test.pod != nil { + r.client.Delete(ctx, test.pod, &kbclient.DeleteOptions{}) + } + }() + + assert.NoError(t, r.client.Create(ctx, test.du)) + if test.pod != nil { + assert.NoError(t, r.client.Create(ctx, test.pod)) + } + // Run the test + err = r.AttemptDataDownloadResume(ctx, r.client, r.logger.WithField("name", test.name), test.du.Namespace) + + if test.expectedError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + + // Verify DataDownload marked as Cancelled + for _, duName := range test.cancelledDataDownloads { + dataUpload := &velerov2alpha1api.DataDownload{} + err := r.client.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: duName}, dataUpload) + require.NoError(t, err) + assert.Equal(t, velerov2alpha1api.DataDownloadPhaseCanceled, dataUpload.Status.Phase) + } + // Verify DataDownload marked as Accepted + for _, duName := range test.acceptedDataDownloads { + dataUpload := &velerov2alpha1api.DataDownload{} + err := r.client.Get(context.Background(), 
types.NamespacedName{Namespace: "velero", Name: duName}, dataUpload) + require.NoError(t, err) + assert.Equal(t, velerov2alpha1api.DataDownloadPhaseAccepted, dataUpload.Status.Phase) + } + // Verify DataDownload marked as Prepared + for _, duName := range test.prepareddDataDownloads { + dataUpload := &velerov2alpha1api.DataDownload{} + err := r.client.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: duName}, dataUpload) + require.NoError(t, err) + assert.Equal(t, velerov2alpha1api.DataDownloadPhasePrepared, dataUpload.Status.Phase) + } + } + }) + } +} diff --git a/pkg/controller/data_upload_controller.go b/pkg/controller/data_upload_controller.go index 9465528e331..668c041a69e 100644 --- a/pkg/controller/data_upload_controller.go +++ b/pkg/controller/data_upload_controller.go @@ -274,7 +274,6 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) return r.errorOut(ctx, du, err, "error to create data path", log) } } - // Update status to InProgress original := du.DeepCopy() du.Status.Phase = velerov2alpha1api.DataUploadPhaseInProgress @@ -581,7 +580,7 @@ func (r *DataUploadReconciler) findDataUploadForPod(podObj client.Object) []reco return []reconcile.Request{requests} } -func (r *DataUploadReconciler) FindDataUploads(ctx context.Context, cli client.Client, ns string) ([]velerov2alpha1api.DataUpload, error) { +func (r *DataUploadReconciler) FindDataUploadsByPod(ctx context.Context, cli client.Client, ns string) ([]velerov2alpha1api.DataUpload, error) { pods := &corev1.PodList{} var dataUploads []velerov2alpha1api.DataUpload if err := cli.List(ctx, pods, &client.ListOptions{Namespace: ns}); err != nil { @@ -605,6 +604,54 @@ func (r *DataUploadReconciler) FindDataUploads(ctx context.Context, cli client.C return dataUploads, nil } +func (r *DataUploadReconciler) findAcceptDataUploadsByNodeLable(ctx context.Context, cli client.Client, ns string) ([]velerov2alpha1api.DataUpload, error) { + dataUploads := 
&velerov2alpha1api.DataUploadList{} + if err := cli.List(ctx, dataUploads, &client.ListOptions{Namespace: ns}); err != nil { + r.logger.WithError(errors.WithStack(err)).Error("failed to list datauploads") + return nil, errors.Wrapf(err, "failed to list datauploads") + } + + var result []velerov2alpha1api.DataUpload + for _, du := range dataUploads.Items { + if du.Status.Phase != velerov2alpha1api.DataUploadPhaseAccepted { + continue + } + if du.Labels[acceptNodeLabelKey] == r.nodeName { + result = append(result, du) + } + } + return result, nil +} + +// CancelAcceptedDataupload marks this node's Accepted data uploads as cancel, skipping every name listed in acceptedDatauploads +func (r *DataUploadReconciler) CancelAcceptedDataupload(ctx context.Context, cli client.Client, ns string, acceptedDatauploads []string) { + r.logger.Infof("Reset accepted dataupload for node %s", r.nodeName) + dataUploads, err := r.findAcceptDataUploadsByNodeLable(ctx, cli, ns) + if err != nil { + r.logger.WithError(err).Error("failed to find dataupload") + return + } + +nextDataUpload: + for _, du := range dataUploads { + for _, accepted := range acceptedDatauploads { + if du.Name == accepted { + continue nextDataUpload + } + } + err = UpdateDataUploadWithRetry(ctx, cli, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, r.logger.WithField("dataupload", du.Name), + func(dataUpload *velerov2alpha1api.DataUpload) { + dataUpload.Spec.Cancel = true + dataUpload.Status.Message = fmt.Sprintf("found a dataupload with status %q during the node-agent starting, mark it as cancel", du.Status.Phase) + }) + if err != nil { + r.logger.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q cancel", du.GetName()) + continue + } + r.logger.WithField("dataupload", du.GetName()).Warn(du.Status.Message) + } +} + func (r *DataUploadReconciler) prepareDataUpload(du *velerov2alpha1api.DataUpload) { du.Status.Phase = velerov2alpha1api.DataUploadPhasePrepared du.Status.Node = r.nodeName @@ -833,3 +880,43 @@ func UpdateDataUploadWithRetry(ctx context.Context, client client.Client, namesp return true, nil }) } + +func (r *DataUploadReconciler) 
AttemptDataUploadResume(ctx context.Context, cli client.Client, logger *logrus.Entry, ns string) error { + // Data uploads that still have an exposing pod may be resumable after a node-agent restart: + // the exposed pod still holds the backup data, so the data can be uploaded again + var acceptedDataUploads []string + if dataUploads, err := r.FindDataUploadsByPod(ctx, cli, ns); err != nil { + return errors.Wrap(err, "failed to find data uploads") + } else { + for _, du := range dataUploads { + if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted { + // the exposed pod exists; record the name, it is passed to CancelAcceptedDataupload below + acceptedDataUploads = append(acceptedDataUploads, du.Name) + } + + if du.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared { + // nothing to do here, let the controller re-upload the data + // a Prepared CR can still be handled by the dataupload controller after node-agent restart + logger.WithField("dataupload", du.GetName()).Debug("find a dataupload with status prepared") + } else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress { + err = UpdateDataUploadWithRetry(ctx, cli, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, logger.WithField("dataupload", du.Name), + func(dataUpload *velerov2alpha1api.DataUpload) { + // move the dataupload back to `Prepared` so that the dataupload controller handles it again + // if the exposed pod is not running well, the dataupload controller can still handle that abnormal situation + dataUpload.Status.Phase = velerov2alpha1api.DataUploadPhasePrepared + }) + + if err != nil { + logger.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q into prepared", du.GetName()) + continue + } + logger.WithField("dataupload", du.GetName()).Debug("mark dataupload into prepared") + } + } + } + + // If the data upload is in Accepted status, the volume snapshot may be deleted and the exposed pod may not be created, + // so we need to mark the data upload as canceled for 
it may not be recoverable + r.CancelAcceptedDataupload(ctx, cli, ns, acceptedDataUploads) + return nil +} diff --git a/pkg/controller/data_upload_controller_test.go b/pkg/controller/data_upload_controller_test.go index b61cd07b323..b9d20ff8133 100644 --- a/pkg/controller/data_upload_controller_test.go +++ b/pkg/controller/data_upload_controller_test.go @@ -68,6 +68,7 @@ type FakeClient struct { updateError error patchError error updateConflict error + listError error } func (c *FakeClient) Get(ctx context.Context, key kbclient.ObjectKey, obj kbclient.Object) error { @@ -106,8 +107,16 @@ func (c *FakeClient) Patch(ctx context.Context, obj kbclient.Object, patch kbcli return c.Client.Patch(ctx, obj, patch, opts...) } +func (c *FakeClient) List(ctx context.Context, list kbclient.ObjectList, opts ...kbclient.ListOption) error { + if c.listError != nil { + return c.listError + } + + return c.Client.List(ctx, list, opts...) +} + func initDataUploaderReconciler(needError ...bool) (*DataUploadReconciler, error) { - var errs []error = make([]error, 5) + var errs []error = make([]error, 6) for k, isError := range needError { if k == 0 && isError { errs[0] = fmt.Errorf("Get error") @@ -118,7 +127,9 @@ func initDataUploaderReconciler(needError ...bool) (*DataUploadReconciler, error } else if k == 3 && isError { errs[3] = fmt.Errorf("Patch error") } else if k == 4 && isError { - errs[4] = apierrors.NewConflict(velerov2alpha1api.Resource("datadownload"), dataDownloadName, errors.New("conflict")) + errs[4] = apierrors.NewConflict(velerov2alpha1api.Resource("dataupload"), dataUploadName, errors.New("conflict")) + } else if k == 5 && isError { + errs[5] = fmt.Errorf("List error") } } @@ -198,6 +209,8 @@ func initDataUploaderReconcilerWithError(needError ...error) (*DataUploadReconci fakeClient.patchError = needError[3] } else if k == 4 { fakeClient.updateConflict = needError[4] + } else if k == 5 { + fakeClient.listError = needError[5] } } @@ -983,7 +996,7 @@ func 
TestFindDataUploads(t *testing.T) { require.NoError(t, err) err = r.client.Create(ctx, &test.pod) require.NoError(t, err) - uploads, err := r.FindDataUploads(context.Background(), r.client, "velero") + uploads, err := r.FindDataUploadsByPod(context.Background(), r.client, "velero") if test.expectedError { assert.Error(t, err) @@ -994,3 +1007,110 @@ func TestFindDataUploads(t *testing.T) { }) } } +func TestAttemptDataUploadResume(t *testing.T) { + tests := []struct { + name string + dataUploads []velerov2alpha1api.DataUpload + du *velerov2alpha1api.DataUpload + pod *corev1.Pod + needErrs []bool + acceptedDataUploads []string + prepareddDataUploads []string + cancelledDataUploads []string + expectedError bool + }{ + // Test case 1: Process Accepted DataUpload + /*{ + name: "AcceptedDataUpload", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).NodeName("node-1").Labels(map[string]string{ + velerov1api.DataUploadLabel: dataUploadName, + }).Result(), + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(), + acceptedDataUploads: []string{dataUploadName}, + expectedError: false, + }, + // Test case 2: Cancel an Accepted DataUpload + { + name: "CancelAcceptedDataUpload", + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(), + }, + // Test case 3: Process Accepted Prepared DataUpload + { + name: "PreparedDataUpload", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).NodeName("node-1").Labels(map[string]string{ + velerov1api.DataUploadLabel: dataUploadName, + }).Result(), + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(), + prepareddDataUploads: []string{dataUploadName}, + }, + // Test case 4: Process Accepted InProgress DataUpload + { + name: "InProgressDataUpload", + pod: builder.ForPod(velerov1api.DefaultNamespace, 
dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).NodeName("node-1").Labels(map[string]string{ + velerov1api.DataUploadLabel: dataUploadName, + }).Result(), + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(), + prepareddDataUploads: []string{dataUploadName}, + },*/ + // Test case 5: get resume error + { + name: "ResumeError", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).NodeName("node-1").Labels(map[string]string{ + velerov1api.DataUploadLabel: dataUploadName, + }).Result(), + needErrs: []bool{false, false, false, false, false, true}, + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(), + expectedError: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ctx := context.TODO() + r, err := initDataUploaderReconciler(test.needErrs...) + r.nodeName = "node-1" + require.NoError(t, err) + defer func() { + r.client.Delete(ctx, test.du, &kbclient.DeleteOptions{}) + if test.pod != nil { + r.client.Delete(ctx, test.pod, &kbclient.DeleteOptions{}) + } + }() + + assert.NoError(t, r.client.Create(ctx, test.du)) + if test.pod != nil { + assert.NoError(t, r.client.Create(ctx, test.pod)) + } + // Run the test + err = r.AttemptDataUploadResume(ctx, r.client, r.logger.WithField("name", test.name), test.du.Namespace) + + if test.expectedError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + + // Verify DataUploads marked as Cancelled + for _, duName := range test.cancelledDataUploads { + dataUpload := &velerov2alpha1api.DataUpload{} + err := r.client.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: duName}, dataUpload) + require.NoError(t, err) + assert.Equal(t, velerov2alpha1api.DataUploadPhaseCanceled, dataUpload.Status.Phase) + } + // Verify DataUploads marked as Accepted + for _, duName := range test.acceptedDataUploads { + dataUpload := 
&velerov2alpha1api.DataUpload{} + err := r.client.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: duName}, dataUpload) + require.NoError(t, err) + assert.Equal(t, velerov2alpha1api.DataUploadPhaseAccepted, dataUpload.Status.Phase) + } + // Verify DataUploads marked as Prepared + for _, duName := range test.prepareddDataUploads { + dataUpload := &velerov2alpha1api.DataUpload{} + err := r.client.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: duName}, dataUpload) + require.NoError(t, err) + assert.Equal(t, velerov2alpha1api.DataUploadPhasePrepared, dataUpload.Status.Phase) + } + } + }) + } +} diff --git a/pkg/exposer/csi_snapshot.go b/pkg/exposer/csi_snapshot.go index 9979c5fddb5..4de8e00984d 100644 --- a/pkg/exposer/csi_snapshot.go +++ b/pkg/exposer/csi_snapshot.go @@ -229,6 +229,9 @@ func (e *csiSnapshotExposer) GetExposed(ctx context.Context, ownerObject corev1. } else { return nil, errors.Wrapf(err, "error to get backup pod %s", backupPodName) } + } else if pod.Status.Phase != corev1.PodRunning { + curLog.WithField("backup pod", backupPodName).Infof("Backup pod is not running in the current node %s", exposeWaitParam.NodeName) + return nil, errors.Errorf("backup pod %s is not running", backupPodName) } curLog.WithField("pod", pod.Name).Infof("Backup pod is in running state in node %s", pod.Spec.NodeName) diff --git a/pkg/exposer/csi_snapshot_test.go b/pkg/exposer/csi_snapshot_test.go index 0caf3f4c87f..ed8351b3bbc 100644 --- a/pkg/exposer/csi_snapshot_test.go +++ b/pkg/exposer/csi_snapshot_test.go @@ -476,6 +476,9 @@ func TestGetExpose(t *testing.T) { }, }, }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, } backupPodWithoutVolume := &corev1.Pod{ @@ -493,6 +496,9 @@ func TestGetExpose(t *testing.T) { }, }, }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, } backupPVC := &corev1.PersistentVolumeClaim{ @@ -542,6 +548,22 @@ func TestGetExpose(t *testing.T) { Timeout: time.Second, err: 
"error to wait backup PVC bound, fake-backup: error to wait for rediness of PVC: error to get pvc velero/fake-backup: persistentvolumeclaims \"fake-backup\" not found", }, + { + name: "backup pod is not running", + ownerBackup: backup, + exposeWaitParam: CSISnapshotExposeWaitParam{ + NodeName: "fake-node", + }, + kubeClientObj: []runtime.Object{ + func() runtime.Object { + pod := backupPod.DeepCopy() + pod.Status.Phase = corev1.PodPending + return pod + }(), + }, + Timeout: time.Second, + err: "backup pod fake-backup is not running", + }, { name: "backup volume not found in pod", ownerBackup: backup, diff --git a/pkg/exposer/generic_restore.go b/pkg/exposer/generic_restore.go index 0868aba4758..eba2698fd09 100644 --- a/pkg/exposer/generic_restore.go +++ b/pkg/exposer/generic_restore.go @@ -132,6 +132,9 @@ func (e *genericRestoreExposer) GetExposed(ctx context.Context, ownerObject core } else { return nil, errors.Wrapf(err, "error to get backup pod %s", restorePodName) } + } else if pod.Status.Phase != corev1.PodRunning { + curLog.WithField("backup pod", restorePodName).Debugf("Backup pod is not running in the current node, pod phase %s", pod.Status.Phase) + return nil, errors.Errorf("backup pod %s is not running in the current node", restorePodName) } curLog.WithField("pod", pod.Name).Infof("Restore pod is in running state in node %s", pod.Spec.NodeName)