Skip to content

Commit

Permalink
fix: cleanup routine when semaphore lease is lost (#42534)
Browse files Browse the repository at this point in the history
This PR fixes an edge case where the semaphore lease can be lost due to a
backend error while the routine keeps running without holding it.

Signed-off-by: Tiago Silva <tiago.silva@goteleport.com>
  • Loading branch information
tigrato authored Jun 6, 2024
1 parent 9288a76 commit f63aa82
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 7 deletions.
26 changes: 20 additions & 6 deletions lib/srv/discovery/access_graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"crypto/tls"
"crypto/x509"
"errors"
"sync"
"time"

"github.com/gravitational/trace"
Expand All @@ -45,10 +46,8 @@ const (
batchSize = 500
)

var (
// errNoAccessGraphFetchers is returned when there are no TAG fetchers.
errNoAccessGraphFetchers = errors.New("no Access Graph fetchers")
)
// errNoAccessGraphFetchers is returned when there are no TAG fetchers.
var errNoAccessGraphFetchers = errors.New("no Access Graph fetchers")

func (s *Server) reconcileAccessGraph(ctx context.Context, currentTAGResources *aws_sync.Resources, stream accessgraphv1alpha.AccessGraphService_AWSEventsStreamClient, features aws_sync.Features) error {
type fetcherResult struct {
Expand Down Expand Up @@ -269,6 +268,21 @@ func (s *Server) initializeAndWatchAccessGraph(ctx context.Context, reloadCh <-c
if err != nil {
return trace.Wrap(err)
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()
var wg sync.WaitGroup
defer wg.Wait()
wg.Add(1)
go func() {
defer wg.Done()
select {
case <-ctx.Done():
return
case <-lease.Done():
cancel()
}
}()
defer func() {
lease.Stop()
if err := lease.Wait(); err != nil {
Expand Down Expand Up @@ -310,12 +324,12 @@ func (s *Server) initializeAndWatchAccessGraph(ctx context.Context, reloadCh <-c
}
features := aws_sync.BuildFeatures(supportedKinds...)

ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Start a goroutine to watch the access graph service connection state.
// If the connection is closed, cancel the context to stop the event watcher
// before it tries to send any events to the access graph service.
wg.Add(1)
go func() {
defer wg.Done()
defer cancel()
if !accessGraphConn.WaitForStateChange(ctx, connectivity.Ready) {
s.Log.Info("access graph service connection was closed")
Expand Down
2 changes: 1 addition & 1 deletion lib/srv/discovery/access_graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
)

func TestServer_updateDiscoveryConfigStatus(t *testing.T) {
var testErr = "test error"
testErr := "test error"
clock := clockwork.NewFakeClock()
type args struct {
fetchers []aws_sync.AWSSync
Expand Down

0 comments on commit f63aa82

Please sign in to comment.