Skip to content

Commit

Permalink
Fix memqueue getting stuck on shutdown (#37077)
Browse files Browse the repository at this point in the history
There was a case when openState.publish could get stuck and ignore its
shutdown signal, effectively preventing a Filebeat (or any Beat) from
gracefully terminating.

This commit fixes this by ensuring every channel read/write also
checks for the shutdown signal.
  • Loading branch information
belimawr authored Nov 13, 2023
1 parent 9b932c4 commit a473880
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Upgraded apache arrow library used in x-pack/libbeat/reader/parquet from v11 to v12.0.1 in order to fix cross-compilation issues {pull}35640[35640]
- Fix panic when MaxRetryInterval is specified, but RetryInterval is not {pull}35820[35820]
- Support build of projects outside of beats directory {pull}36126[36126]
- Fix memqueue producer blocking indefinitely even after being cancelled {issue}22813[22813] {pull}37077[37077]

*Auditbeat*

Expand Down
12 changes: 11 additions & 1 deletion libbeat/publisher/queue/memqueue/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,17 @@ func (st *openState) Close() {
func (st *openState) publish(req pushRequest) (queue.EntryID, bool) {
select {
case st.events <- req:
return <-req.resp, true
// If the output is blocked and the queue is full, `req` is written
// to `st.events`, however the queue never writes back to `req.resp`,
// which effectively blocks for ever. So we also need to select on the
// done channel to ensure we don't miss the shutdown signal.
select {
case resp := <-req.resp:
return resp, true
case <-st.done:
st.events = nil
return 0, false
}
case <-st.done:
st.events = nil
return 0, false
Expand Down
75 changes: 75 additions & 0 deletions libbeat/publisher/queue/memqueue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ import (
"math"
"math/rand"
"sync"
"sync/atomic"
"testing"
"time"

"gotest.tools/assert"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/publisher/queue"
"github.com/elastic/beats/v7/libbeat/publisher/queue/queuetest"
"github.com/elastic/elastic-agent-libs/mapstr"
Expand Down Expand Up @@ -74,6 +77,78 @@ func TestProduceConsumer(t *testing.T) {
t.Run("flush", testWith(makeTestQueue(bufferSize, batchSize/2, 100*time.Millisecond)))
}

// TestProducerDoesNotBlockWhenCancelled ensures the producer Publish
// does not block indefinitely.
//
// Once we get a producer `p` from the queue we want to ensure
// that if p.Publish is called and blocks it will unblock once
// p.Cancel is called.
//
// For this test we start a queue with size 2 and try to add more
// than 2 events to it, p.Publish will block, once we call p.Cancel,
// we ensure the 3rd event was not successfully published.
func TestProducerDoesNotBlockWhenCancelled(t *testing.T) {
q := NewQueue(nil, nil,
Settings{
Events: 2, // Queue size
FlushMinEvents: 1, // make sure the queue won't buffer events
FlushTimeout: time.Millisecond,
}, 0)

p := q.Producer(queue.ProducerConfig{
// We do not read from the queue, so the callbacks are never called
ACK: func(count int) {},
OnDrop: func(e interface{}) {},
DropOnCancel: false,
})

success := atomic.Bool{}
publishCount := atomic.Int32{}
go func() {
// Publish 2 events, this will make the queue full, but
// both will be accepted
for i := 0; i < 2; i++ {
id, ok := p.Publish(fmt.Sprintf("Event %d", i))
if !ok {
t.Errorf("failed to publish to the queue, event ID: %v", id)
return
}
publishCount.Add(1)
}
_, ok := p.Publish("Event 3")
if ok {
t.Errorf("publishing the 3rd event must fail")
return
}

// Flag the test as successful
success.Store(true)
}()

// Allow the producer to run and the queue to do its thing.
// Two events should be accepted and the third call to p.Publish
// must block
// time.Sleep(100 * time.Millisecond)

// Ensure we published two events
require.Eventually(
t,
func() bool { return publishCount.Load() == 2 },
200*time.Millisecond,
time.Millisecond,
"the first two events were not successfully published")

// Cancel the producer, this should unblock its Publish method
p.Cancel()

require.Eventually(
t,
success.Load,
200*time.Millisecond,
1*time.Millisecond,
"test not flagged as successful, p.Publish likely blocked indefinitely")
}

func TestQueueMetricsDirect(t *testing.T) {
eventsToTest := 5
maxEvents := 10
Expand Down

0 comments on commit a473880

Please sign in to comment.