From f63aa826b5c36c77f56c8aa258b585b2a4aae687 Mon Sep 17 00:00:00 2001 From: Tiago Silva Date: Thu, 6 Jun 2024 11:39:31 +0100 Subject: [PATCH] fix: cleanup routine when semaphore lease is lost (#42534) This PR fixes an edge case where the semaphore lock can be lost due to a backend error and the routine can continue running. Signed-off-by: Tiago Silva --- lib/srv/discovery/access_graph.go | 26 ++++++++++++++++++++------ lib/srv/discovery/access_graph_test.go | 2 +- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/lib/srv/discovery/access_graph.go b/lib/srv/discovery/access_graph.go index 80ef511b56e66..3fb339e346dfc 100644 --- a/lib/srv/discovery/access_graph.go +++ b/lib/srv/discovery/access_graph.go @@ -23,6 +23,7 @@ import ( "crypto/tls" "crypto/x509" "errors" + "sync" "time" "github.com/gravitational/trace" @@ -45,10 +46,8 @@ const ( batchSize = 500 ) -var ( - // errNoAccessGraphFetchers is returned when there are no TAG fetchers. - errNoAccessGraphFetchers = errors.New("no Access Graph fetchers") -) +// errNoAccessGraphFetchers is returned when there are no TAG fetchers. +var errNoAccessGraphFetchers = errors.New("no Access Graph fetchers") func (s *Server) reconcileAccessGraph(ctx context.Context, currentTAGResources *aws_sync.Resources, stream accessgraphv1alpha.AccessGraphService_AWSEventsStreamClient, features aws_sync.Features) error { type fetcherResult struct { @@ -269,6 +268,21 @@ func (s *Server) initializeAndWatchAccessGraph(ctx context.Context, reloadCh <-c if err != nil { return trace.Wrap(err) } + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + var wg sync.WaitGroup + defer wg.Wait() + wg.Add(1) + go func() { + defer wg.Done() + select { + case <-ctx.Done(): + return + case <-lease.Done(): + cancel() + } + }() defer func() { lease.Stop() if err := lease.Wait(); err != nil { @@ -310,12 +324,12 @@ func (s *Server) initializeAndWatchAccessGraph(ctx context.Context, reloadCh <-c } features := aws_sync.BuildFeatures(supportedKinds...) - ctx, cancel := context.WithCancel(ctx) - defer cancel() // Start a goroutine to watch the access graph service connection state. // If the connection is closed, cancel the context to stop the event watcher // before it tries to send any events to the access graph service. + wg.Add(1) go func() { + defer wg.Done() defer cancel() if !accessGraphConn.WaitForStateChange(ctx, connectivity.Ready) { s.Log.Info("access graph service connection was closed") diff --git a/lib/srv/discovery/access_graph_test.go b/lib/srv/discovery/access_graph_test.go index ad8af07f5986c..b7ab66960dce0 100644 --- a/lib/srv/discovery/access_graph_test.go +++ b/lib/srv/discovery/access_graph_test.go @@ -31,7 +31,7 @@ import ( ) func TestServer_updateDiscoveryConfigStatus(t *testing.T) { - var testErr = "test error" + testErr := "test error" clock := clockwork.NewFakeClock() type args struct { fetchers []aws_sync.AWSSync