Skip to content

Commit

Permalink
Fix goroutine leaks in plugin/sampling/strategystore/adaptive (jaegertracing#5310)
Browse files Browse the repository at this point in the history

## Which problem is this PR solving?
- Solves part of jaegertracing#5006

## Description of the changes
- This mainly involved ensuring that all goroutines started by the
Processor are shut down in a Close method (which also blocks on them
returning via a WaitGroup).
- Adding this flagged an issue: `runUpdateProbabilitiesLoop` begins with a long
delay, so tests need to be able to override the default
`Processor.followerRefreshInterval`; otherwise `Close` would take up to ~20s to
return. More context on this is in
jaegertracing#5006 (comment),
specifically regarding how to override this in the `Factory`.
- I also needed to fix a deadlock: a test took a read lock and never released
it, which prevented the Close method from returning:
https://github.com/jaegertracing/jaeger/pull/5310/files#diff-c9d6c33e36122502911585c65618d1af863079a70a68543adcc0ae2798faa348R468

## How was this change tested?
- `make test lint`

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [ ] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `yarn lint` and `yarn test`

---------

Signed-off-by: Will Sewell <willsewell@monzo.com>
  • Loading branch information
Will Sewell authored Mar 29, 2024
1 parent f934846 commit bae96f7
Show file tree
Hide file tree
Showing 10 changed files with 67 additions and 15 deletions.
4 changes: 4 additions & 0 deletions cmd/collector/app/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ func (m *mockStrategyStore) GetSamplingStrategy(_ context.Context, serviceName s
return &api_v2.SamplingStrategyResponse{}, nil
}

// Close gives the mock the io.Closer method set; it holds no resources,
// so it does nothing and always returns nil.
func (m *mockStrategyStore) Close() error {
return nil
}

func TestCollector_PublishOpts(t *testing.T) {
// prepare
hc := healthcheck.New()
Expand Down
4 changes: 4 additions & 0 deletions cmd/collector/app/sampling/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ func (s mockSamplingStore) GetSamplingStrategy(ctx context.Context, serviceName
return &api_v2.SamplingStrategyResponse{StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC}, nil
}

// Close gives the mock the io.Closer method set; it does nothing and
// always returns nil.
func (s mockSamplingStore) Close() error {
return nil
}

func TestNewGRPCHandler(t *testing.T) {
tests := []struct {
req *api_v2.SamplingStrategyParameters
Expand Down
3 changes: 3 additions & 0 deletions cmd/collector/app/sampling/strategystore/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import (

// StrategyStore keeps track of service-specific sampling strategies.
// Every implementation must also provide Close (via io.Closer).
type StrategyStore interface {
// Close (from io.Closer) stops the store's background work, e.g. the
// adaptive processor calculating probabilities.
io.Closer

// GetSamplingStrategy retrieves the sampling strategy for the specified service.
GetSamplingStrategy(ctx context.Context, serviceName string) (*api_v2.SamplingStrategyResponse, error)
}
Expand Down
4 changes: 4 additions & 0 deletions cmd/collector/app/server/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ func (s mockSamplingStore) GetSamplingStrategy(_ context.Context, serviceName st
return nil, nil
}

// Close gives the mock the io.Closer method set; it does nothing and
// always returns nil.
func (s mockSamplingStore) Close() error {
return nil
}

// mockSpanProcessor is a stateless mock type; its methods (such as Close)
// are declared separately.
type mockSpanProcessor struct{}

func (p *mockSpanProcessor) Close() error {
Expand Down
4 changes: 4 additions & 0 deletions pkg/clientcfg/clientcfghttp/cfgmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ func (m *mockSamplingStore) GetSamplingStrategy(_ context.Context, serviceName s
return m.samplingResponse, nil
}

// Close gives the mock the io.Closer method set; it does nothing and
// always returns nil.
func (m *mockSamplingStore) Close() error {
return nil
}

// mockBaggageMgr is a mock that carries a fixed list of baggage restrictions.
// NOTE(review): baggageResponse is presumably what the mock hands back to
// callers — confirm against its methods, which are not shown here.
type mockBaggageMgr struct {
baggageResponse []*baggage.BaggageRestriction
}
Expand Down
4 changes: 3 additions & 1 deletion plugin/sampling/strategystore/adaptive/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,10 @@ func TestFactory(t *testing.T) {
assert.Equal(t, time.Second*2, f.options.FollowerLeaseRefreshInterval)

require.NoError(t, f.Initialize(metrics.NullFactory, &mockSamplingStoreFactory{}, zap.NewNop()))
_, _, err := f.CreateStrategyStore()
store, aggregator, err := f.CreateStrategyStore()
require.NoError(t, err)
require.NoError(t, store.Close())
require.NoError(t, aggregator.Close())
}

func TestBadConfigFail(t *testing.T) {
Expand Down
14 changes: 14 additions & 0 deletions plugin/sampling/strategystore/adaptive/package_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package adaptive

import (
"testing"

"github.com/jaegertracing/jaeger/pkg/testutils"
)

// TestMain hands the package's test run over to testutils.VerifyGoLeaks.
// NOTE(review): presumably this fails the run when goroutines are still
// alive after the tests finish — confirm in pkg/testutils.
func TestMain(m *testing.M) {
testutils.VerifyGoLeaks(m)
}
40 changes: 27 additions & 13 deletions plugin/sampling/strategystore/adaptive/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ type Processor struct {

serviceCache []SamplingCache

shutdown chan struct{}
shutdown chan struct{}
bgFinished sync.WaitGroup

operationsCalculatedGauge metrics.Gauge
calculateProbabilitiesLatency metrics.Timer
Expand Down Expand Up @@ -170,19 +171,28 @@ func (p *Processor) Start() error {
p.shutdown = make(chan struct{})
p.loadProbabilities()
p.generateStrategyResponses()
go p.runCalculationLoop()
go p.runUpdateProbabilitiesLoop()
p.runBackground(p.runCalculationLoop)
p.runBackground(p.runUpdateProbabilitiesLoop)
return nil
}

// runBackground runs f in a new goroutine that is tracked by p.bgFinished,
// so that p.bgFinished.Wait() blocks until f has finished.
//
// The counter is incremented before the goroutine is started so a Wait that
// follows this call always observes it. Done is deferred rather than called
// after f so the counter is released even when f does not return normally
// (for example when it ends the goroutine with runtime.Goexit); otherwise
// Wait would block forever.
func (p *Processor) runBackground(f func()) {
	p.bgFinished.Add(1)
	go func() {
		defer p.bgFinished.Done()
		f()
	}()
}

// Close stops the processor from calculating probabilities.
func (p *Processor) Close() error {
p.logger.Info("stopping adaptive sampling processor")
if err := p.electionParticipant.Close(); err != nil {
return err
err := p.electionParticipant.Close()
if p.shutdown != nil {
close(p.shutdown)
}
close(p.shutdown)
return nil
p.bgFinished.Wait()
return err
}

func (p *Processor) loadProbabilities() {
Expand All @@ -200,7 +210,12 @@ func (p *Processor) loadProbabilities() {
// runUpdateProbabilitiesLoop is a loop that reads probabilities from storage.
// The follower updates its local cache with the latest probabilities and serves them.
func (p *Processor) runUpdateProbabilitiesLoop() {
addJitter(p.followerRefreshInterval)
select {
case <-time.After(addJitter(p.followerRefreshInterval)):
case <-p.shutdown:
return
}

ticker := time.NewTicker(p.followerRefreshInterval)
defer ticker.Stop()
for {
Expand All @@ -221,13 +236,12 @@ func (p *Processor) isLeader() bool {
return p.electionParticipant.IsLeader()
}

// addJitter sleeps for a random amount of time. Without jitter, if the host holding the leader
// addJitter adds a random amount of time. Without jitter, if the host holding the leader
// lock were to die, then all other collectors can potentially wait for a full cycle before
// trying to acquire the lock. With jitter, we can reduce the average amount of time before a
// new leader is elected. Furthermore, jitter can be used to spread out read load on storage.
func addJitter(jitterAmount time.Duration) {
delay := (jitterAmount / 2) + time.Duration(rand.Int63n(int64(jitterAmount/2)))
time.Sleep(delay)
func addJitter(jitterAmount time.Duration) time.Duration {
return (jitterAmount / 2) + time.Duration(rand.Int63n(int64(jitterAmount/2)))
}

func (p *Processor) runCalculationLoop() {
Expand Down Expand Up @@ -272,7 +286,7 @@ func (p *Processor) runCalculationLoop() {
// be way longer than the time to run the calculations.
p.generateStrategyResponses()
p.calculateProbabilitiesLatency.Record(time.Since(startTime))
go p.saveProbabilitiesAndQPS()
p.runBackground(p.saveProbabilitiesAndQPS)
}
case <-p.shutdown:
return
Expand Down
2 changes: 2 additions & 0 deletions plugin/sampling/strategystore/adaptive/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,7 @@ func TestRunUpdateProbabilitiesLoop(t *testing.T) {
p.RLock()
assert.NotNil(t, p.probabilities)
assert.NotNil(t, p.strategyResponses)
p.RUnlock()
}

func TestRealisticRunCalculationLoop(t *testing.T) {
Expand Down Expand Up @@ -880,6 +881,7 @@ func TestErrors(t *testing.T) {
p, err := newProcessor(cfg, "host", mockStorage, mockEP, metrics.NullFactory, zap.NewNop())
require.NoError(t, err)
require.Error(t, p.Start())
require.Error(t, p.Close())

// close errors
mockEP = &epmocks.ElectionParticipant{}
Expand Down
3 changes: 2 additions & 1 deletion plugin/sampling/strategystore/static/strategy_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,9 @@ func (h *strategyStore) GetSamplingStrategy(_ context.Context, serviceName strin
}

// Close stops updating the strategies
func (h *strategyStore) Close() {
func (h *strategyStore) Close() error {
h.cancelFunc()
return nil
}

func (h *strategyStore) downloadSamplingStrategies(url string) ([]byte, error) {
Expand Down

0 comments on commit bae96f7

Please sign in to comment.