Skip to content

Commit

Permalink
fix: out-of-order status updates (#1494)
Browse files Browse the repository at this point in the history
- Wait for the async status updates to complete before continuing
  after updating.
  • Loading branch information
karlkfi authored Nov 15, 2024
1 parent 3ff58e7 commit d6bacb8
Showing 1 changed file with 43 additions and 38 deletions.
81 changes: 43 additions & 38 deletions pkg/parse/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,20 +478,19 @@ func (r *reconciler) parseAndUpdate(ctx context.Context, trigger string) status.
return sourceErrs
}

// Create a new context with its cancellation function.
ctxForUpdateSyncStatus, cancel := context.WithCancel(context.Background())

go r.updateSyncStatusPeriodically(ctxForUpdateSyncStatus)
asyncCtx, asyncCancel := context.WithCancel(ctx)
asyncDoneCh := r.startAsyncStatusUpdates(asyncCtx)

klog.V(3).Info("Updater starting...")
start := opts.Clock.Now()
updateErrs := opts.Update(ctx, &state.cache)
metrics.RecordParserDuration(ctx, trigger, "update", metrics.StatusTagKey(updateErrs), start)
klog.V(3).Info("Updater stopped")

// This is to terminate `updateSyncStatusPeriodically`.
cancel()
// TODO: Wait for periodic updates to stop
// Stop periodic updates
asyncCancel()
// Wait for periodic updates to stop
<-asyncDoneCh

// SyncErrors include errors from both the Updater and Remediator
klog.V(3).Info("Updating sync status (after sync)")
Expand Down Expand Up @@ -533,39 +532,45 @@ func (r *reconciler) setSyncStatus(ctx context.Context, newSyncStatus *SyncStatu
return nil
}

// updateSyncStatusPeriodically update the sync status periodically until the
// cancellation function of the context is called.
func (r *reconciler) updateSyncStatusPeriodically(ctx context.Context) {
opts := r.Options()
state := r.ReconcilerState()
klog.V(3).Info("Periodic sync status updates starting...")
updatePeriod := opts.StatusUpdatePeriod
updateTimer := opts.Clock.NewTimer(updatePeriod)
defer updateTimer.Stop()
for {
select {
case <-ctx.Done():
// ctx.Done() is closed when the cancellation function of the context is called.
klog.V(3).Info("Periodic sync status updates stopped")
return

case <-updateTimer.C():
klog.V(3).Info("Updating sync status (periodic while syncing)")
// Copy the spec and commit from the source status
syncStatus := &SyncStatus{
Spec: state.status.SourceStatus.Spec,
Syncing: true,
Commit: state.cache.source.commit,
Errs: r.ReconcilerState().SyncErrors(),
LastUpdate: nowMeta(opts.Clock),
// startAsyncStatusUpdates starts a goroutine that updates the sync status
// periodically until the context is cancelled. The caller should wait until the
// done channel is closed to confirm the goroutine has exited.
func (r *reconciler) startAsyncStatusUpdates(ctx context.Context) <-chan struct{} {
doneCh := make(chan struct{})
go func() {
defer close(doneCh)
opts := r.Options()
state := r.ReconcilerState()
klog.V(3).Info("Periodic sync status updates starting...")
updatePeriod := opts.StatusUpdatePeriod
updateTimer := opts.Clock.NewTimer(updatePeriod)
defer updateTimer.Stop()
for {
select {
case <-ctx.Done():
// ctx.Done() is closed when the cancellation function of the context is called.
klog.V(3).Info("Periodic sync status updates stopped")
return

case <-updateTimer.C():
klog.V(3).Info("Updating sync status (periodic while syncing)")
// Copy the spec and commit from the source status
syncStatus := &SyncStatus{
Spec: state.status.SourceStatus.Spec,
Syncing: true,
Commit: state.cache.source.commit,
Errs: state.SyncErrors(),
LastUpdate: nowMeta(opts.Clock),
}
if err := r.setSyncStatus(ctx, syncStatus); err != nil {
klog.Warningf("failed to update sync status: %v", err)
}

updateTimer.Reset(updatePeriod) // Schedule status update attempt
}
if err := r.setSyncStatus(ctx, syncStatus); err != nil {
klog.Warningf("failed to update sync status: %v", err)
}

updateTimer.Reset(updatePeriod) // Schedule status update attempt
}
}
}()
return doneCh
}

// reportRootSyncConflicts reports conflicts to the RootSync that manages the
Expand Down

0 comments on commit d6bacb8

Please sign in to comment.