From d6bacb82bfdcb8a89741864d8d7e77a3af09d734 Mon Sep 17 00:00:00 2001 From: Karl Isenberg Date: Fri, 15 Nov 2024 12:52:20 -0800 Subject: [PATCH] fix: out-of-order status updates (#1494) - Wait for the async status updates to complete before continuing after updating. --- pkg/parse/run.go | 81 +++++++++++++++++++++++++----------------------- 1 file changed, 43 insertions(+), 38 deletions(-) diff --git a/pkg/parse/run.go b/pkg/parse/run.go index 7e867520a..404b3b603 100644 --- a/pkg/parse/run.go +++ b/pkg/parse/run.go @@ -478,10 +478,8 @@ func (r *reconciler) parseAndUpdate(ctx context.Context, trigger string) status. return sourceErrs } - // Create a new context with its cancellation function. - ctxForUpdateSyncStatus, cancel := context.WithCancel(context.Background()) - - go r.updateSyncStatusPeriodically(ctxForUpdateSyncStatus) + asyncCtx, asyncCancel := context.WithCancel(ctx) + asyncDoneCh := r.startAsyncStatusUpdates(asyncCtx) klog.V(3).Info("Updater starting...") start := opts.Clock.Now() @@ -489,9 +487,10 @@ func (r *reconciler) parseAndUpdate(ctx context.Context, trigger string) status. metrics.RecordParserDuration(ctx, trigger, "update", metrics.StatusTagKey(updateErrs), start) klog.V(3).Info("Updater stopped") - // This is to terminate `updateSyncStatusPeriodically`. - cancel() - // TODO: Wait for periodic updates to stop + // Stop periodic updates + asyncCancel() + // Wait for periodic updates to stop + <-asyncDoneCh // SyncErrors include errors from both the Updater and Remediator klog.V(3).Info("Updating sync status (after sync)") @@ -533,39 +532,45 @@ func (r *reconciler) setSyncStatus(ctx context.Context, newSyncStatus *SyncStatu return nil } -// updateSyncStatusPeriodically update the sync status periodically until the -// cancellation function of the context is called. -func (r *reconciler) updateSyncStatusPeriodically(ctx context.Context) { - opts := r.Options() - state := r.ReconcilerState() - klog.V(3).Info("Periodic sync status updates starting...") - updatePeriod := opts.StatusUpdatePeriod - updateTimer := opts.Clock.NewTimer(updatePeriod) - defer updateTimer.Stop() - for { - select { - case <-ctx.Done(): - // ctx.Done() is closed when the cancellation function of the context is called. - klog.V(3).Info("Periodic sync status updates stopped") - return - - case <-updateTimer.C(): - klog.V(3).Info("Updating sync status (periodic while syncing)") - // Copy the spec and commit from the source status - syncStatus := &SyncStatus{ - Spec: state.status.SourceStatus.Spec, - Syncing: true, - Commit: state.cache.source.commit, - Errs: r.ReconcilerState().SyncErrors(), - LastUpdate: nowMeta(opts.Clock), +// startAsyncStatusUpdates starts a goroutine that updates the sync status +// periodically until the context is cancelled. The caller should wait until the +// done channel is closed to confirm the goroutine has exited. +func (r *reconciler) startAsyncStatusUpdates(ctx context.Context) <-chan struct{} { + doneCh := make(chan struct{}) + go func() { + defer close(doneCh) + opts := r.Options() + state := r.ReconcilerState() + klog.V(3).Info("Periodic sync status updates starting...") + updatePeriod := opts.StatusUpdatePeriod + updateTimer := opts.Clock.NewTimer(updatePeriod) + defer updateTimer.Stop() + for { + select { + case <-ctx.Done(): + // ctx.Done() is closed when the cancellation function of the context is called. + klog.V(3).Info("Periodic sync status updates stopped") + return + + case <-updateTimer.C(): + klog.V(3).Info("Updating sync status (periodic while syncing)") + // Copy the spec and commit from the source status + syncStatus := &SyncStatus{ + Spec: state.status.SourceStatus.Spec, + Syncing: true, + Commit: state.cache.source.commit, + Errs: state.SyncErrors(), + LastUpdate: nowMeta(opts.Clock), + } + if err := r.setSyncStatus(ctx, syncStatus); err != nil { + klog.Warningf("failed to update sync status: %v", err) + } + + updateTimer.Reset(updatePeriod) // Schedule status update attempt } - if err := r.setSyncStatus(ctx, syncStatus); err != nil { - klog.Warningf("failed to update sync status: %v", err) - } - - updateTimer.Reset(updatePeriod) // Schedule status update attempt } - } + }() + return doneCh } // reportRootSyncConflicts reports conflicts to the RootSync that manages the