From bae96f7c0f1177978a9eb47fd43d1ba403d945e6 Mon Sep 17 00:00:00 2001 From: Will Sewell Date: Fri, 29 Mar 2024 20:25:40 +0000 Subject: [PATCH] Fix goroutine leaks in plugin/sampling/strategystore/adaptive (#5310) ## Which problem is this PR solving? - Solves part of https://github.com/jaegertracing/jaeger/issues/5006 ## Description of the changes - This mainly involved ensuring that all goroutines started by the Processor are shut down in a Close method (which also blocks on them returning via a WaitGroup). - Adding this flagged an issue where the `runUpdateProbabilitiesLoop` had a long delay, so tests need to be able to override the default Processor.followerRefreshInterval, or `Close` would take up to ~20s to return. More context on this here https://github.com/jaegertracing/jaeger/issues/5006#issuecomment-2027292668 - specifically regarding how to override this in the `Factory`. - I also needed to fix a deadlock where a test was not unlocking a lock which would allow the Close method to return: https://github.com/jaegertracing/jaeger/pull/5310/files#diff-c9d6c33e36122502911585c65618d1af863079a70a68543adcc0ae2798faa348R468 ## How was this change tested? - `make test lint` ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [ ] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `yarn lint` and `yarn test` --------- Signed-off-by: Will Sewell --- cmd/collector/app/collector_test.go | 4 ++ .../app/sampling/grpc_handler_test.go | 4 ++ .../app/sampling/strategystore/interface.go | 3 ++ cmd/collector/app/server/test.go | 4 ++ pkg/clientcfg/clientcfghttp/cfgmgr_test.go | 4 ++ .../strategystore/adaptive/factory_test.go | 4 +- .../strategystore/adaptive/package_test.go | 14 +++++++ .../strategystore/adaptive/processor.go | 40 +++++++++++++------ .../strategystore/adaptive/processor_test.go | 2 + .../strategystore/static/strategy_store.go | 3 +- 10 files changed, 67 insertions(+), 15 deletions(-) create mode 100644 plugin/sampling/strategystore/adaptive/package_test.go diff --git a/cmd/collector/app/collector_test.go b/cmd/collector/app/collector_test.go index 6e08ce76a19..8c92cd81e53 100644 --- a/cmd/collector/app/collector_test.go +++ b/cmd/collector/app/collector_test.go @@ -129,6 +129,10 @@ func (m *mockStrategyStore) GetSamplingStrategy(_ context.Context, serviceName s return &api_v2.SamplingStrategyResponse{}, nil } +func (m *mockStrategyStore) Close() error { + return nil +} + func TestCollector_PublishOpts(t *testing.T) { // prepare hc := healthcheck.New() diff --git a/cmd/collector/app/sampling/grpc_handler_test.go b/cmd/collector/app/sampling/grpc_handler_test.go index 3a6590bf1ad..439e3bef646 100644 --- a/cmd/collector/app/sampling/grpc_handler_test.go +++ b/cmd/collector/app/sampling/grpc_handler_test.go @@ -37,6 +37,10 @@ func (s mockSamplingStore) GetSamplingStrategy(ctx context.Context, serviceName return &api_v2.SamplingStrategyResponse{StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC}, nil } +func (s mockSamplingStore) Close() error { + return nil +} + func TestNewGRPCHandler(t *testing.T) { tests := []struct { req *api_v2.SamplingStrategyParameters diff --git a/cmd/collector/app/sampling/strategystore/interface.go b/cmd/collector/app/sampling/strategystore/interface.go index 8ee99491110..90d9464918d 100644 --- a/cmd/collector/app/sampling/strategystore/interface.go +++ b/cmd/collector/app/sampling/strategystore/interface.go @@ -24,6 +24,9 @@ import ( // StrategyStore keeps track of service specific sampling strategies. type StrategyStore interface { + // Close() from io.Closer stops the processor from calculating probabilities. + io.Closer + // GetSamplingStrategy retrieves the sampling strategy for the specified service. GetSamplingStrategy(ctx context.Context, serviceName string) (*api_v2.SamplingStrategyResponse, error) } diff --git a/cmd/collector/app/server/test.go b/cmd/collector/app/server/test.go index 4f78e75866c..b670934a785 100644 --- a/cmd/collector/app/server/test.go +++ b/cmd/collector/app/server/test.go @@ -28,6 +28,10 @@ func (s mockSamplingStore) GetSamplingStrategy(_ context.Context, serviceName st return nil, nil } +func (s mockSamplingStore) Close() error { + return nil +} + type mockSpanProcessor struct{} func (p *mockSpanProcessor) Close() error { diff --git a/pkg/clientcfg/clientcfghttp/cfgmgr_test.go b/pkg/clientcfg/clientcfghttp/cfgmgr_test.go index f6da22f7909..7c24bbeda24 100644 --- a/pkg/clientcfg/clientcfghttp/cfgmgr_test.go +++ b/pkg/clientcfg/clientcfghttp/cfgmgr_test.go @@ -37,6 +37,10 @@ func (m *mockSamplingStore) GetSamplingStrategy(_ context.Context, serviceName s return m.samplingResponse, nil } +func (m *mockSamplingStore) Close() error { + return nil +} + type mockBaggageMgr struct { baggageResponse []*baggage.BaggageRestriction } diff --git a/plugin/sampling/strategystore/adaptive/factory_test.go b/plugin/sampling/strategystore/adaptive/factory_test.go index 71a7bf0bb60..99c02b8c1f2 100644 --- a/plugin/sampling/strategystore/adaptive/factory_test.go +++ b/plugin/sampling/strategystore/adaptive/factory_test.go @@ -72,8 +72,10 @@ func TestFactory(t *testing.T) { assert.Equal(t, time.Second*2, f.options.FollowerLeaseRefreshInterval) require.NoError(t, f.Initialize(metrics.NullFactory, &mockSamplingStoreFactory{}, zap.NewNop())) - _, _, err := f.CreateStrategyStore() + store, aggregator, err := f.CreateStrategyStore() require.NoError(t, err) + require.NoError(t, store.Close()) + require.NoError(t, aggregator.Close()) } func TestBadConfigFail(t *testing.T) { diff --git a/plugin/sampling/strategystore/adaptive/package_test.go b/plugin/sampling/strategystore/adaptive/package_test.go new file mode 100644 index 00000000000..64db46d18bc --- /dev/null +++ b/plugin/sampling/strategystore/adaptive/package_test.go @@ -0,0 +1,14 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package adaptive + +import ( + "testing" + + "github.com/jaegertracing/jaeger/pkg/testutils" +) + +func TestMain(m *testing.M) { + testutils.VerifyGoLeaks(m) +} diff --git a/plugin/sampling/strategystore/adaptive/processor.go b/plugin/sampling/strategystore/adaptive/processor.go index a87e98f8fc9..7ac5c1834fa 100644 --- a/plugin/sampling/strategystore/adaptive/processor.go +++ b/plugin/sampling/strategystore/adaptive/processor.go @@ -110,7 +110,8 @@ type Processor struct { serviceCache []SamplingCache - shutdown chan struct{} + shutdown chan struct{} + bgFinished sync.WaitGroup operationsCalculatedGauge metrics.Gauge calculateProbabilitiesLatency metrics.Timer @@ -170,19 +171,28 @@ func (p *Processor) Start() error { p.shutdown = make(chan struct{}) p.loadProbabilities() p.generateStrategyResponses() - go p.runCalculationLoop() - go p.runUpdateProbabilitiesLoop() + p.runBackground(p.runCalculationLoop) + p.runBackground(p.runUpdateProbabilitiesLoop) return nil } +func (p *Processor) runBackground(f func()) { + p.bgFinished.Add(1) + go func() { + f() + p.bgFinished.Done() + }() +} + // Close stops the processor from calculating probabilities. func (p *Processor) Close() error { p.logger.Info("stopping adaptive sampling processor") - if err := p.electionParticipant.Close(); err != nil { - return err + err := p.electionParticipant.Close() + if p.shutdown != nil { + close(p.shutdown) } - close(p.shutdown) - return nil + p.bgFinished.Wait() + return err } func (p *Processor) loadProbabilities() { @@ -200,7 +210,12 @@ func (p *Processor) loadProbabilities() { // runUpdateProbabilitiesLoop is a loop that reads probabilities from storage. // The follower updates its local cache with the latest probabilities and serves them. func (p *Processor) runUpdateProbabilitiesLoop() { - addJitter(p.followerRefreshInterval) + select { + case <-time.After(addJitter(p.followerRefreshInterval)): + case <-p.shutdown: + return + } + ticker := time.NewTicker(p.followerRefreshInterval) defer ticker.Stop() for { @@ -221,13 +236,12 @@ func (p *Processor) isLeader() bool { return p.electionParticipant.IsLeader() } -// addJitter sleeps for a random amount of time. Without jitter, if the host holding the leader +// addJitter adds a random amount of time. Without jitter, if the host holding the leader // lock were to die, then all other collectors can potentially wait for a full cycle before // trying to acquire the lock. With jitter, we can reduce the average amount of time before a // new leader is elected. Furthermore, jitter can be used to spread out read load on storage. -func addJitter(jitterAmount time.Duration) { - delay := (jitterAmount / 2) + time.Duration(rand.Int63n(int64(jitterAmount/2))) - time.Sleep(delay) +func addJitter(jitterAmount time.Duration) time.Duration { + return (jitterAmount / 2) + time.Duration(rand.Int63n(int64(jitterAmount/2))) } func (p *Processor) runCalculationLoop() { @@ -272,7 +286,7 @@ func (p *Processor) runCalculationLoop() { // be way longer than the time to run the calculations. p.generateStrategyResponses() p.calculateProbabilitiesLatency.Record(time.Since(startTime)) - go p.saveProbabilitiesAndQPS() + p.runBackground(p.saveProbabilitiesAndQPS) } case <-p.shutdown: return diff --git a/plugin/sampling/strategystore/adaptive/processor_test.go b/plugin/sampling/strategystore/adaptive/processor_test.go index c63ea246a71..03ae6b7b7bb 100644 --- a/plugin/sampling/strategystore/adaptive/processor_test.go +++ b/plugin/sampling/strategystore/adaptive/processor_test.go @@ -449,6 +449,7 @@ func TestRunUpdateProbabilitiesLoop(t *testing.T) { p.RLock() assert.NotNil(t, p.probabilities) assert.NotNil(t, p.strategyResponses) + p.RUnlock() } func TestRealisticRunCalculationLoop(t *testing.T) { @@ -880,6 +881,7 @@ func TestErrors(t *testing.T) { p, err := newProcessor(cfg, "host", mockStorage, mockEP, metrics.NullFactory, zap.NewNop()) require.NoError(t, err) require.Error(t, p.Start()) + require.Error(t, p.Close()) // close errors mockEP = &epmocks.ElectionParticipant{} diff --git a/plugin/sampling/strategystore/static/strategy_store.go b/plugin/sampling/strategystore/static/strategy_store.go index cf8544630e9..feb6d134197 100644 --- a/plugin/sampling/strategystore/static/strategy_store.go +++ b/plugin/sampling/strategystore/static/strategy_store.go @@ -91,8 +91,9 @@ func (h *strategyStore) GetSamplingStrategy(_ context.Context, serviceName strin } // Close stops updating the strategies -func (h *strategyStore) Close() { +func (h *strategyStore) Close() error { h.cancelFunc() + return nil } func (h *strategyStore) downloadSamplingStrategies(url string) ([]byte, error) {