Skip to content

Commit

Permalink
fix: improve semaphore concurrency performance
Browse files Browse the repository at this point in the history
Signed-off-by: Jesse Suen <jesse@akuity.io>
  • Loading branch information
jessesuen authored and terrytangyuan committed Sep 23, 2022
1 parent e6eb02f commit 18f0591
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 1 deletion.
6 changes: 5 additions & 1 deletion workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,11 @@ func (wfc *WorkflowController) createSynchronizationManager(ctx context.Context)
}

nextWorkflow := func(key string) {
wfc.wfQueue.AddRateLimited(key)
// We now use AddAfter(1s) instead of AddRateLimited() because if we add immediately,
// the enqueued workflow will likely be reconciled before we have finished reconciling
// *this* workflow. If we reconcile the next workflow too soon (before we have finished
// incrementing the semaphore counters), the next workflow will believe it cannot run.
wfc.wfQueue.AddAfter(key, 1*time.Second)
}

isWFDeleted := func(key string) bool {
Expand Down
5 changes: 5 additions & 0 deletions workflow/sync/semaphore.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ func (s *PrioritySemaphore) tryAcquire(holderKey string) (bool, string) {
if s.acquire(holderKey) {
s.pending.pop()
s.log.Infof("%s acquired by %s ", s.name, nextKey)
if s.pending.Len() > 0 && len(s.lockHolder) < s.limit {
// We just acquired the semaphore but others are waiting and there is room for the
// next one. Enqueue the next workflow in line.
s.nextWorkflow(s.pending.peek().key)
}
return true, ""
}
s.log.Debugf("Current semaphore Holders. %v", s.lockHolder)
Expand Down

0 comments on commit 18f0591

Please sign in to comment.