diff --git a/pkg/cmd/cli/repomantenance/maintenance.go b/pkg/cmd/cli/repomantenance/maintenance.go index e467c054a9..80721a4369 100644 --- a/pkg/cmd/cli/repomantenance/maintenance.go +++ b/pkg/cmd/cli/repomantenance/maintenance.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "strings" + "time" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -12,21 +13,25 @@ import ( "github.com/spf13/pflag" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/vmware-tanzu/velero/internal/credentials" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" velerocli "github.com/vmware-tanzu/velero/pkg/client" "github.com/vmware-tanzu/velero/pkg/repository" - "github.com/vmware-tanzu/velero/pkg/repository/provider" "github.com/vmware-tanzu/velero/pkg/util/filesystem" "github.com/vmware-tanzu/velero/pkg/util/logging" + + repokey "github.com/vmware-tanzu/velero/pkg/repository/keys" + repomanager "github.com/vmware-tanzu/velero/pkg/repository/manager" ) type Options struct { RepoName string BackupStorageLocation string RepoType string + ResourceTimeout time.Duration LogLevelFlag *logging.LevelFlag FormatFlag *logging.FormatFlag } @@ -83,39 +88,46 @@ func (o *Options) Run(f velerocli.Factory) { } } -func (o *Options) initClient(f velerocli.Factory) (client.Client, error) { +func (o *Options) initClient(f velerocli.Factory) (client.Client, kubernetes.Interface, error) { scheme := runtime.NewScheme() err := velerov1api.AddToScheme(scheme) if err != nil { - return nil, errors.Wrap(err, "failed to add velero scheme") + return nil, nil, errors.Wrap(err, "failed to add velero scheme") } err = v1.AddToScheme(scheme) if err != nil { - return nil, errors.Wrap(err, "failed to add api core scheme") + return nil, nil, errors.Wrap(err, "failed to add api core scheme") } config, err := f.ClientConfig() if err != nil { - return nil, errors.Wrap(err, "failed to get client config") + return nil, nil, errors.Wrap(err, "failed to get client config") } cli, err := client.New(config, client.Options{ Scheme: scheme, }) if err != nil { - return nil, errors.Wrap(err, "failed to create client") + return nil, nil, errors.Wrap(err, "failed to create client") + } + + kubeClient, err := f.KubeClient() + if err != nil { + return nil, nil, errors.Wrap(err, "failed to create kube client") } - return cli, nil + return cli, kubeClient, nil } -func (o *Options) runRepoPrune(f velerocli.Factory, namespace string, logger logrus.FieldLogger) error { - cli, err := o.initClient(f) - if err != nil { - return err +func initRepoManager(namespace string, kubeClient kubernetes.Interface, cli client.Client, logger logrus.FieldLogger) (repomanager.Manager, error) { + // ensure the repo key secret is set up + if err := repokey.EnsureCommonRepositoryKey(kubeClient.CoreV1(), namespace); err != nil { + return nil, errors.Wrap(err, "failed to ensure repository key") } + repoLocker := repository.NewRepoLocker() + credentialFileStore, err := credentials.NewNamespacedFileStore( cli, namespace, @@ -123,23 +135,33 @@ func (o *Options) runRepoPrune(f velerocli.Factory, namespace string, logger log filesystem.NewFileSystem(), ) if err != nil { - return errors.Wrap(err, "failed to create namespaced file store") + return nil, errors.Wrap(err, "failed to create namespaced file store") } credentialSecretStore, err := credentials.NewNamespacedSecretStore(cli, namespace) if err != nil { - return errors.Wrap(err, "failed to create namespaced secret store") + return nil, 
errors.Wrap(err, "failed to create namespaced secret store") } - var repoProvider provider.Provider - if o.RepoType == velerov1api.BackupRepositoryTypeRestic { - repoProvider = provider.NewResticRepositoryProvider(credentialFileStore, filesystem.NewFileSystem(), logger) - } else { - repoProvider = provider.NewUnifiedRepoProvider( - credentials.CredentialGetter{ - FromFile: credentialFileStore, - FromSecret: credentialSecretStore, - }, o.RepoType, logger) + return repomanager.NewManager( + namespace, + cli, + repoLocker, + credentialFileStore, + credentialSecretStore, + logger, + ), nil +} + +func (o *Options) runRepoPrune(f velerocli.Factory, namespace string, logger logrus.FieldLogger) error { + cli, kubeClient, err := o.initClient(f) + if err != nil { + return err + } + + manager, err := initRepoManager(namespace, kubeClient, cli, logger) + if err != nil { + return err } // backupRepository @@ -149,31 +171,14 @@ func (o *Options) runRepoPrune(f velerocli.Factory, namespace string, logger log BackupLocation: o.BackupStorageLocation, RepositoryType: o.RepoType, }, true) - if err != nil { return errors.Wrap(err, "failed to get backup repository") } - // bsl - bsl := &velerov1api.BackupStorageLocation{} - err = cli.Get(context.Background(), client.ObjectKey{Namespace: namespace, Name: repo.Spec.BackupStorageLocation}, bsl) - if err != nil { - return errors.Wrap(err, "failed to get backup storage location") - } - - para := provider.RepoParam{ - BackupRepo: repo, - BackupLocation: bsl, - } - - err = repoProvider.BoostRepoConnect(context.Background(), para) - if err != nil { - return errors.Wrap(err, "failed to boost repo connect") - } - - err = repoProvider.PruneRepo(context.Background(), para) + err = manager.PruneRepo(repo) if err != nil { return errors.Wrap(err, "failed to prune repo") } + return nil } diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index a0828e69ef..dd616ad65a 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -491,15 +491,9 @@ func (s *server) initRepoManager() error { s.namespace, s.mgr.GetClient(), s.repoLocker, - s.repoEnsurer, s.credentialFileStore, s.credentialSecretStore, - s.config.RepoMaintenanceJobConfig, - s.config.PodResources, - s.config.KeepLatestMaintenanceJobs, s.logger, - s.logLevel, - s.config.LogFormat, ) return nil @@ -720,9 +714,14 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string s.namespace, s.logger, s.mgr.GetClient(), + s.repoManager, s.config.RepoMaintenanceFrequency, s.config.BackupRepoConfig, - s.repoManager, + s.config.KeepLatestMaintenanceJobs, + s.config.RepoMaintenanceJobConfig, + s.config.PodResources, + s.logLevel, + s.config.LogFormat, ).SetupWithManager(s.mgr); err != nil { s.logger.Fatal(err, "unable to create controller", "controller", constant.ControllerBackupRepo) } diff --git a/pkg/controller/backup_repository_controller.go b/pkg/controller/backup_repository_controller.go index 535fc41b6b..52d3cf04c6 100644 --- a/pkg/controller/backup_repository_controller.go +++ b/pkg/controller/backup_repository_controller.go @@ -45,6 +45,7 @@ import ( repoconfig "github.com/vmware-tanzu/velero/pkg/repository/config" repomanager "github.com/vmware-tanzu/velero/pkg/repository/manager" "github.com/vmware-tanzu/velero/pkg/util/kube" + "github.com/vmware-tanzu/velero/pkg/util/logging" ) const ( @@ -55,16 +56,22 @@ const ( type BackupRepoReconciler struct { client.Client - namespace string - logger logrus.FieldLogger - clock clocks.WithTickerAndDelayedExecution - 
maintenanceFrequency time.Duration - backupRepoConfig string - repositoryManager repomanager.Manager + namespace string + logger logrus.FieldLogger + clock clocks.WithTickerAndDelayedExecution + maintenanceFrequency time.Duration + backupRepoConfig string + repositoryManager repomanager.Manager + keepLatestMaintenanceJobs int + repoMaintenanceConfig string + podResources kube.PodResources + logLevel logrus.Level + logFormat *logging.FormatFlag } -func NewBackupRepoReconciler(namespace string, logger logrus.FieldLogger, client client.Client, - maintenanceFrequency time.Duration, backupRepoConfig string, repositoryManager repomanager.Manager) *BackupRepoReconciler { +func NewBackupRepoReconciler(namespace string, logger logrus.FieldLogger, client client.Client, repositoryManager repomanager.Manager, + maintenanceFrequency time.Duration, backupRepoConfig string, keepLatestMaintenanceJobs int, repoMaintenanceConfig string, podResources kube.PodResources, + logLevel logrus.Level, logFormat *logging.FormatFlag) *BackupRepoReconciler { c := &BackupRepoReconciler{ client, namespace, @@ -73,6 +80,11 @@ func NewBackupRepoReconciler(namespace string, logger logrus.FieldLogger, client maintenanceFrequency, backupRepoConfig, repositoryManager, + keepLatestMaintenanceJobs, + repoMaintenanceConfig, + podResources, + logLevel, + logFormat, } return c @@ -212,7 +224,13 @@ func (r *BackupRepoReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, errors.Wrap(err, "error handling incomplete repo maintenance jobs") } - return ctrl.Result{}, r.runMaintenanceIfDue(ctx, backupRepo, log) + if err := r.runMaintenanceIfDue(ctx, backupRepo, log); err != nil { + return ctrl.Result{}, errors.Wrap(err, "error check and run repo maintenance jobs") + } + + if err := repository.DeleteOldMaintenanceJobs(r.Client, req.Name, r.keepLatestMaintenanceJobs); err != nil { + log.WithError(err).Warn("Failed to delete old maintenance jobs") + } } return ctrl.Result{}, nil @@ -306,7 +324,7 @@ func ensureRepo(repo *velerov1api.BackupRepository, repoManager repomanager.Mana } func (r *BackupRepoReconciler) recallMaintenance(ctx context.Context, req *velerov1api.BackupRepository, log logrus.FieldLogger) error { - history, err := repository.WaitIncompleteMaintenance(ctx, r.Client, req, defaultMaintenanceStatusQueueLength, log) + history, err := repository.WaitAllMaintenanceJobComplete(ctx, r.Client, req, defaultMaintenanceStatusQueueLength, log) if err != nil { return errors.Wrapf(err, "error waiting incomplete repo maintenance job for repo %s", req.Name) } @@ -380,7 +398,11 @@ func getLastMaintenanceTimeFromHistory(history []velerov1api.BackupRepositoryMai time := history[0].CompleteTimestamp for i := range history { - if time.Before(history[i].CompleteTimestamp) { + if history[i].CompleteTimestamp == nil { + continue + } + + if time == nil || time.Before(history[i].CompleteTimestamp) { time = history[i].CompleteTimestamp } } @@ -406,8 +428,13 @@ func isEarlierMaintenanceStatus(a, b velerov1api.BackupRepositoryMaintenanceStat return a.StartTimestamp.Before(b.StartTimestamp) } +var funcStartMaintenanceJob = repository.StartMaintenanceJob +var funcWaitMaintenanceJobComplete = repository.WaitMaintenanceJobComplete + func (r *BackupRepoReconciler) runMaintenanceIfDue(ctx context.Context, req *velerov1api.BackupRepository, log logrus.FieldLogger) error { - if !dueForMaintenance(req, r.clock.Now()) { + startTime := r.clock.Now() + + if !dueForMaintenance(req, startTime) { log.Debug("not due for maintenance") return 
nil } @@ -418,31 +445,39 @@ func (r *BackupRepoReconciler) runMaintenanceIfDue(ctx context.Context, req *vel // should not cause the repo to move to `NotReady`. log.Debug("Pruning repo") - // when PruneRepo fails, the maintenance result will be left temporarily + job, err := funcStartMaintenanceJob(r.Client, ctx, req, r.repoMaintenanceConfig, r.podResources, r.logLevel, r.logFormat, log) + if err != nil { + log.WithError(err).Warn("Starting repo maintenance failed") + return r.patchBackupRepository(ctx, req, func(rr *velerov1api.BackupRepository) { + updateRepoMaintenanceHistory(rr, velerov1api.BackupRepositoryMaintenanceFailed, &metav1.Time{Time: startTime}, nil, fmt.Sprintf("Failed to start maintenance job, err: %v", err)) + }) + } + + // when WaitMaintenanceJobComplete fails, the maintenance result will be left temporarily // If the maintenenance still completes later, recallMaintenance recalls the left onces and update LastMaintenanceTime and history - status, err := r.repositoryManager.PruneRepo(req) + status, err := funcWaitMaintenanceJobComplete(r.Client, ctx, job, r.namespace, log) if err != nil { - return errors.Wrapf(err, "error pruning repository") + return errors.Wrapf(err, "error waiting repo maintenance completion status") } if status.Result == velerov1api.BackupRepositoryMaintenanceFailed { log.WithError(err).Warn("Pruning repository failed") return r.patchBackupRepository(ctx, req, func(rr *velerov1api.BackupRepository) { - updateRepoMaintenanceHistory(rr, velerov1api.BackupRepositoryMaintenanceFailed, status.StartTimestamp.Time, status.CompleteTimestamp.Time, status.Message) + updateRepoMaintenanceHistory(rr, velerov1api.BackupRepositoryMaintenanceFailed, status.StartTimestamp, status.CompleteTimestamp, status.Message) }) } return r.patchBackupRepository(ctx, req, func(rr *velerov1api.BackupRepository) { rr.Status.LastMaintenanceTime = &metav1.Time{Time: status.CompleteTimestamp.Time} - updateRepoMaintenanceHistory(rr, velerov1api.BackupRepositoryMaintenanceSucceeded, status.StartTimestamp.Time, status.CompleteTimestamp.Time, status.Message) + updateRepoMaintenanceHistory(rr, velerov1api.BackupRepositoryMaintenanceSucceeded, status.StartTimestamp, status.CompleteTimestamp, status.Message) }) } -func updateRepoMaintenanceHistory(repo *velerov1api.BackupRepository, result velerov1api.BackupRepositoryMaintenanceResult, startTime time.Time, completionTime time.Time, message string) { +func updateRepoMaintenanceHistory(repo *velerov1api.BackupRepository, result velerov1api.BackupRepositoryMaintenanceResult, startTime, completionTime *metav1.Time, message string) { latest := velerov1api.BackupRepositoryMaintenanceStatus{ Result: result, - StartTimestamp: &metav1.Time{Time: startTime}, - CompleteTimestamp: &metav1.Time{Time: completionTime}, + StartTimestamp: startTime, + CompleteTimestamp: completionTime, Message: message, } diff --git a/pkg/controller/backup_repository_controller_test.go b/pkg/controller/backup_repository_controller_test.go index 9b5dd4c4ae..b3a358fabc 100644 --- a/pkg/controller/backup_repository_controller_test.go +++ b/pkg/controller/backup_repository_controller_test.go @@ -19,20 +19,30 @@ import ( "testing" "time" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" velerov1api 
"github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + "github.com/vmware-tanzu/velero/pkg/builder" + "github.com/vmware-tanzu/velero/pkg/repository" repomokes "github.com/vmware-tanzu/velero/pkg/repository/mocks" repotypes "github.com/vmware-tanzu/velero/pkg/repository/types" velerotest "github.com/vmware-tanzu/velero/pkg/test" + "github.com/vmware-tanzu/velero/pkg/util/kube" + "github.com/vmware-tanzu/velero/pkg/util/logging" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake" + + batchv1 "k8s.io/api/batch/v1" ) const testMaintenanceFrequency = 10 * time.Minute @@ -47,9 +57,14 @@ func mockBackupRepoReconciler(t *testing.T, mockOn string, arg interface{}, ret velerov1api.DefaultNamespace, velerotest.NewLogger(), velerotest.NewFakeControllerRuntimeClient(t), + mgr, testMaintenanceFrequency, "fake-repo-config", - mgr, + 3, + "", + kube.PodResources{}, + logrus.InfoLevel, + nil, ) } @@ -104,24 +119,71 @@ func TestCheckNotReadyRepo(t *testing.T) { assert.Equal(t, "s3:test.amazonaws.com/bucket/restic/volume-ns-1", rr.Spec.ResticIdentifier) } +func startMaintenanceJobFail(client.Client, context.Context, *velerov1api.BackupRepository, string, kube.PodResources, logrus.Level, *logging.FormatFlag, logrus.FieldLogger) (string, error) { + return "", errors.New("fake-start-error") +} + +func startMaintenanceJobSucceed(client.Client, context.Context, *velerov1api.BackupRepository, string, kube.PodResources, logrus.Level, *logging.FormatFlag, logrus.FieldLogger) (string, error) { + return "fake-job-name", nil +} + +func waitMaintenanceJobCompleteFail(client.Client, context.Context, string, string, logrus.FieldLogger) (velerov1api.BackupRepositoryMaintenanceStatus, error) { + return velerov1api.BackupRepositoryMaintenanceStatus{}, errors.New("fake-wait-error") +} + +func waitMaintenanceJobCompleteSucceed(client.Client, context.Context, string, string, logrus.FieldLogger) (velerov1api.BackupRepositoryMaintenanceStatus, error) { + return velerov1api.BackupRepositoryMaintenanceStatus{ + StartTimestamp: &metav1.Time{Time: time.Now()}, + CompleteTimestamp: &metav1.Time{Time: time.Now().Add(time.Hour)}, + }, nil +} + func TestRunMaintenanceIfDue(t *testing.T) { rr := mockBackupRepositoryCR() - reconciler := mockBackupRepoReconciler(t, "PruneRepo", rr, velerov1api.BackupRepositoryMaintenanceStatus{ - StartTimestamp: &metav1.Time{}, - CompleteTimestamp: &metav1.Time{}, - }, nil) + reconciler := mockBackupRepoReconciler(t, "", rr, nil) + funcStartMaintenanceJob = startMaintenanceJobFail err := reconciler.Client.Create(context.TODO(), rr) assert.NoError(t, err) lastTm := rr.Status.LastMaintenanceTime + history := rr.Status.RecentMaintenance + err = reconciler.runMaintenanceIfDue(context.TODO(), rr, reconciler.logger) + assert.NoError(t, err) + assert.Equal(t, rr.Status.LastMaintenanceTime, lastTm) + assert.NotEqual(t, rr.Status.RecentMaintenance, history) + + rr = mockBackupRepositoryCR() + reconciler = mockBackupRepoReconciler(t, "", rr, nil) + funcStartMaintenanceJob = startMaintenanceJobSucceed + funcWaitMaintenanceJobComplete = waitMaintenanceJobCompleteFail + err = reconciler.Client.Create(context.TODO(), rr) + assert.NoError(t, err) + lastTm = rr.Status.LastMaintenanceTime + history = rr.Status.RecentMaintenance + err = reconciler.runMaintenanceIfDue(context.TODO(), rr, reconciler.logger) + assert.EqualError(t, err, "error waiting repo maintenance completion status: fake-wait-error") + assert.Equal(t, 
rr.Status.LastMaintenanceTime, lastTm) + assert.Equal(t, rr.Status.RecentMaintenance, history) + + rr = mockBackupRepositoryCR() + reconciler = mockBackupRepoReconciler(t, "", rr, nil) + funcStartMaintenanceJob = startMaintenanceJobSucceed + funcWaitMaintenanceJobComplete = waitMaintenanceJobCompleteSucceed + err = reconciler.Client.Create(context.TODO(), rr) + assert.NoError(t, err) + lastTm = rr.Status.LastMaintenanceTime + history = rr.Status.RecentMaintenance err = reconciler.runMaintenanceIfDue(context.TODO(), rr, reconciler.logger) assert.NoError(t, err) assert.NotEqual(t, rr.Status.LastMaintenanceTime, lastTm) + assert.NotEqual(t, rr.Status.RecentMaintenance, history) rr.Status.LastMaintenanceTime = &metav1.Time{Time: time.Now()} lastTm = rr.Status.LastMaintenanceTime + history = rr.Status.RecentMaintenance err = reconciler.runMaintenanceIfDue(context.TODO(), rr, reconciler.logger) assert.NoError(t, err) assert.Equal(t, rr.Status.LastMaintenanceTime, lastTm) + assert.Equal(t, rr.Status.RecentMaintenance, history) } func TestInitializeRepo(t *testing.T) { @@ -251,9 +313,14 @@ func TestGetRepositoryMaintenanceFrequency(t *testing.T) { velerov1api.DefaultNamespace, velerotest.NewLogger(), velerotest.NewFakeControllerRuntimeClient(t), + &mgr, test.userDefinedFreq, "", - &mgr, + 3, + "", + kube.PodResources{}, + logrus.InfoLevel, + nil, ) freq := reconciler.getRepositoryMaintenanceFrequency(test.repo) @@ -380,7 +447,14 @@ func TestNeedInvalidBackupRepo(t *testing.T) { velerov1api.DefaultNamespace, velerotest.NewLogger(), velerotest.NewFakeControllerRuntimeClient(t), - time.Duration(0), "", nil) + nil, + time.Duration(0), + "", + 3, + "", + kube.PodResources{}, + logrus.InfoLevel, + nil) need := reconciler.needInvalidBackupRepo(test.oldBSL, test.newBSL) assert.Equal(t, test.expect, need) @@ -656,7 +730,7 @@ func TestUpdateRepoMaintenanceHistory(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - updateRepoMaintenanceHistory(test.backupRepo, test.result, standardTime, standardTime.Add(time.Hour), "fake-message-0") + updateRepoMaintenanceHistory(test.backupRepo, test.result, &metav1.Time{Time: standardTime}, &metav1.Time{Time: standardTime.Add(time.Hour)}, "fake-message-0") for at := range test.backupRepo.Status.RecentMaintenance { assert.Equal(t, test.expectedHistory[at].StartTimestamp.Time, test.backupRepo.Status.RecentMaintenance[at].StartTimestamp.Time) @@ -666,3 +740,109 @@ func TestUpdateRepoMaintenanceHistory(t *testing.T) { }) } } + +func TestRecallMaintenance(t *testing.T) { + now := time.Now() + + schemeFail := runtime.NewScheme() + velerov1api.AddToScheme(schemeFail) + + scheme := runtime.NewScheme() + batchv1.AddToScheme(scheme) + corev1.AddToScheme(scheme) + velerov1api.AddToScheme(scheme) + + jobSucceeded := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "job1", + Namespace: velerov1api.DefaultNamespace, + Labels: map[string]string{repository.RepositoryNameLabel: "repo"}, + CreationTimestamp: metav1.Time{Time: now.Add(time.Hour)}, + }, + Status: batchv1.JobStatus{ + StartTime: &metav1.Time{Time: now.Add(time.Hour)}, + CompletionTime: &metav1.Time{Time: now.Add(time.Hour * 2)}, + Succeeded: 1, + }, + } + + jobPodSucceeded := builder.ForPod(velerov1api.DefaultNamespace, "job1").Labels(map[string]string{"job-name": "job1"}).ContainerStatuses(&v1.ContainerStatus{ + State: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{}, + }, + }).Result() + + tests := []struct { + name string + kubeClientObj []runtime.Object + runtimeScheme 
*runtime.Scheme + repoLastMatainTime metav1.Time + expectNewHistory bool + expectTimeUpdate bool + expectedErr string + }{ + { + name: "wait completion error", + runtimeScheme: schemeFail, + expectedErr: "error waiting incomplete repo maintenance job for repo repo: error listing maintenance job for repo repo: no kind is registered for the type v1.JobList in scheme \"pkg/runtime/scheme.go:100\"", + }, + { + name: "no consolidate result", + runtimeScheme: scheme, + }, + { + name: "no update last time", + runtimeScheme: scheme, + kubeClientObj: []runtime.Object{ + jobSucceeded, + jobPodSucceeded, + }, + repoLastMatainTime: metav1.Time{Time: now.Add(time.Hour * 5)}, + expectNewHistory: true, + }, + { + name: "update last time", + runtimeScheme: scheme, + kubeClientObj: []runtime.Object{ + jobSucceeded, + jobPodSucceeded, + }, + expectNewHistory: true, + expectTimeUpdate: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + r := mockBackupRepoReconciler(t, "", nil, nil) + + backupRepo := mockBackupRepositoryCR() + backupRepo.Status.LastMaintenanceTime = &test.repoLastMatainTime + + test.kubeClientObj = append(test.kubeClientObj, backupRepo) + + fakeClientBuilder := fake.NewClientBuilder() + fakeClientBuilder = fakeClientBuilder.WithScheme(test.runtimeScheme) + fakeClient := fakeClientBuilder.WithRuntimeObjects(test.kubeClientObj...).Build() + r.Client = fakeClient + + lastTm := backupRepo.Status.LastMaintenanceTime + history := backupRepo.Status.RecentMaintenance + + err := r.recallMaintenance(context.TODO(), backupRepo, velerotest.NewLogger()) + if test.expectedErr != "" { + assert.EqualError(t, err, test.expectedErr) + } else { + assert.NoError(t, err) + + if test.expectNewHistory { + assert.NotEqual(t, history, backupRepo.Status.RecentMaintenance) + } + + if test.expectTimeUpdate { + assert.NotEqual(t, lastTm, backupRepo.Status.LastMaintenanceTime) + } + } + }) + } +} diff --git a/pkg/repository/maintenance.go b/pkg/repository/maintenance.go index 5a1ec1f614..0e048d0a76 100644 --- a/pkg/repository/maintenance.go +++ b/pkg/repository/maintenance.go @@ -35,6 +35,12 @@ import ( velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" "github.com/vmware-tanzu/velero/pkg/util/kube" + + appsv1 "k8s.io/api/apps/v1" + + veleroutil "github.com/vmware-tanzu/velero/pkg/util/velero" + + "github.com/vmware-tanzu/velero/pkg/util/logging" ) const ( @@ -86,29 +92,34 @@ func DeleteOldMaintenanceJobs(cli client.Client, repo string, keep int) error { return nil } -// WaitForJobComplete wait for completion of the specified job and update the latest job object -func WaitForJobComplete(ctx context.Context, client client.Client, ns string, job string) (*batchv1.Job, error) { - updated := &batchv1.Job{} +// waitForJobComplete wait for completion of the specified job and update the latest job object +func waitForJobComplete(ctx context.Context, client client.Client, ns string, job string) (*batchv1.Job, error) { + var ret *batchv1.Job + err := wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) { + updated := &batchv1.Job{} err := client.Get(ctx, types.NamespacedName{Namespace: ns, Name: job}, updated) if err != nil && !apierrors.IsNotFound(err) { return false, err } + ret = updated + if updated.Status.Succeeded > 0 { return true, nil } if updated.Status.Failed > 0 { - return true, fmt.Errorf("maintenance job %s/%s failed", job, job) + return true, nil } + return false, nil }) - return updated, err + return ret, err } -func 
GetMaintenanceResultFromJob(cli client.Client, job *batchv1.Job) (string, error) { +func getMaintenanceResultFromJob(cli client.Client, job *batchv1.Job) (string, error) { // Get the maintenance job related pod by label selector podList := &v1.PodList{} err := cli.List(context.TODO(), podList, client.InNamespace(job.Namespace), client.MatchingLabels(map[string]string{"job-name": job.Name})) @@ -137,7 +148,7 @@ func GetMaintenanceResultFromJob(cli client.Client, job *batchv1.Job) (string, e return terminated.Message, nil } -func GetLatestMaintenanceJob(cli client.Client, ns string) (*batchv1.Job, error) { +func getLatestMaintenanceJob(cli client.Client, ns string) (*batchv1.Job, error) { // Get the maintenance job list by label jobList := &batchv1.JobList{} err := cli.List(context.TODO(), jobList, &client.ListOptions{ @@ -162,7 +173,7 @@ func GetLatestMaintenanceJob(cli client.Client, ns string) (*batchv1.Job, error) return &jobList.Items[0], nil } -// GetMaintenanceJobConfig is called to get the Maintenance Job Config for the +// getMaintenanceJobConfig is called to get the Maintenance Job Config for the // BackupRepository specified by the repo parameter. // // Params: @@ -173,7 +184,7 @@ func GetLatestMaintenanceJob(cli client.Client, ns string) (*batchv1.Job, error) // veleroNamespace: the Velero-installed namespace. It's used to retrieve the BackupRepository. // repoMaintenanceJobConfig: the repository maintenance job ConfigMap name. // repo: the BackupRepository needs to run the maintenance Job. -func GetMaintenanceJobConfig( +func getMaintenanceJobConfig( ctx context.Context, client client.Client, logger logrus.FieldLogger, @@ -255,9 +266,27 @@ func GetMaintenanceJobConfig( return result, nil } -// WaitIncompleteMaintenance checks all the incomplete maintenance jobs of the specified repo and wait for them to complete, +func WaitMaintenanceJobComplete(cli client.Client, ctx context.Context, jobName, ns string, logger logrus.FieldLogger) (velerov1api.BackupRepositoryMaintenanceStatus, error) { + log := logger.WithField("job name", jobName) + + maintenanceJob, err := waitForJobComplete(ctx, cli, ns, jobName) + if err != nil { + return velerov1api.BackupRepositoryMaintenanceStatus{}, errors.Wrap(err, "error to wait for maintenance job complete") + } + + log.Info("Maintenance repo complete") + + result, err := getMaintenanceResultFromJob(cli, maintenanceJob) + if err != nil { + log.WithError(err).Warn("Failed to get maintenance job result") + } + + return composeMaintenanceStatusFromJob(maintenanceJob, result), nil +} + +// WaitAllMaintenanceJobComplete checks all the incomplete maintenance jobs of the specified repo and wait for them to complete, // and then return the maintenance jobs in the range of limit -func WaitIncompleteMaintenance(ctx context.Context, cli client.Client, repo *velerov1api.BackupRepository, limit int, log logrus.FieldLogger) ([]velerov1api.BackupRepositoryMaintenanceStatus, error) { +func WaitAllMaintenanceJobComplete(ctx context.Context, cli client.Client, repo *velerov1api.BackupRepository, limit int, log logrus.FieldLogger) ([]velerov1api.BackupRepositoryMaintenanceStatus, error) { jobList := &batchv1.JobList{} err := cli.List(context.TODO(), jobList, &client.ListOptions{ Namespace: repo.Namespace, @@ -290,7 +319,7 @@ func WaitIncompleteMaintenance(ctx context.Context, cli client.Client, repo *vel if job.Status.Succeeded == 0 && job.Status.Failed == 0 { log.Infof("Waiting for maintenance job %s to complete", job.Name) - updated, err := WaitForJobComplete(ctx, 
cli, job.Namespace, job.Name) + updated, err := waitForJobComplete(ctx, cli, job.Namespace, job.Name) if err != nil { return nil, errors.Wrapf(err, "error waiting maintenance job[%s] complete", job.Name) } @@ -298,18 +327,182 @@ func WaitIncompleteMaintenance(ctx context.Context, cli client.Client, repo *vel job = updated } - message, err := GetMaintenanceResultFromJob(cli, job) + message, err := getMaintenanceResultFromJob(cli, job) if err != nil { return nil, errors.Wrapf(err, "error getting maintenance job[%s] result", job.Name) } - history = append(history, ComposeMaintenanceStatusFromJob(job, message)) + history = append(history, composeMaintenanceStatusFromJob(job, message)) } return history, nil } -func ComposeMaintenanceStatusFromJob(job *batchv1.Job, message string) velerov1api.BackupRepositoryMaintenanceStatus { +func StartMaintenanceJob(cli client.Client, ctx context.Context, repo *velerov1api.BackupRepository, repoMaintenanceJobConfig string, + podResources kube.PodResources, logLevel logrus.Level, logFormat *logging.FormatFlag, logger logrus.FieldLogger) (string, error) { + bsl := &velerov1api.BackupStorageLocation{} + if err := cli.Get(ctx, client.ObjectKey{Namespace: repo.Namespace, Name: repo.Spec.BackupStorageLocation}, bsl); err != nil { + return "", errors.WithStack(err) + } + + log := logger.WithFields(logrus.Fields{ + "BSL name": bsl.Name, + "repo type": repo.Spec.RepositoryType, + "repo name": repo.Name, + "repo UID": repo.UID, + }) + + jobConfig, err := getMaintenanceJobConfig( + ctx, + cli, + log, + repo.Namespace, + repoMaintenanceJobConfig, + repo, + ) + if err != nil { + log.Warnf("Fail to find the ConfigMap %s to build maintenance job with error: %s. Use default value.", + repo.Namespace+"/"+repoMaintenanceJobConfig, + err.Error(), + ) + } + + log.Info("Starting maintenance repo") + + maintenanceJob, err := buildMaintenanceJob(cli, ctx, repo, bsl.Name, jobConfig, podResources, logLevel, logFormat) + if err != nil { + return "", errors.Wrap(err, "error to build maintenance job") + } + + log = log.WithField("job", fmt.Sprintf("%s/%s", maintenanceJob.Namespace, maintenanceJob.Name)) + + if err := cli.Create(context.TODO(), maintenanceJob); err != nil { + return "", errors.Wrap(err, "error to create maintenance job") + } + + log.Info("Repo maintenance job started") + + return maintenanceJob.Name, nil +} + +func buildMaintenanceJob(cli client.Client, ctx context.Context, repo *velerov1api.BackupRepository, bslName string, config *JobConfigs, + podResources kube.PodResources, logLevel logrus.Level, logFormat *logging.FormatFlag) (*batchv1.Job, error) { + // Get the Velero server deployment + deployment := &appsv1.Deployment{} + err := cli.Get(ctx, types.NamespacedName{Name: "velero", Namespace: repo.Namespace}, deployment) + if err != nil { + return nil, err + } + + // Get the environment variables from the Velero server deployment + envVars := veleroutil.GetEnvVarsFromVeleroServer(deployment) + + // Get the referenced storage from the Velero server deployment + envFromSources := veleroutil.GetEnvFromSourcesFromVeleroServer(deployment) + + // Get the volume mounts from the Velero server deployment + volumeMounts := veleroutil.GetVolumeMountsFromVeleroServer(deployment) + + // Get the volumes from the Velero server deployment + volumes := veleroutil.GetVolumesFromVeleroServer(deployment) + + // Get the service account from the Velero server deployment + serviceAccount := veleroutil.GetServiceAccountFromVeleroServer(deployment) + + // Get image + image := 
veleroutil.GetVeleroServerImage(deployment) + + // Set resource limits and requests + cpuRequest := podResources.CPURequest + memRequest := podResources.MemoryRequest + cpuLimit := podResources.CPULimit + memLimit := podResources.MemoryLimit + if config != nil && config.PodResources != nil { + cpuRequest = config.PodResources.CPURequest + memRequest = config.PodResources.MemoryRequest + cpuLimit = config.PodResources.CPULimit + memLimit = config.PodResources.MemoryLimit + } + resources, err := kube.ParseResourceRequirements(cpuRequest, memRequest, cpuLimit, memLimit) + if err != nil { + return nil, errors.Wrap(err, "failed to parse resource requirements for maintenance job") + } + + // Set arguments + args := []string{"repo-maintenance"} + args = append(args, fmt.Sprintf("--repo-name=%s", repo.Spec.VolumeNamespace)) + args = append(args, fmt.Sprintf("--repo-type=%s", repo.Spec.RepositoryType)) + args = append(args, fmt.Sprintf("--backup-storage-location=%s", bslName)) + args = append(args, fmt.Sprintf("--log-level=%s", logLevel.String())) + args = append(args, fmt.Sprintf("--log-format=%s", logFormat.String())) + + // build the maintenance job + job := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: GenerateJobName(repo.Name), + Namespace: repo.Namespace, + Labels: map[string]string{ + RepositoryNameLabel: repo.Name, + }, + }, + Spec: batchv1.JobSpec{ + BackoffLimit: new(int32), // Never retry + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Name: "velero-repo-maintenance-pod", + Labels: map[string]string{ + RepositoryNameLabel: repo.Name, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "velero-repo-maintenance-container", + Image: image, + Command: []string{ + "/velero", + }, + Args: args, + ImagePullPolicy: v1.PullIfNotPresent, + Env: envVars, + EnvFrom: envFromSources, + VolumeMounts: volumeMounts, + Resources: resources, + }, + }, + RestartPolicy: v1.RestartPolicyNever, + Volumes: volumes, + ServiceAccountName: serviceAccount, + }, + }, + }, + } + + if config != nil && len(config.LoadAffinities) > 0 { + affinity := kube.ToSystemAffinity(config.LoadAffinities) + job.Spec.Template.Spec.Affinity = affinity + } + + if tolerations := veleroutil.GetTolerationsFromVeleroServer(deployment); tolerations != nil { + job.Spec.Template.Spec.Tolerations = tolerations + } + + if nodeSelector := veleroutil.GetNodeSelectorFromVeleroServer(deployment); nodeSelector != nil { + job.Spec.Template.Spec.NodeSelector = nodeSelector + } + + if labels := veleroutil.GetVeleroServerLables(deployment); len(labels) > 0 { + job.Spec.Template.Labels = labels + } + + if annotations := veleroutil.GetVeleroServerAnnotations(deployment); len(annotations) > 0 { + job.Spec.Template.Annotations = annotations + } + + return job, nil +} + +func composeMaintenanceStatusFromJob(job *batchv1.Job, message string) velerov1api.BackupRepositoryMaintenanceStatus { result := velerov1api.BackupRepositoryMaintenanceSucceeded if job.Status.Failed > 0 { result = velerov1api.BackupRepositoryMaintenanceFailed diff --git a/pkg/repository/maintenance_test.go b/pkg/repository/maintenance_test.go index 6cde95c956..42e2477d2c 100644 --- a/pkg/repository/maintenance_test.go +++ b/pkg/repository/maintenance_test.go @@ -28,6 +28,7 @@ import ( "github.com/stretchr/testify/require" batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ 
-35,8 +36,12 @@ import ( velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" "github.com/vmware-tanzu/velero/pkg/builder" + "github.com/vmware-tanzu/velero/pkg/repository/provider" velerotest "github.com/vmware-tanzu/velero/pkg/test" "github.com/vmware-tanzu/velero/pkg/util/kube" + "github.com/vmware-tanzu/velero/pkg/util/logging" + + appsv1 "k8s.io/api/apps/v1" ) func TestGenerateJobName1(t *testing.T) { @@ -138,25 +143,45 @@ func TestWaitForJobComplete(t *testing.T) { Status: batchv1.JobStatus{}, } + schemeFail := runtime.NewScheme() + + scheme := runtime.NewScheme() + batchv1.AddToScheme(scheme) + // Define test cases tests := []struct { - description string // Test case description - jobStatus batchv1.JobStatus // Job status to set for the test - expectError bool // Whether an error is expected + description string // Test case description + kubeClientObj []runtime.Object + runtimeScheme *runtime.Scheme + jobStatus batchv1.JobStatus // Job status to set for the test + expectError bool // Whether an error is expected }{ { - description: "Job Succeeded", + description: "wait error", + runtimeScheme: schemeFail, + expectError: true, + }, + { + description: "Job Succeeded", + runtimeScheme: scheme, + kubeClientObj: []runtime.Object{ + job, + }, jobStatus: batchv1.JobStatus{ Succeeded: 1, }, expectError: false, }, { - description: "Job Failed", + description: "Job Failed", + runtimeScheme: scheme, + kubeClientObj: []runtime.Object{ + job, + }, jobStatus: batchv1.JobStatus{ Failed: 1, }, - expectError: true, + expectError: false, }, } @@ -166,9 +191,12 @@ func TestWaitForJobComplete(t *testing.T) { // Set the job status job.Status = tc.jobStatus // Create a fake Kubernetes client - cli := fake.NewClientBuilder().WithObjects(job).Build() + fakeClientBuilder := fake.NewClientBuilder() + fakeClientBuilder = fakeClientBuilder.WithScheme(tc.runtimeScheme) + fakeClient := fakeClientBuilder.WithRuntimeObjects(tc.kubeClientObj...).Build() + // Call the function - _, err := WaitForJobComplete(context.Background(), cli, job.Namespace, job.Name) + _, err := waitForJobComplete(context.Background(), fakeClient, job.Namespace, job.Name) // Check if the error matches the expectation if tc.expectError { @@ -202,7 +230,7 @@ func TestGetMaintenanceResultFromJob(t *testing.T) { cli := fake.NewClientBuilder().WithObjects(job, pod).Build() // test an error should be returned - result, err := GetMaintenanceResultFromJob(cli, job) + result, err := getMaintenanceResultFromJob(cli, job) assert.Error(t, err) assert.Equal(t, "", result) @@ -217,7 +245,7 @@ func TestGetMaintenanceResultFromJob(t *testing.T) { // Test an error should be returned cli = fake.NewClientBuilder().WithObjects(job, pod).Build() - result, err = GetMaintenanceResultFromJob(cli, job) + result, err = getMaintenanceResultFromJob(cli, job) assert.Error(t, err) assert.Equal(t, "", result) @@ -236,7 +264,7 @@ func TestGetMaintenanceResultFromJob(t *testing.T) { // This call should return the termination message with no error cli = fake.NewClientBuilder().WithObjects(job, pod).Build() - result, err = GetMaintenanceResultFromJob(cli, job) + result, err = getMaintenanceResultFromJob(cli, job) assert.NoError(t, err) assert.Equal(t, "test message", result) } @@ -278,7 +306,7 @@ func TestGetLatestMaintenanceJob(t *testing.T) { cli := fake.NewClientBuilder().WithScheme(scheme).WithObjects(objs...).Build() // Call the function - job, err := GetLatestMaintenanceJob(cli, "default") + job, err := getLatestMaintenanceJob(cli, "default") 
assert.NoError(t, err) // We expect the returned job to be the newer job @@ -441,7 +469,7 @@ func TestGetMaintenanceJobConfig(t *testing.T) { fakeClient = velerotest.NewFakeControllerRuntimeClient(t) } - jobConfig, err := GetMaintenanceJobConfig( + jobConfig, err := getMaintenanceJobConfig( ctx, fakeClient, logger, @@ -460,7 +488,7 @@ func TestGetMaintenanceJobConfig(t *testing.T) { } } -func TestWaitIncompleteMaintenance(t *testing.T) { +func TestWaitAllMaintenanceJobComplete(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) veleroNamespace := "velero" @@ -728,7 +756,7 @@ func TestWaitIncompleteMaintenance(t *testing.T) { fakeClient := fakeClientBuilder.WithRuntimeObjects(test.kubeClientObj...).Build() - history, err := WaitIncompleteMaintenance(test.ctx, fakeClient, repo, 3, velerotest.NewLogger()) + history, err := WaitAllMaintenanceJobComplete(test.ctx, fakeClient, repo, 3, velerotest.NewLogger()) if test.expectedError != "" { assert.EqualError(t, err, test.expectedError) @@ -748,3 +776,205 @@ func TestWaitIncompleteMaintenance(t *testing.T) { cancel() } + +func TestBuildMaintenanceJob(t *testing.T) { + testCases := []struct { + name string + m *JobConfigs + deploy *appsv1.Deployment + logLevel logrus.Level + logFormat *logging.FormatFlag + expectedJobName string + expectedError bool + expectedEnv []v1.EnvVar + expectedEnvFrom []v1.EnvFromSource + }{ + { + name: "Valid maintenance job", + m: &JobConfigs{ + PodResources: &kube.PodResources{ + CPURequest: "100m", + MemoryRequest: "128Mi", + CPULimit: "200m", + MemoryLimit: "256Mi", + }, + }, + deploy: &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "velero", + Namespace: "velero", + }, + Spec: appsv1.DeploymentSpec{ + Template: v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "velero-repo-maintenance-container", + Image: "velero-image", + Env: []v1.EnvVar{ + { + Name: "test-name", + Value: "test-value", + }, + }, + EnvFrom: []v1.EnvFromSource{ + { + ConfigMapRef: &v1.ConfigMapEnvSource{ + LocalObjectReference: v1.LocalObjectReference{ + Name: "test-configmap", + }, + }, + }, + { + SecretRef: &v1.SecretEnvSource{ + LocalObjectReference: v1.LocalObjectReference{ + Name: "test-secret", + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + logLevel: logrus.InfoLevel, + logFormat: logging.NewFormatFlag(), + expectedJobName: "test-123-maintain-job", + expectedError: false, + expectedEnv: []v1.EnvVar{ + { + Name: "test-name", + Value: "test-value", + }, + }, + expectedEnvFrom: []v1.EnvFromSource{ + { + ConfigMapRef: &v1.ConfigMapEnvSource{ + LocalObjectReference: v1.LocalObjectReference{ + Name: "test-configmap", + }, + }, + }, + { + SecretRef: &v1.SecretEnvSource{ + LocalObjectReference: v1.LocalObjectReference{ + Name: "test-secret", + }, + }, + }, + }, + }, + { + name: "Error getting Velero server deployment", + m: &JobConfigs{ + PodResources: &kube.PodResources{ + CPURequest: "100m", + MemoryRequest: "128Mi", + CPULimit: "200m", + MemoryLimit: "256Mi", + }, + }, + logLevel: logrus.InfoLevel, + logFormat: logging.NewFormatFlag(), + expectedJobName: "", + expectedError: true, + }, + } + + param := provider.RepoParam{ + BackupRepo: &velerov1api.BackupRepository{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "velero", + Name: "test-123", + }, + Spec: velerov1api.BackupRepositorySpec{ + VolumeNamespace: "test-123", + RepositoryType: "kopia", + }, + }, + BackupLocation: &velerov1api.BackupStorageLocation{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "velero", + 
Name: "test-location", + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Create a fake clientset with resources + objs := []runtime.Object{param.BackupLocation, param.BackupRepo} + + if tc.deploy != nil { + objs = append(objs, tc.deploy) + } + scheme := runtime.NewScheme() + _ = appsv1.AddToScheme(scheme) + _ = velerov1api.AddToScheme(scheme) + cli := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(objs...).Build() + + // Call the function to test + job, err := buildMaintenanceJob(cli, context.TODO(), param.BackupRepo, param.BackupLocation.Name, tc.m, *tc.m.PodResources, tc.logLevel, tc.logFormat) + + // Check the error + if tc.expectedError { + assert.Error(t, err) + assert.Nil(t, job) + } else { + assert.NoError(t, err) + assert.NotNil(t, job) + assert.Contains(t, job.Name, tc.expectedJobName) + assert.Equal(t, param.BackupRepo.Namespace, job.Namespace) + assert.Equal(t, param.BackupRepo.Name, job.Labels[RepositoryNameLabel]) + + assert.Equal(t, param.BackupRepo.Name, job.Spec.Template.ObjectMeta.Labels[RepositoryNameLabel]) + + // Check container + assert.Len(t, job.Spec.Template.Spec.Containers, 1) + container := job.Spec.Template.Spec.Containers[0] + assert.Equal(t, "velero-repo-maintenance-container", container.Name) + assert.Equal(t, "velero-image", container.Image) + assert.Equal(t, v1.PullIfNotPresent, container.ImagePullPolicy) + + // Check container env + assert.Equal(t, tc.expectedEnv, container.Env) + assert.Equal(t, tc.expectedEnvFrom, container.EnvFrom) + + // Check resources + expectedResources := v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse(tc.m.PodResources.CPURequest), + v1.ResourceMemory: resource.MustParse(tc.m.PodResources.MemoryRequest), + }, + Limits: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse(tc.m.PodResources.CPULimit), + v1.ResourceMemory: resource.MustParse(tc.m.PodResources.MemoryLimit), + }, + } + assert.Equal(t, expectedResources, container.Resources) + + // Check args + expectedArgs := []string{ + "repo-maintenance", + fmt.Sprintf("--repo-name=%s", param.BackupRepo.Spec.VolumeNamespace), + fmt.Sprintf("--repo-type=%s", param.BackupRepo.Spec.RepositoryType), + fmt.Sprintf("--backup-storage-location=%s", param.BackupLocation.Name), + fmt.Sprintf("--log-level=%s", tc.logLevel.String()), + fmt.Sprintf("--log-format=%s", tc.logFormat.String()), + } + assert.Equal(t, expectedArgs, container.Args) + + // Check affinity + assert.Nil(t, job.Spec.Template.Spec.Affinity) + + // Check tolerations + assert.Nil(t, job.Spec.Template.Spec.Tolerations) + + // Check node selector + assert.Nil(t, job.Spec.Template.Spec.NodeSelector) + } + }) + } +} diff --git a/pkg/repository/manager/manager.go b/pkg/repository/manager/manager.go index 3853f2d3c3..7027c96500 100644 --- a/pkg/repository/manager/manager.go +++ b/pkg/repository/manager/manager.go @@ -23,11 +23,6 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" - appsv1 "k8s.io/api/apps/v1" - batchv1 "k8s.io/api/batch/v1" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/vmware-tanzu/velero/internal/credentials" @@ -35,9 +30,6 @@ import ( "github.com/vmware-tanzu/velero/pkg/repository" "github.com/vmware-tanzu/velero/pkg/repository/provider" "github.com/vmware-tanzu/velero/pkg/util/filesystem" - "github.com/vmware-tanzu/velero/pkg/util/kube" - "github.com/vmware-tanzu/velero/pkg/util/logging" 
- veleroutil "github.com/vmware-tanzu/velero/pkg/util/velero" ) // Manager manages backup repositories. @@ -54,7 +46,7 @@ type Manager interface { PrepareRepo(repo *velerov1api.BackupRepository) error // PruneRepo deletes unused data from a repo. - PruneRepo(repo *velerov1api.BackupRepository) (velerov1api.BackupRepositoryMaintenanceStatus, error) + PruneRepo(repo *velerov1api.BackupRepository) error // UnlockRepo removes stale locks from a repo. UnlockRepo(repo *velerov1api.BackupRepository) error @@ -76,16 +68,10 @@ type manager struct { providers map[string]provider.Provider // client is the Velero controller manager's client. // It's limited to resources in the Velero namespace. - client client.Client - repoLocker *repository.RepoLocker - repoEnsurer *repository.Ensurer - fileSystem filesystem.Interface - repoMaintenanceJobConfig string - podResources kube.PodResources - keepLatestMaintenanceJobs int - log logrus.FieldLogger - logLevel logrus.Level - logFormat *logging.FormatFlag + client client.Client + repoLocker *repository.RepoLocker + fileSystem filesystem.Interface + log logrus.FieldLogger } // NewManager create a new repository manager. @@ -93,29 +79,17 @@ func NewManager( namespace string, client client.Client, repoLocker *repository.RepoLocker, - repoEnsurer *repository.Ensurer, credentialFileStore credentials.FileStore, credentialSecretStore credentials.SecretStore, - repoMaintenanceJobConfig string, - podResources kube.PodResources, - keepLatestMaintenanceJobs int, log logrus.FieldLogger, - logLevel logrus.Level, - logFormat *logging.FormatFlag, ) Manager { mgr := &manager{ - namespace: namespace, - client: client, - providers: map[string]provider.Provider{}, - repoLocker: repoLocker, - repoEnsurer: repoEnsurer, - fileSystem: filesystem.NewFileSystem(), - repoMaintenanceJobConfig: repoMaintenanceJobConfig, - podResources: podResources, - keepLatestMaintenanceJobs: keepLatestMaintenanceJobs, - log: log, - logLevel: logLevel, - logFormat: logFormat, + namespace: namespace, + client: client, + providers: map[string]provider.Provider{}, + repoLocker: repoLocker, + fileSystem: filesystem.NewFileSystem(), + log: log, } mgr.providers[velerov1api.BackupRepositoryTypeRestic] = provider.NewResticRepositoryProvider(credentialFileStore, mgr.fileSystem, mgr.log) @@ -172,86 +146,24 @@ func (m *manager) PrepareRepo(repo *velerov1api.BackupRepository) error { return prd.PrepareRepo(context.Background(), param) } -func (m *manager) PruneRepo(repo *velerov1api.BackupRepository) (velerov1api.BackupRepositoryMaintenanceStatus, error) { +func (m *manager) PruneRepo(repo *velerov1api.BackupRepository) error { m.repoLocker.LockExclusive(repo.Name) defer m.repoLocker.UnlockExclusive(repo.Name) - param, err := m.assembleRepoParam(repo) - if err != nil { - return velerov1api.BackupRepositoryMaintenanceStatus{}, errors.WithStack(err) - } - - log := m.log.WithFields(logrus.Fields{ - "BSL name": param.BackupLocation.Name, - "repo type": param.BackupRepo.Spec.RepositoryType, - "repo name": param.BackupRepo.Name, - "repo UID": param.BackupRepo.UID, - }) - - job, err := repository.GetLatestMaintenanceJob(m.client, m.namespace) - if err != nil { - return velerov1api.BackupRepositoryMaintenanceStatus{}, errors.WithStack(err) - } - - if job != nil && job.Status.Succeeded == 0 && job.Status.Failed == 0 { - log.Debugf("There already has a unfinished maintenance job %s/%s for repository %s, please wait for it to complete", job.Namespace, job.Name, param.BackupRepo.Name) - return 
velerov1api.BackupRepositoryMaintenanceStatus{}, nil - } - - jobConfig, err := repository.GetMaintenanceJobConfig( - context.Background(), - m.client, - m.log, - m.namespace, - m.repoMaintenanceJobConfig, - repo, - ) - if err != nil { - log.Infof("Fail to find the ConfigMap %s to build maintenance job with error: %s. Use default value.", - m.namespace+"/"+m.repoMaintenanceJobConfig, - err.Error(), - ) - } - - log.Info("Start to maintenance repo") - - maintenanceJob, err := m.buildMaintenanceJob( - jobConfig, - param, - ) + prd, err := m.getRepositoryProvider(repo) if err != nil { - return velerov1api.BackupRepositoryMaintenanceStatus{}, errors.Wrap(err, "error to build maintenance job") - } - - log = log.WithField("job", fmt.Sprintf("%s/%s", maintenanceJob.Namespace, maintenanceJob.Name)) - - if err := m.client.Create(context.TODO(), maintenanceJob); err != nil { - return velerov1api.BackupRepositoryMaintenanceStatus{}, errors.Wrap(err, "error to create maintenance job") + return errors.WithStack(err) } - log.Debug("Creating maintenance job") - - defer func() { - if err := repository.DeleteOldMaintenanceJobs( - m.client, - param.BackupRepo.Name, - m.keepLatestMaintenanceJobs, - ); err != nil { - log.WithError(err).Error("Failed to delete maintenance job") - } - }() - - maintenanceJob, err = repository.WaitForJobComplete(context.TODO(), m.client, maintenanceJob.Namespace, maintenanceJob.Name) + param, err := m.assembleRepoParam(repo) if err != nil { - return velerov1api.BackupRepositoryMaintenanceStatus{}, errors.Wrap(err, "error to wait for maintenance job complete") + return errors.WithStack(err) } - result, err := repository.GetMaintenanceResultFromJob(m.client, maintenanceJob) - if err != nil { - return velerov1api.BackupRepositoryMaintenanceStatus{}, errors.Wrap(err, "error to get maintenance job result") + if err := prd.BoostRepoConnect(context.Background(), param); err != nil { + return errors.WithStack(err) } - log.Info("Maintenance repo complete") - return repository.ComposeMaintenanceStatusFromJob(maintenanceJob, result), nil + return prd.PruneRepo(context.Background(), param) } func (m *manager) UnlockRepo(repo *velerov1api.BackupRepository) error { @@ -344,122 +256,3 @@ func (m *manager) assembleRepoParam(repo *velerov1api.BackupRepository) (provide BackupRepo: repo, }, nil } - -func (m *manager) buildMaintenanceJob( - config *repository.JobConfigs, - param provider.RepoParam, -) (*batchv1.Job, error) { - // Get the Velero server deployment - deployment := &appsv1.Deployment{} - err := m.client.Get(context.TODO(), types.NamespacedName{Name: "velero", Namespace: m.namespace}, deployment) - if err != nil { - return nil, err - } - - // Get the environment variables from the Velero server deployment - envVars := veleroutil.GetEnvVarsFromVeleroServer(deployment) - - // Get the referenced storage from the Velero server deployment - envFromSources := veleroutil.GetEnvFromSourcesFromVeleroServer(deployment) - - // Get the volume mounts from the Velero server deployment - volumeMounts := veleroutil.GetVolumeMountsFromVeleroServer(deployment) - - // Get the volumes from the Velero server deployment - volumes := veleroutil.GetVolumesFromVeleroServer(deployment) - - // Get the service account from the Velero server deployment - serviceAccount := veleroutil.GetServiceAccountFromVeleroServer(deployment) - - // Get image - image := veleroutil.GetVeleroServerImage(deployment) - - // Set resource limits and requests - cpuRequest := m.podResources.CPURequest - memRequest := 
m.podResources.MemoryRequest
-	cpuLimit := m.podResources.CPULimit
-	memLimit := m.podResources.MemoryLimit
-	if config != nil && config.PodResources != nil {
-		cpuRequest = config.PodResources.CPURequest
-		memRequest = config.PodResources.MemoryRequest
-		cpuLimit = config.PodResources.CPULimit
-		memLimit = config.PodResources.MemoryLimit
-	}
-	resources, err := kube.ParseResourceRequirements(cpuRequest, memRequest, cpuLimit, memLimit)
-	if err != nil {
-		return nil, errors.Wrap(err, "failed to parse resource requirements for maintenance job")
-	}
-
-	// Set arguments
-	args := []string{"repo-maintenance"}
-	args = append(args, fmt.Sprintf("--repo-name=%s", param.BackupRepo.Spec.VolumeNamespace))
-	args = append(args, fmt.Sprintf("--repo-type=%s", param.BackupRepo.Spec.RepositoryType))
-	args = append(args, fmt.Sprintf("--backup-storage-location=%s", param.BackupLocation.Name))
-	args = append(args, fmt.Sprintf("--log-level=%s", m.logLevel.String()))
-	args = append(args, fmt.Sprintf("--log-format=%s", m.logFormat.String()))
-
-	// build the maintenance job
-	job := &batchv1.Job{
-		ObjectMeta: metav1.ObjectMeta{
-			Name: repository.GenerateJobName(param.BackupRepo.Name),
-			Namespace: param.BackupRepo.Namespace,
-			Labels: map[string]string{
-				repository.RepositoryNameLabel: param.BackupRepo.Name,
-			},
-		},
-		Spec: batchv1.JobSpec{
-			BackoffLimit: new(int32), // Never retry
-			Template: v1.PodTemplateSpec{
-				ObjectMeta: metav1.ObjectMeta{
-					Name: "velero-repo-maintenance-pod",
-					Labels: map[string]string{
-						repository.RepositoryNameLabel: param.BackupRepo.Name,
-					},
-				},
-				Spec: v1.PodSpec{
-					Containers: []v1.Container{
-						{
-							Name: "velero-repo-maintenance-container",
-							Image: image,
-							Command: []string{
-								"/velero",
-							},
-							Args: args,
-							ImagePullPolicy: v1.PullIfNotPresent,
-							Env: envVars,
-							EnvFrom: envFromSources,
-							VolumeMounts: volumeMounts,
-							Resources: resources,
-						},
-					},
-					RestartPolicy: v1.RestartPolicyNever,
-					Volumes: volumes,
-					ServiceAccountName: serviceAccount,
-				},
-			},
-		},
-	}
-
-	if config != nil && len(config.LoadAffinities) > 0 {
-		affinity := kube.ToSystemAffinity(config.LoadAffinities)
-		job.Spec.Template.Spec.Affinity = affinity
-	}
-
-	if tolerations := veleroutil.GetTolerationsFromVeleroServer(deployment); tolerations != nil {
-		job.Spec.Template.Spec.Tolerations = tolerations
-	}
-
-	if nodeSelector := veleroutil.GetNodeSelectorFromVeleroServer(deployment); nodeSelector != nil {
-		job.Spec.Template.Spec.NodeSelector = nodeSelector
-	}
-
-	if labels := veleroutil.GetVeleroServerLables(deployment); len(labels) > 0 {
-		job.Spec.Template.Labels = labels
-	}
-
-	if annotations := veleroutil.GetVeleroServerAnnotations(deployment); len(annotations) > 0 {
-		job.Spec.Template.Annotations = annotations
-	}
-
-	return job, nil
-}
diff --git a/pkg/repository/manager/manager_test.go b/pkg/repository/manager/manager_test.go
index e83f5f5276..d743e267ae 100644
--- a/pkg/repository/manager/manager_test.go
+++ b/pkg/repository/manager/manager_test.go
@@ -17,31 +17,18 @@ limitations under the License.
 package repository
 
 import (
-	"fmt"
 	"testing"
 
-	"github.com/sirupsen/logrus"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
-	appsv1 "k8s.io/api/apps/v1"
-	v1 "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/api/resource"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/runtime"
 	kbclient "sigs.k8s.io/controller-runtime/pkg/client"
-	"sigs.k8s.io/controller-runtime/pkg/client/fake"
 
 	velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
-	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
-	"github.com/vmware-tanzu/velero/pkg/repository"
-	"github.com/vmware-tanzu/velero/pkg/repository/provider"
-	"github.com/vmware-tanzu/velero/pkg/util/kube"
-	"github.com/vmware-tanzu/velero/pkg/util/logging"
 )
 
 func TestGetRepositoryProvider(t *testing.T) {
 	var fakeClient kbclient.Client
-	mgr := NewManager("", fakeClient, nil, nil, nil, nil, "", kube.PodResources{}, 3, nil, logrus.InfoLevel, nil).(*manager)
+	mgr := NewManager("", fakeClient, nil, nil, nil, nil).(*manager)
 	repo := &velerov1.BackupRepository{}
 
 	// empty repository type
@@ -60,220 +47,3 @@ func TestGetRepositoryProvider(t *testing.T) {
 	_, err = mgr.getRepositoryProvider(repo)
 	require.Error(t, err)
 }
-
-func TestBuildMaintenanceJob(t *testing.T) {
-	testCases := []struct {
-		name string
-		m *repository.JobConfigs
-		deploy *appsv1.Deployment
-		logLevel logrus.Level
-		logFormat *logging.FormatFlag
-		expectedJobName string
-		expectedError bool
-		expectedEnv []v1.EnvVar
-		expectedEnvFrom []v1.EnvFromSource
-	}{
-		{
-			name: "Valid maintenance job",
-			m: &repository.JobConfigs{
-				PodResources: &kube.PodResources{
-					CPURequest: "100m",
-					MemoryRequest: "128Mi",
-					CPULimit: "200m",
-					MemoryLimit: "256Mi",
-				},
-			},
-			deploy: &appsv1.Deployment{
-				ObjectMeta: metav1.ObjectMeta{
-					Name: "velero",
-					Namespace: "velero",
-				},
-				Spec: appsv1.DeploymentSpec{
-					Template: v1.PodTemplateSpec{
-						Spec: v1.PodSpec{
-							Containers: []v1.Container{
-								{
-									Name: "velero-repo-maintenance-container",
-									Image: "velero-image",
-									Env: []v1.EnvVar{
-										{
-											Name: "test-name",
-											Value: "test-value",
-										},
-									},
-									EnvFrom: []v1.EnvFromSource{
-										{
-											ConfigMapRef: &v1.ConfigMapEnvSource{
-												LocalObjectReference: v1.LocalObjectReference{
-													Name: "test-configmap",
-												},
-											},
-										},
-										{
-											SecretRef: &v1.SecretEnvSource{
-												LocalObjectReference: v1.LocalObjectReference{
-													Name: "test-secret",
-												},
-											},
-										},
-									},
-								},
-							},
-						},
-					},
-				},
-			},
-			logLevel: logrus.InfoLevel,
-			logFormat: logging.NewFormatFlag(),
-			expectedJobName: "test-123-maintain-job",
-			expectedError: false,
-			expectedEnv: []v1.EnvVar{
-				{
-					Name: "test-name",
-					Value: "test-value",
-				},
-			},
-			expectedEnvFrom: []v1.EnvFromSource{
-				{
-					ConfigMapRef: &v1.ConfigMapEnvSource{
-						LocalObjectReference: v1.LocalObjectReference{
-							Name: "test-configmap",
-						},
-					},
-				},
-				{
-					SecretRef: &v1.SecretEnvSource{
-						LocalObjectReference: v1.LocalObjectReference{
-							Name: "test-secret",
-						},
-					},
-				},
-			},
-		},
-		{
-			name: "Error getting Velero server deployment",
-			m: &repository.JobConfigs{
-				PodResources: &kube.PodResources{
-					CPURequest: "100m",
-					MemoryRequest: "128Mi",
-					CPULimit: "200m",
-					MemoryLimit: "256Mi",
-				},
-			},
-			logLevel: logrus.InfoLevel,
-			logFormat: logging.NewFormatFlag(),
-			expectedJobName: "",
-			expectedError: true,
-		},
-	}
-
-	param := provider.RepoParam{
-		BackupRepo: &velerov1api.BackupRepository{
-			ObjectMeta: metav1.ObjectMeta{
-				Namespace: "velero",
-				Name: "test-123",
-			},
-			Spec: velerov1api.BackupRepositorySpec{
-				VolumeNamespace: "test-123",
-				RepositoryType: "kopia",
-			},
-		},
-		BackupLocation: &velerov1api.BackupStorageLocation{
-			ObjectMeta: metav1.ObjectMeta{
-				Namespace: "velero",
-				Name: "test-location",
-			},
-		},
-	}
-
-	for _, tc := range testCases {
-		t.Run(tc.name, func(t *testing.T) {
-			// Create a fake clientset with resources
-			objs := []runtime.Object{param.BackupLocation, param.BackupRepo}
-
-			if tc.deploy != nil {
-				objs = append(objs, tc.deploy)
-			}
-			scheme := runtime.NewScheme()
-			_ = appsv1.AddToScheme(scheme)
-			_ = velerov1api.AddToScheme(scheme)
-			cli := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(objs...).Build()
-
-			mgr := NewManager(
-				"velero",
-				cli,
-				nil,
-				nil,
-				nil,
-				nil,
-				"",
-				kube.PodResources{},
-				3,
-				nil,
-				logrus.InfoLevel,
-				logging.NewFormatFlag(),
-			).(*manager)
-
-			// Call the function to test
-			job, err := mgr.buildMaintenanceJob(tc.m, param)
-
-			// Check the error
-			if tc.expectedError {
-				assert.Error(t, err)
-				assert.Nil(t, job)
-			} else {
-				assert.NoError(t, err)
-				assert.NotNil(t, job)
-				assert.Contains(t, job.Name, tc.expectedJobName)
-				assert.Equal(t, param.BackupRepo.Namespace, job.Namespace)
-				assert.Equal(t, param.BackupRepo.Name, job.Labels[repository.RepositoryNameLabel])
-
-				assert.Equal(t, param.BackupRepo.Name, job.Spec.Template.ObjectMeta.Labels[repository.RepositoryNameLabel])
-
-				// Check container
-				assert.Len(t, job.Spec.Template.Spec.Containers, 1)
-				container := job.Spec.Template.Spec.Containers[0]
-				assert.Equal(t, "velero-repo-maintenance-container", container.Name)
-				assert.Equal(t, "velero-image", container.Image)
-				assert.Equal(t, v1.PullIfNotPresent, container.ImagePullPolicy)
-
-				// Check container env
-				assert.Equal(t, tc.expectedEnv, container.Env)
-				assert.Equal(t, tc.expectedEnvFrom, container.EnvFrom)
-
-				// Check resources
-				expectedResources := v1.ResourceRequirements{
-					Requests: v1.ResourceList{
-						v1.ResourceCPU: resource.MustParse(tc.m.PodResources.CPURequest),
-						v1.ResourceMemory: resource.MustParse(tc.m.PodResources.MemoryRequest),
-					},
-					Limits: v1.ResourceList{
-						v1.ResourceCPU: resource.MustParse(tc.m.PodResources.CPULimit),
-						v1.ResourceMemory: resource.MustParse(tc.m.PodResources.MemoryLimit),
-					},
-				}
-				assert.Equal(t, expectedResources, container.Resources)
-
-				// Check args
-				expectedArgs := []string{
-					"repo-maintenance",
-					fmt.Sprintf("--repo-name=%s", param.BackupRepo.Spec.VolumeNamespace),
-					fmt.Sprintf("--repo-type=%s", param.BackupRepo.Spec.RepositoryType),
-					fmt.Sprintf("--backup-storage-location=%s", param.BackupLocation.Name),
-					fmt.Sprintf("--log-level=%s", tc.logLevel.String()),
-					fmt.Sprintf("--log-format=%s", tc.logFormat.String()),
-				}
-				assert.Equal(t, expectedArgs, container.Args)
-
-				// Check affinity
-				assert.Nil(t, job.Spec.Template.Spec.Affinity)
-
-				// Check tolerations
-				assert.Nil(t, job.Spec.Template.Spec.Tolerations)
-
-				// Check node selector
-				assert.Nil(t, job.Spec.Template.Spec.NodeSelector)
-			}
-		})
-	}
-}
diff --git a/pkg/repository/mocks/Manager.go b/pkg/repository/mocks/Manager.go
index c987693b4b..2264117752 100644
--- a/pkg/repository/mocks/Manager.go
+++ b/pkg/repository/mocks/Manager.go
@@ -138,31 +138,21 @@ func (_m *Manager) PrepareRepo(repo *v1.BackupRepository) error {
 }
 
 // PruneRepo provides a mock function with given fields: repo
-func (_m *Manager) PruneRepo(repo *v1.BackupRepository) (v1.BackupRepositoryMaintenanceStatus, error) {
+func (_m *Manager) PruneRepo(repo *v1.BackupRepository) error {
 	ret := _m.Called(repo)
 
 	if len(ret) == 0 {
 		panic("no return value specified for PruneRepo")
 	}
 
-	var r0 v1.BackupRepositoryMaintenanceStatus
-	var r1 error
-	if rf, ok := ret.Get(0).(func(*v1.BackupRepository) (v1.BackupRepositoryMaintenanceStatus, error)); ok {
-		return rf(repo)
-	}
-	if rf, ok := ret.Get(0).(func(*v1.BackupRepository) v1.BackupRepositoryMaintenanceStatus); ok {
+	var r0 error
+	if rf, ok := ret.Get(0).(func(*v1.BackupRepository) error); ok {
 		r0 = rf(repo)
 	} else {
-		r0 = ret.Get(0).(v1.BackupRepositoryMaintenanceStatus)
-	}
-
-	if rf, ok := ret.Get(1).(func(*v1.BackupRepository) error); ok {
-		r1 = rf(repo)
-	} else {
-		r1 = ret.Error(1)
+		r0 = ret.Error(0)
 	}
 
-	return r0, r1
+	return r0
 }
 
 // UnlockRepo provides a mock function with given fields: repo