Skip to content

Commit

Permalink
feat: introduce ack trackers
Browse files Browse the repository at this point in the history
  • Loading branch information
KennyChenFight committed Aug 30, 2024
1 parent 953d9ea commit 71abd2e
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 0 deletions.
38 changes: 38 additions & 0 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ type partitionConsumer struct {
chunkedMsgCtxMap *chunkedMsgCtxMap
unAckChunksTracker *unAckChunksTracker
ackGroupingTracker ackGroupingTracker
ackTrackers *ackTrackers

lastMessageInBroker *trackingMessageID

Expand Down Expand Up @@ -375,6 +376,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
pc.decryptor = decryptor

pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay, options.nackBackoffPolicy, pc.log)
pc.ackTrackers = newAckTrackers()

err := pc.grabConn("")
if err != nil {
Expand Down Expand Up @@ -443,6 +445,9 @@ func (pc *partitionConsumer) ackIDCommon(msgID MessageID, withResponse bool, txn
}

trackingID := toTrackingMessageID(msgID)
if trackingID != nil && trackingID.tracker == nil {
trackingID.tracker = pc.ackTrackers.tracker(trackingID)
}

if trackingID != nil && trackingID.ack() {
// All messages in the same batch have been acknowledged, we only need to acknowledge the
Expand Down Expand Up @@ -712,6 +717,9 @@ func (pc *partitionConsumer) internalAckIDCumulative(msgID MessageID, withRespon
if trackingID == nil {
return errors.New("failed to convert trackingMessageID")
}
if trackingID.tracker == nil {
trackingID.tracker = pc.ackTrackers.tracker(trackingID)
}

var msgIDToAck *trackingMessageID
if trackingID.ackCumulative() || pc.options.enableBatchIndexAck {
Expand Down Expand Up @@ -1162,6 +1170,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
ackTracker)
// set the consumer so we know how to ack the message id
trackingMsgID.consumer = pc
pc.ackTrackers.add(trackingMsgID, ackTracker)

if pc.messageShouldBeDiscarded(trackingMsgID) {
pc.AckID(trackingMsgID)
Expand Down Expand Up @@ -2366,3 +2375,32 @@ func (u *unAckChunksTracker) nack(cmid *chunkMessageID) {
}
u.remove(cmid)
}

type ackTrackers struct {
mu sync.RWMutex
trackers map[[2]int64]*ackTracker
}

func newAckTrackers() *ackTrackers {
return &ackTrackers{
trackers: make(map[[2]int64]*ackTracker),
}
}

func (a *ackTrackers) tracker(id MessageID) *ackTracker {
a.mu.RLock()
defer a.mu.RUnlock()
return a.trackers[[2]int64{id.LedgerID(), id.EntryID()}]
}

func (a *ackTrackers) add(id MessageID, tracker *ackTracker) {
a.mu.Lock()
defer a.mu.Unlock()
a.trackers[[2]int64{id.LedgerID(), id.EntryID()}] = tracker
}

func (a *ackTrackers) remove(id MessageID) {
a.mu.Lock()
defer a.mu.Unlock()
delete(a.trackers, [2]int64{id.LedgerID(), id.EntryID()})
}
64 changes: 64 additions & 0 deletions pulsar/consumer_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
options: &partitionConsumerOpts{},
metrics: newTestMetrics(),
decryptor: crypto.NewNoopDecryptor(),
ackTrackers: newAckTrackers(),
}
pc.availablePermits = &availablePermits{pc: &pc}
pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
Expand Down Expand Up @@ -75,6 +76,7 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
options: &partitionConsumerOpts{},
metrics: newTestMetrics(),
decryptor: crypto.NewNoopDecryptor(),
ackTrackers: newAckTrackers(),
}
pc.availablePermits = &availablePermits{pc: &pc}
pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
Expand Down Expand Up @@ -111,6 +113,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
options: &partitionConsumerOpts{},
metrics: newTestMetrics(),
decryptor: crypto.NewNoopDecryptor(),
ackTrackers: newAckTrackers(),
}
pc.availablePermits = &availablePermits{pc: &pc}
pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
Expand Down Expand Up @@ -150,6 +153,67 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
}
}

func TestBatchMessageIDWithAckTrackers(t *testing.T) {
eventsCh := make(chan interface{}, 1)
pc := partitionConsumer{
queueCh: make(chan []*message, 1),
eventsCh: eventsCh,
compressionProviders: sync.Map{},
options: &partitionConsumerOpts{},
metrics: newTestMetrics(),
decryptor: crypto.NewNoopDecryptor(),
ackTrackers: newAckTrackers(),
}
pc.availablePermits = &availablePermits{pc: &pc}
pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)

headersAndPayload := internal.NewBufferWrapper(rawBatchMessage10)
if err := pc.MessageReceived(nil, headersAndPayload); err != nil {
t.Fatal(err)
}

// ensure the tracker was set on the message id
messages := <-pc.queueCh
for _, m := range messages {
assert.NotNil(t, m.ID().(*trackingMessageID).tracker)
}

noAckTrackerMessages := make([]MessageID, 10)
for i, m := range messages {
tmp := m.ID().Serialize()
mid, err := DeserializeMessageID(tmp)
if err != nil {
t.Fatal(err)
}
noAckTrackerMessages[i] = mid
}

// ack all message ids except the last one
for i := 0; i < 9; i++ {
_, ok := noAckTrackerMessages[i].(*trackingMessageID)
assert.False(t, ok)
err := pc.AckID(noAckTrackerMessages[i])
assert.Nil(t, err)
}

select {
case <-eventsCh:
t.Error("The message id should not be acked!")
default:
}

// ack last message
err := pc.AckID(noAckTrackerMessages[9])
assert.Nil(t, err)

select {
case <-eventsCh:
default:
t.Error("Expected an ack request to be triggered!")
}
}

// Raw single message in old format
// metadata properties:<key:"a" value:"1" > properties:<key:"b" value:"2" >
// payload = "hello"
Expand Down

0 comments on commit 71abd2e

Please sign in to comment.