Skip to content

Commit

Permalink
always use job's time
Browse files Browse the repository at this point in the history
Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
  • Loading branch information
Lyndon-Li committed Jan 3, 2025
1 parent 6ff0aa3 commit 912b116
Show file tree
Hide file tree
Showing 6 changed files with 477 additions and 285 deletions.
204 changes: 115 additions & 89 deletions pkg/controller/backup_repository_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (r *BackupRepoReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
fallthrough
case velerov1api.BackupRepositoryPhaseReady:
if err := r.processUnrecordedMaintenance(ctx, backupRepo, log); err != nil {
if err := r.recallMaintenance(ctx, backupRepo, log); err != nil {
return ctrl.Result{}, errors.Wrap(err, "error handling incomplete repo maintenance jobs")
}

Expand All @@ -218,85 +218,6 @@ func (r *BackupRepoReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, nil
}

// processUnrecordedMaintenance calls repository.WaitIncompleteMaintenance for
// the given repository, consolidates the returned history with the history
// already stored in req.Status.RecentMaintenance and, when consolidation
// yields a non-nil result, patches the repository with it.
//
// It returns nil when there is nothing to update, otherwise the error from
// waiting (wrapped) or from patching.
func (r *BackupRepoReconciler) processUnrecordedMaintenance(ctx context.Context, req *velerov1api.BackupRepository, log logrus.FieldLogger) error {
	jobHistory, waitErr := repository.WaitIncompleteMaintenance(ctx, r.Client, req, defaultMaintenanceStatusQueueLength, log)
	if waitErr != nil {
		return errors.Wrapf(waitErr, "error waiting incomplete repo maintenance job for repo %s", req.Name)
	}

	merged := consolidateHistory(jobHistory, req.Status.RecentMaintenance)
	if merged != nil {
		log.Warn("Updating backup repository because of unrecorded histories")

		// The mutation is applied to the object handed over by patchBackupRepository.
		applyHistory := func(rr *velerov1api.BackupRepository) {
			rr.Status.RecentMaintenance = merged
		}

		return r.patchBackupRepository(ctx, req, applyHistory)
	}

	return nil
}

// consolidateHistory merges the incoming maintenance history with the current
// one. Both slices are walked from their last element towards the first, the
// entry that is not earlier (per isEarlierHistory) is taken at each step, and
// the walk stops once defaultMaintenanceStatusQueueLength entries have been
// collected. The collected entries are then reversed.
//
// It returns nil when coming is empty, when coming is identical to cur, or
// when the merged result is identical to cur; otherwise the merged slice.
func consolidateHistory(coming, cur []velerov1api.BackupRepositoryMaintenanceStatus) []velerov1api.BackupRepositoryMaintenanceStatus {
	if len(coming) == 0 || isIdenticalHistories(coming, cur) {
		return nil
	}

	// Kept as a non-nil empty slice: callers distinguish a nil result from a real one.
	merged := []velerov1api.BackupRepositoryMaintenanceStatus{}

	curIdx, comingIdx := len(cur)-1, len(coming)-1
	for len(merged) != defaultMaintenanceStatusQueueLength && (curIdx >= 0 || comingIdx >= 0) {
		// Take from coming when cur is exhausted, or when both have entries
		// left and the cur entry is the earlier one.
		takeComing := curIdx < 0 || (comingIdx >= 0 && isEarlierHistory(cur[curIdx], coming[comingIdx]))

		if takeComing {
			merged = append(merged, coming[comingIdx])
			comingIdx--
			continue
		}

		merged = append(merged, cur[curIdx])
		curIdx--
	}

	slices.Reverse(merged)

	if isIdenticalHistories(merged, cur) {
		return nil
	}

	return merged
}

// isIdenticalHistories reports whether a and b have the same length and the
// entries at every index have equal CompleteTimestamp values.
func isIdenticalHistories(a, b []velerov1api.BackupRepositoryMaintenanceStatus) bool {
	if len(a) != len(b) {
		return false
	}

	for idx := range a {
		if a[idx].CompleteTimestamp.Equal(b[idx].CompleteTimestamp) {
			continue
		}

		return false
	}

	return true
}

// isEarlierHistory reports whether a's CompleteTimestamp is before b's.
func isEarlierHistory(a, b velerov1api.BackupRepositoryMaintenanceStatus) bool {
	aDone, bDone := a.CompleteTimestamp, b.CompleteTimestamp

	return aDone.Before(bDone)
}

func (r *BackupRepoReconciler) getIdentiferByBSL(ctx context.Context, req *velerov1api.BackupRepository) (string, error) {
loc := &velerov1api.BackupStorageLocation{}

Expand Down Expand Up @@ -384,10 +305,109 @@ func ensureRepo(repo *velerov1api.BackupRepository, repoManager repomanager.Mana
return repoManager.PrepareRepo(repo)
}

func (r *BackupRepoReconciler) runMaintenanceIfDue(ctx context.Context, req *velerov1api.BackupRepository, log logrus.FieldLogger) error {
startTime := r.clock.Now()
func (r *BackupRepoReconciler) recallMaintenance(ctx context.Context, req *velerov1api.BackupRepository, log logrus.FieldLogger) error {
history, err := repository.WaitIncompleteMaintenance(ctx, r.Client, req, defaultMaintenanceStatusQueueLength, log)
if err != nil {
return errors.Wrapf(err, "error waiting incomplete repo maintenance job for repo %s", req.Name)
}

if !dueForMaintenance(req, startTime) {
consolidated := consolidateHistory(history, req.Status.RecentMaintenance)
if consolidated == nil {
return nil
}

lastMaintenanceTime := getLastMaintenanceTimeFromHistory(consolidated)

log.Warn("Updating backup repository because of unrecorded histories")

if lastMaintenanceTime.After(req.Status.LastMaintenanceTime.Time) {
log.Warnf("Updating backup repository last maintenance time (%v) from history (%v)", req.Status.LastMaintenanceTime.Time, lastMaintenanceTime.Time)
}

return r.patchBackupRepository(ctx, req, func(rr *velerov1api.BackupRepository) {
if lastMaintenanceTime.After(rr.Status.LastMaintenanceTime.Time) {
rr.Status.LastMaintenanceTime = lastMaintenanceTime
}

rr.Status.RecentMaintenance = consolidated
})
}

// consolidateHistory merges the incoming maintenance history with the current
// one. Both slices are walked from their last element towards the first, the
// later entry (by StartTimestamp) is taken at each step, and the walk stops
// once defaultMaintenanceStatusQueueLength entries have been collected. The
// collected entries are then reversed.
//
// When the entries under both cursors have equal StartTimestamp values they
// are treated as the same maintenance run: the entry from cur is kept and the
// one from coming is skipped, so a run that is present in both inputs is not
// emitted twice.
//
// It returns nil when coming is empty, when coming is identical to cur, or
// when the merged result is identical to cur; otherwise the merged slice.
func consolidateHistory(coming, cur []velerov1api.BackupRepositoryMaintenanceStatus) []velerov1api.BackupRepositoryMaintenanceStatus {
	if len(coming) == 0 {
		return nil
	}

	if isIdenticalHistory(coming, cur) {
		return nil
	}

	// Kept as a non-nil empty slice: callers distinguish a nil result from a real one.
	truncated := []velerov1api.BackupRepositoryMaintenanceStatus{}
	i := len(cur) - 1
	j := len(coming) - 1
	for i >= 0 || j >= 0 {
		if len(truncated) == defaultMaintenanceStatusQueueLength {
			break
		}

		switch {
		case i < 0:
			truncated = append(truncated, coming[j])
			j--
		case j < 0:
			truncated = append(truncated, cur[i])
			i--
		case isEarlierMaintenanceStatus(cur[i], coming[j]):
			truncated = append(truncated, coming[j])
			j--
		case cur[i].StartTimestamp.Equal(coming[j].StartTimestamp):
			// Same start time on both sides: keep the recorded entry and drop
			// the incoming duplicate by advancing both cursors.
			truncated = append(truncated, cur[i])
			i--
			j--
		default:
			truncated = append(truncated, cur[i])
			i--
		}
	}

	slices.Reverse(truncated)

	if isIdenticalHistory(truncated, cur) {
		return nil
	}

	return truncated
}

// getLastMaintenanceTimeFromHistory returns the latest CompleteTimestamp found
// in history. Entries whose CompleteTimestamp is nil are ignored. It returns
// nil when history is empty or no entry carries a CompleteTimestamp.
func getLastMaintenanceTimeFromHistory(history []velerov1api.BackupRepositoryMaintenanceStatus) *metav1.Time {
	var latest *metav1.Time

	for i := range history {
		completed := history[i].CompleteTimestamp
		if completed == nil {
			continue
		}

		// latest == nil must be checked explicitly; Before on a nil receiver
		// would never let a later entry replace it.
		if latest == nil || latest.Before(completed) {
			latest = completed
		}
	}

	return latest
}

// isIdenticalHistory reports whether a and b have the same length and the
// entries at every index have equal StartTimestamp values.
func isIdenticalHistory(a, b []velerov1api.BackupRepositoryMaintenanceStatus) bool {
	sameStart := func(x, y velerov1api.BackupRepositoryMaintenanceStatus) bool {
		return x.StartTimestamp.Equal(y.StartTimestamp)
	}

	return slices.EqualFunc(a, b, sameStart)
}

// isEarlierMaintenanceStatus reports whether a's StartTimestamp is before b's.
func isEarlierMaintenanceStatus(a, b velerov1api.BackupRepositoryMaintenanceStatus) bool {
	aStart, bStart := a.StartTimestamp, b.StartTimestamp

	return aStart.Before(bStart)
}

func (r *BackupRepoReconciler) runMaintenanceIfDue(ctx context.Context, req *velerov1api.BackupRepository, log logrus.FieldLogger) error {
if !dueForMaintenance(req, r.clock.Now()) {
log.Debug("not due for maintenance")
return nil
}
Expand All @@ -398,17 +418,23 @@ func (r *BackupRepoReconciler) runMaintenanceIfDue(ctx context.Context, req *vel
// should not cause the repo to move to `NotReady`.
log.Debug("Pruning repo")

if err := r.repositoryManager.PruneRepo(req); err != nil {
log.WithError(err).Warn("error pruning repository")
// when PruneRepo fails, the maintenance result will be left temporarily
// If the maintenance still completes later, recallMaintenance recalls the leftover ones and updates LastMaintenanceTime and history
status, err := r.repositoryManager.PruneRepo(req)
if err != nil {
return errors.Wrapf(err, "error pruning repository")
}

if status.Result == velerov1api.BackupRepositoryMaintenanceFailed {
log.WithError(err).Warn("Pruning repository failed")
return r.patchBackupRepository(ctx, req, func(rr *velerov1api.BackupRepository) {
updateRepoMaintenanceHistory(rr, velerov1api.BackupRepositoryMaintenanceFailed, startTime, r.clock.Now(), err.Error())
updateRepoMaintenanceHistory(rr, velerov1api.BackupRepositoryMaintenanceFailed, status.StartTimestamp.Time, status.CompleteTimestamp.Time, status.Message)
})
}

return r.patchBackupRepository(ctx, req, func(rr *velerov1api.BackupRepository) {
completionTime := r.clock.Now()
rr.Status.LastMaintenanceTime = &metav1.Time{Time: completionTime}
updateRepoMaintenanceHistory(rr, velerov1api.BackupRepositoryMaintenanceSucceeded, startTime, completionTime, "")
rr.Status.LastMaintenanceTime = &metav1.Time{Time: status.CompleteTimestamp.Time}
updateRepoMaintenanceHistory(rr, velerov1api.BackupRepositoryMaintenanceSucceeded, status.StartTimestamp.Time, status.CompleteTimestamp.Time, status.Message)
})
}

Expand Down
9 changes: 6 additions & 3 deletions pkg/controller/backup_repository_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ import (

const testMaintenanceFrequency = 10 * time.Minute

func mockBackupRepoReconciler(t *testing.T, mockOn string, arg interface{}, ret interface{}) *BackupRepoReconciler {
func mockBackupRepoReconciler(t *testing.T, mockOn string, arg interface{}, ret ...interface{}) *BackupRepoReconciler {
t.Helper()
mgr := &repomokes.Manager{}
if mockOn != "" {
mgr.On(mockOn, arg).Return(ret)
mgr.On(mockOn, arg).Return(ret...)
}
return NewBackupRepoReconciler(
velerov1api.DefaultNamespace,
Expand Down Expand Up @@ -106,7 +106,10 @@ func TestCheckNotReadyRepo(t *testing.T) {

func TestRunMaintenanceIfDue(t *testing.T) {
rr := mockBackupRepositoryCR()
reconciler := mockBackupRepoReconciler(t, "PruneRepo", rr, nil)
reconciler := mockBackupRepoReconciler(t, "PruneRepo", rr, velerov1api.BackupRepositoryMaintenanceStatus{
StartTimestamp: &metav1.Time{},
CompleteTimestamp: &metav1.Time{},
}, nil)
err := reconciler.Client.Create(context.TODO(), rr)
assert.NoError(t, err)
lastTm := rr.Status.LastMaintenanceTime
Expand Down
65 changes: 39 additions & 26 deletions pkg/repository/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,22 +86,26 @@ func DeleteOldMaintenanceJobs(cli client.Client, repo string, keep int) error {
return nil
}

func WaitForJobComplete(ctx context.Context, client client.Client, job *batchv1.Job) error {
return wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) {
err := client.Get(ctx, types.NamespacedName{Namespace: job.Namespace, Name: job.Name}, job)
// WaitForJobComplete wait for completion of the specified job and update the latest job object
func WaitForJobComplete(ctx context.Context, client client.Client, ns string, job string) (*batchv1.Job, error) {
updated := &batchv1.Job{}
err := wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) {
err := client.Get(ctx, types.NamespacedName{Namespace: ns, Name: job}, updated)
if err != nil && !apierrors.IsNotFound(err) {
return false, err
}

if job.Status.Succeeded > 0 {
if updated.Status.Succeeded > 0 {
return true, nil
}

if job.Status.Failed > 0 {
return true, fmt.Errorf("maintenance job %s/%s failed", job.Namespace, job.Name)
if updated.Status.Failed > 0 {
return true, fmt.Errorf("maintenance job %s/%s failed", job, job)
}
return false, nil
})

return updated, err
}

func GetMaintenanceResultFromJob(cli client.Client, job *batchv1.Job) (string, error) {
Expand Down Expand Up @@ -269,43 +273,52 @@ func WaitIncompleteMaintenance(ctx context.Context, cli client.Client, repo *vel
return nil, nil
}

sort.Slice(jobList.Items, func(i, j int) bool {
return jobList.Items[i].CreationTimestamp.Time.Before(jobList.Items[j].CreationTimestamp.Time)
})

history := []velerov1api.BackupRepositoryMaintenanceStatus{}

for _, job := range jobList.Items {
startPos := len(history) - limit
if startPos < 0 {
startPos = 0
}

for i := startPos; i < len(jobList.Items); i++ {
job := &jobList.Items[i]

if job.Status.Succeeded == 0 && job.Status.Failed == 0 {
log.Infof("Waiting for maintenance job %s to complete", job.Name)

if err := WaitForJobComplete(ctx, cli, &job); err != nil {
updated, err := WaitForJobComplete(ctx, cli, job.Namespace, job.Name)
if err != nil {
return nil, errors.Wrapf(err, "error waiting maintenance job[%s] complete", job.Name)
}
}

result := velerov1api.BackupRepositoryMaintenanceSucceeded
if job.Status.Failed > 0 {
result = velerov1api.BackupRepositoryMaintenanceFailed
job = updated
}

message, err := GetMaintenanceResultFromJob(cli, &job)
message, err := GetMaintenanceResultFromJob(cli, job)
if err != nil {
return nil, errors.Wrapf(err, "error getting maintenance job[%s] result", job.Name)
}

history = append(history, velerov1api.BackupRepositoryMaintenanceStatus{
Result: result,
StartTimestamp: &metav1.Time{Time: job.Status.StartTime.Time},
CompleteTimestamp: &metav1.Time{Time: job.Status.CompletionTime.Time},
Message: message,
})
history = append(history, ComposeMaintenanceStatusFromJob(job, message))
}

sort.Slice(history, func(i, j int) bool {
return history[i].CompleteTimestamp.Time.After(history[j].CompleteTimestamp.Time)
})
return history, nil
}

startPos := len(history) - limit
if startPos < 0 {
startPos = 0
// ComposeMaintenanceStatusFromJob builds a BackupRepositoryMaintenanceStatus
// from a maintenance job and its result message.
//
// Result is Failed when the job reports a failed count greater than zero and
// Succeeded otherwise. StartTimestamp is the job's creation time.
// CompleteTimestamp is the job's completion time, or nil when the job carries
// no completion time.
func ComposeMaintenanceStatusFromJob(job *batchv1.Job, message string) velerov1api.BackupRepositoryMaintenanceStatus {
	result := velerov1api.BackupRepositoryMaintenanceSucceeded
	if job.Status.Failed > 0 {
		result = velerov1api.BackupRepositoryMaintenanceFailed
	}

	status := velerov1api.BackupRepositoryMaintenanceStatus{
		Result:         result,
		StartTimestamp: &metav1.Time{Time: job.CreationTimestamp.Time},
		Message:        message,
	}

	// CompletionTime is a pointer that is not populated for every finished job
	// (Kubernetes documents it as set on successful completion), so it must
	// not be dereferenced unconditionally.
	if job.Status.CompletionTime != nil {
		status.CompleteTimestamp = &metav1.Time{Time: job.Status.CompletionTime.Time}
	}

	return status
}
Loading

0 comments on commit 912b116

Please sign in to comment.