Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor internal buffer chain in the memory queue #37795

Merged
merged 15 commits into from
Feb 13, 2024
6 changes: 3 additions & 3 deletions libbeat/publisher/pipeline/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ func TestClient(t *testing.T) {

// a small in-memory queue with a very short flush interval
q := memqueue.NewQueue(l, nil, memqueue.Settings{
Events: 5,
FlushMinEvents: 1,
FlushTimeout: time.Millisecond,
Events: 5,
MaxGetRequest: 1,
FlushTimeout: time.Millisecond,
}, 5)

// model a processor that we're going to make produce errors after
Expand Down
89 changes: 66 additions & 23 deletions libbeat/publisher/queue/memqueue/ackloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,28 @@ package memqueue
type ackLoop struct {
broker *broker

// A list of ACK channels given to queue consumers,
// A list of batches given to queue consumers,
// used to maintain sequencing of event acknowledgements.
ackChans chanList
pendingBatches batchList
}

processACK func(chanList, int)
func newACKLoop(broker *broker) *ackLoop {
return &ackLoop{broker: broker}
}

func (l *ackLoop) run() {
b := l.broker
for {
nextBatchChan := l.ackChans.nextBatchChannel()
nextBatchChan := l.pendingBatches.nextBatchChannel()

select {
case <-l.broker.done:
case <-b.done:
// The queue is shutting down.
return

case chanList := <-l.broker.scheduledACKs:
// A new batch has been generated, add its ACK channel to the end of
// the pending list.
l.ackChans.concat(&chanList)
case chanList := <-b.consumedChan:
// New batches have been generated, add them to the pending list
l.pendingBatches.concat(&chanList)

case <-nextBatchChan:
// The oldest outstanding batch has been acknowledged, advance our
Expand All @@ -57,11 +59,11 @@ func (l *ackLoop) run() {
// handleBatchSig collects and handles a batch ACK/Cancel signal. handleBatchSig
// is run by the ackLoop.
func (l *ackLoop) handleBatchSig() int {
lst := l.collectAcked()
ackedBatches := l.collectAcked()

count := 0
for current := lst.front(); current != nil; current = current.next {
count += current.count
for batch := ackedBatches.front(); batch != nil; batch = batch.next {
count += batch.count
}

if count > 0 {
Expand All @@ -70,11 +72,12 @@ func (l *ackLoop) handleBatchSig() int {
}

// report acks to waiting clients
l.processACK(lst, count)
l.processACK(ackedBatches, count)
}

for !lst.empty() {
releaseACKChan(lst.pop())
for !ackedBatches.empty() {
// Release finished batch structs into the shared memory pool
releaseBatch(ackedBatches.pop())
}

// return final ACK to EventLoop, in order to clean up internal buffer
Expand All @@ -84,23 +87,63 @@ func (l *ackLoop) handleBatchSig() int {
return count
}

func (l *ackLoop) collectAcked() chanList {
lst := chanList{}
func (l *ackLoop) collectAcked() batchList {
ackedBatches := batchList{}

acks := l.ackChans.pop()
lst.append(acks)
acks := l.pendingBatches.pop()
ackedBatches.append(acks)

done := false
for !l.ackChans.empty() && !done {
acks := l.ackChans.front()
for !l.pendingBatches.empty() && !done {
acks := l.pendingBatches.front()
select {
case <-acks.doneChan:
lst.append(l.ackChans.pop())
ackedBatches.append(l.pendingBatches.pop())

default:
done = true
}
}

return lst
return ackedBatches
}

// Called by ackLoop. This function exists to decouple the work of collecting
// and running producer callbacks from logical deletion of the events, so
// input callbacks can't block the queue by occupying the runLoop goroutine.
func (l *ackLoop) processACK(lst batchList, N int) {
ackCallbacks := []func(){}
// First we traverse the entries we're about to remove, collecting any callbacks
// we need to run.
lst.reverse()
for !lst.empty() {
batch := lst.pop()

// Traverse entries from last to first, so we can acknowledge the most recent
// ones first and skip subsequent producer callbacks.
for i := batch.count - 1; i >= 0; i-- {
entry := batch.rawEntry(i)
if entry.producer == nil {
continue
}

if entry.producerID <= entry.producer.state.lastACK {
// This index was already acknowledged on a previous iteration, skip.
entry.producer = nil
continue
}
producerState := entry.producer.state
count := int(entry.producerID - producerState.lastACK)
ackCallbacks = append(ackCallbacks, func() { producerState.cb(count) })
entry.producer.state.lastACK = entry.producerID
entry.producer = nil
}
}
// Signal runLoop to delete the events
l.broker.deleteChan <- N

// The events have been removed; notify their listeners.
for _, f := range ackCallbacks {
f()
}
}
53 changes: 0 additions & 53 deletions libbeat/publisher/queue/memqueue/batchbuf.go

This file was deleted.

Loading
Loading