Skip to content

Commit

Permalink
NackBackoffPolicy.Next return time.Duration (#834)
Browse files Browse the repository at this point in the history
Co-authored-by: tevinhuang <tevinhuang@tencent.com>
  • Loading branch information
h-hy and tevinhuang authored Sep 14, 2022
1 parent b06e198 commit edd5c71
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 16 deletions.
4 changes: 2 additions & 2 deletions pulsar/negative_acks_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,

if nackBackoffPolicy != nil {
firstDelayForNackBackoff := nackBackoffPolicy.Next(1)
t.delay = time.Duration(firstDelayForNackBackoff)
t.delay = firstDelayForNackBackoff
} else {
t.delay = delay
}
Expand Down Expand Up @@ -109,7 +109,7 @@ func (t *negativeAcksTracker) AddMessage(msg Message) {
return
}

targetTime := time.Now().Add(time.Duration(nackBackoffDelay))
targetTime := time.Now().Add(nackBackoffDelay)
t.negativeAcks[batchMsgID] = targetTime
}

Expand Down
14 changes: 9 additions & 5 deletions pulsar/negative_acks_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,18 @@ type nackMockedConsumer struct {
lock sync.Mutex
}

func newNackMockedConsumer() *nackMockedConsumer {
func newNackMockedConsumer(nackBackoffPolicy NackBackoffPolicy) *nackMockedConsumer {
t := &nackMockedConsumer{
ch: make(chan messageID, 10),
}
go func() {
// since the client ticks at an interval of delay / 3
// wait another interval to ensure we get all messages
time.Sleep(testNackDelay + 101*time.Millisecond)
if nackBackoffPolicy == nil {
time.Sleep(testNackDelay + 101*time.Millisecond)
} else {
time.Sleep(nackBackoffPolicy.Next(1) + 101*time.Millisecond)
}
t.lock.Lock()
defer t.lock.Unlock()
t.closed = true
Expand Down Expand Up @@ -74,7 +78,7 @@ func (nmc *nackMockedConsumer) Wait() <-chan messageID {
}

func TestNacksTracker(t *testing.T) {
nmc := newNackMockedConsumer()
nmc := newNackMockedConsumer(nil)
nacks := newNegativeAcksTracker(nmc, testNackDelay, nil, log.DefaultNopLogger())

nacks.Add(messageID{
Expand Down Expand Up @@ -107,7 +111,7 @@ func TestNacksTracker(t *testing.T) {
}

func TestNacksWithBatchesTracker(t *testing.T) {
nmc := newNackMockedConsumer()
nmc := newNackMockedConsumer(nil)
nacks := newNegativeAcksTracker(nmc, testNackDelay, nil, log.DefaultNopLogger())

nacks.Add(messageID{
Expand Down Expand Up @@ -150,7 +154,7 @@ func TestNacksWithBatchesTracker(t *testing.T) {
}

func TestNackBackoffTracker(t *testing.T) {
nmc := newNackMockedConsumer()
nmc := newNackMockedConsumer(new(defaultNackBackoffPolicy))
nacks := newNegativeAcksTracker(nmc, testNackDelay, new(defaultNackBackoffPolicy), log.DefaultNopLogger())

nacks.AddMessage(new(mockMessage1))
Expand Down
17 changes: 10 additions & 7 deletions pulsar/negative_backoff_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

package pulsar

import "math"
import (
"math"
"time"
)

// NackBackoffPolicy is a interface for custom message negativeAcked policy, users can specify a NackBackoffPolicy
// for a consumer.
Expand All @@ -28,19 +31,19 @@ import "math"
type NackBackoffPolicy interface {
// The redeliveryCount indicates the number of times the message was redelivered.
// We can get the redeliveryCount from the CommandMessage.
Next(redeliveryCount uint32) int64
Next(redeliveryCount uint32) time.Duration
}

// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
type defaultNackBackoffPolicy struct{}

func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
minNackTimeMs := int64(1000 * 30) // 30sec
maxNackTimeMs := 1000 * 60 * 10 // 10min
func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) time.Duration {
minNackTime := 1 * time.Second // 1sec
maxNackTime := 10 * time.Minute // 10min

if redeliveryCount < 0 {
return minNackTimeMs
return minNackTime
}

return int64(math.Min(math.Abs(float64(minNackTimeMs<<redeliveryCount)), float64(maxNackTimeMs)))
return time.Duration(math.Min(math.Abs(float64(minNackTime<<redeliveryCount)), float64(maxNackTime)))
}
5 changes: 3 additions & 2 deletions pulsar/negative_backoff_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package pulsar

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)
Expand All @@ -27,8 +28,8 @@ func TestDefaultNackBackoffPolicy_Next(t *testing.T) {
defaultNackBackoff := new(defaultNackBackoffPolicy)

res0 := defaultNackBackoff.Next(0)
assert.Equal(t, int64(1000*30), res0)
assert.Equal(t, 1*time.Second, res0)

res5 := defaultNackBackoff.Next(5)
assert.Equal(t, int64(600000), res5)
assert.Equal(t, 32*time.Second, res5)
}

0 comments on commit edd5c71

Please sign in to comment.