Skip to content

Commit

Permalink
Fix a flaky unit test in loki.process (#1591)
Browse files Browse the repository at this point in the history
* Stop handleOut only after the pipeline has shut down

* Rename variables
  • Loading branch information
ptodev authored Sep 2, 2024
1 parent 578269a commit 2fdacf7
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 21 deletions.
23 changes: 15 additions & 8 deletions internal/component/loki/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,19 +97,26 @@ func New(o component.Options, args Arguments) (*Component, error) {

// Run implements component.Component.
func (c *Component) Run(ctx context.Context) error {
handleOutShutdown := make(chan struct{})
wgOut := &sync.WaitGroup{}
defer func() {
c.mut.RLock()
if c.entryHandler != nil {
c.entryHandler.Stop()
// Stop handleOut only after the entryHandler has stopped.
// If handleOut stops first, entryHandler might get stuck on a channel send.
close(handleOutShutdown)
wgOut.Wait()
}
c.mut.RUnlock()
}()
wg := &sync.WaitGroup{}
wg.Add(2)
go c.handleIn(ctx, wg)
go c.handleOut(ctx, wg)
wgIn := &sync.WaitGroup{}
wgIn.Add(1)
go c.handleIn(ctx, wgIn)
wgOut.Add(1)
go c.handleOut(handleOutShutdown, wgOut)

wg.Wait()
wgIn.Wait()
return nil
}

Expand Down Expand Up @@ -173,12 +180,12 @@ func (c *Component) handleIn(ctx context.Context, wg *sync.WaitGroup) {
}
}

func (c *Component) handleOut(ctx context.Context, wg *sync.WaitGroup) {
func (c *Component) handleOut(shutdownCh chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
componentID := livedebugging.ComponentID(c.opts.ID)
for {
select {
case <-ctx.Done():
case <-shutdownCh:
return
case entry := <-c.processOut:
c.fanoutMut.RLock()
Expand All @@ -193,7 +200,7 @@ func (c *Component) handleOut(ctx context.Context, wg *sync.WaitGroup) {

for _, f := range fanout {
select {
case <-ctx.Done():
case <-shutdownCh:
return
case f.Chan() <- entry:
}
Expand Down
31 changes: 18 additions & 13 deletions internal/component/loki/process/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,10 +445,11 @@ type testFrequentUpdate struct {
receiver2 loki.LogsReceiver

keepSending atomic.Bool
keepReceiving atomic.Bool
stopReceiving chan struct{}
keepUpdating atomic.Bool

wgLogSend sync.WaitGroup
wgSend sync.WaitGroup
wgReceive sync.WaitGroup
wgRun sync.WaitGroup
wgUpdate sync.WaitGroup

Expand All @@ -459,27 +460,30 @@ type testFrequentUpdate struct {

func startTestFrequentUpdate(t *testing.T, cfg string) *testFrequentUpdate {
res := testFrequentUpdate{
t: t,
receiver1: loki.NewLogsReceiver(),
receiver2: loki.NewLogsReceiver(),
t: t,
receiver1: loki.NewLogsReceiver(),
receiver2: loki.NewLogsReceiver(),
stopReceiving: make(chan struct{}),
}

ctx, cancel := context.WithCancel(context.Background())

res.keepSending.Store(true)
res.keepReceiving.Store(true)
res.keepUpdating.Store(true)

res.stop = func() {
res.keepUpdating.Store(false)
res.wgUpdate.Wait()

res.keepSending.Store(false)
res.wgLogSend.Wait()
res.wgSend.Wait()

cancel()
res.wgRun.Wait()

close(res.stopReceiving)
res.wgReceive.Wait()

close(res.receiver1.Chan())
close(res.receiver2.Chan())
}
Expand Down Expand Up @@ -515,23 +519,25 @@ func (r *testFrequentUpdate) drainLogs() {
r.lastSend.Store(time.Now())
}

r.wgLogSend.Add(1)
r.wgReceive.Add(1)
go func() {
for r.keepReceiving.Load() {
for {
select {
case <-r.stopReceiving:
r.wgReceive.Done()
return
case <-r.receiver1.Chan():
drainLogs()
case <-r.receiver2.Chan():
drainLogs()
}
}
r.wgLogSend.Done()
}()
}

// Continuously send entries to both channels
func (r *testFrequentUpdate) sendLogs() {
r.wgLogSend.Add(1)
r.wgSend.Add(1)
go func() {
for r.keepSending.Load() {
ts := time.Now()
Expand All @@ -548,8 +554,7 @@ func (r *testFrequentUpdate) sendLogs() {
// continue
}
}
r.keepReceiving.Store(false)
r.wgLogSend.Done()
r.wgSend.Done()
}()
}

Expand Down

0 comments on commit 2fdacf7

Please sign in to comment.