Skip to content

Commit

Permalink
Retry producing on next partition if possible when a partition is not…
Browse files Browse the repository at this point in the history
… connected
  • Loading branch information
Prashant Kumar committed Nov 16, 2022
1 parent 0412f28 commit d96c2e2
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 11 deletions.
4 changes: 2 additions & 2 deletions pulsar/default_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func NewDefaultRouter(
maxBatchingMessages uint,
maxBatchingSize uint,
maxBatchingDelay time.Duration,
disableBatching bool) func(*ProducerMessage, uint32) int {
shouldBatch func() bool) func(*ProducerMessage, uint32) int {
state := &defaultRouter{
currentPartitionCursor: rand.Uint32(),
lastBatchTimestamp: time.Now().UnixNano(),
Expand All @@ -65,7 +65,7 @@ func NewDefaultRouter(
}

// If there's no key, we do round-robin across partition. If no batching go to next partition.
if disableBatching {
if !shouldBatch() {
p := int(state.currentPartitionCursor % numPartitions)
atomic.AddUint32(&state.currentPartitionCursor, 1)
return p
Expand Down
6 changes: 5 additions & 1 deletion pulsar/default_router_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,9 @@ func newBenchDefaultRouter() func(*ProducerMessage, uint32) int {
maxBatchingSize = 524288
maxBatchingDelay = 100 * time.Millisecond
)
return NewDefaultRouter(internal.JavaStringHash, maxBatchingMessages, maxBatchingSize, maxBatchingDelay, false)
return NewDefaultRouter(internal.JavaStringHash,
maxBatchingMessages, maxBatchingSize,
maxBatchingDelay, func() bool {
return true
})
}
29 changes: 23 additions & 6 deletions pulsar/default_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ import (
const oneHourPublishMaxDelay = time.Hour

func TestDefaultRouterRoutingBecauseBatchingDisabled(t *testing.T) {
router := NewDefaultRouter(internal.JavaStringHash, 20, 100, oneHourPublishMaxDelay, true)
router := NewDefaultRouter(internal.JavaStringHash, 20, 100, oneHourPublishMaxDelay, func() bool {
return false
})
const numPartitions = uint32(3)
p1 := router(&ProducerMessage{
Payload: []byte("message 1"),
Expand All @@ -47,7 +49,10 @@ func TestDefaultRouterRoutingBecauseBatchingDisabled(t *testing.T) {

func TestDefaultRouterRoutingBecauseMaxPublishDelayReached(t *testing.T) {
maxPublishDelay := time.Nanosecond * 10
router := NewDefaultRouter(internal.JavaStringHash, 10, 100, maxPublishDelay, false)
router := NewDefaultRouter(internal.JavaStringHash, 10, 100,
maxPublishDelay, func() bool {
return true
})
const numPartitions = uint32(3)
p1 := router(&ProducerMessage{
Payload: []byte("message 1"),
Expand All @@ -67,7 +72,9 @@ func TestDefaultRouterRoutingBecauseMaxPublishDelayReached(t *testing.T) {
}

func TestDefaultRouterRoutingBecauseMaxNumberOfMessagesReached(t *testing.T) {
router := NewDefaultRouter(internal.JavaStringHash, 2, 100, oneHourPublishMaxDelay, false)
router := NewDefaultRouter(internal.JavaStringHash, 2, 100, oneHourPublishMaxDelay, func() bool {
return true
})
const numPartitions = uint32(3)
p1 := router(&ProducerMessage{
Payload: []byte("message 1"),
Expand All @@ -90,7 +97,11 @@ func TestDefaultRouterRoutingBecauseMaxNumberOfMessagesReached(t *testing.T) {
}

func TestDefaultRouterRoutingBecauseMaxVolumeReached(t *testing.T) {
router := NewDefaultRouter(internal.JavaStringHash, 10, 10, oneHourPublishMaxDelay, false)
router := NewDefaultRouter(internal.JavaStringHash,
10, 10,
oneHourPublishMaxDelay, func() bool {
return true
})
const numPartitions = uint32(3)
p1 := router(&ProducerMessage{
Payload: []byte("message 1"),
Expand All @@ -108,7 +119,10 @@ func TestDefaultRouterRoutingBecauseMaxVolumeReached(t *testing.T) {
}

func TestDefaultRouterNoRoutingBecausePartitionKeyIsSpecified(t *testing.T) {
router := NewDefaultRouter(internal.JavaStringHash, 1, 1, 0, false)
router := NewDefaultRouter(internal.JavaStringHash, 1, 1,
0, func() bool {
return true
})
p1 := router(&ProducerMessage{
Key: "my-key",
Payload: []byte("message 1"),
Expand All @@ -124,7 +138,10 @@ func TestDefaultRouterNoRoutingBecausePartitionKeyIsSpecified(t *testing.T) {

func TestDefaultRouterNoRoutingBecauseOnlyOnePartition(t *testing.T) {

router := NewDefaultRouter(internal.JavaStringHash, 1, 10, oneHourPublishMaxDelay, false)
router := NewDefaultRouter(internal.JavaStringHash, 1, 10,
oneHourPublishMaxDelay, func() bool {
return true
})

// partition index should not change regardless of the batching settings
p1 := router(&ProducerMessage{
Expand Down
7 changes: 7 additions & 0 deletions pulsar/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,13 @@ type ProducerOptions struct {
// Setting `DisableBatching: true` will make the producer to send messages individually
DisableBatching bool

// MaxRetryOtherPartitions makes a partitioned producer attempt, when possible, to produce a message on
// another available partition if the partition that was originally picked is not available for some reason.
// The next available partition is chosen by the same routing policy the client is configured with.
// The value is how many partitions should be tried before bailing out and falling back to the
// old behaviour.
MaxRetryOtherPartitions int

// BatchingMaxPublishDelay specifies the time period within which the messages sent will be batched (default: 10ms)
// if batch messages are enabled. If set to a non zero value, messages will be queued until this time
// interval or until
Expand Down
47 changes: 45 additions & 2 deletions pulsar/producer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,16 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) {
options.BatchingMaxMessages,
options.BatchingMaxSize,
options.BatchingMaxPublishDelay,
options.DisableBatching)
func() bool {
if options.DisableBatching {
return false
}

if options.MaxRetryOtherPartitions > 0 && !p.isProducerConnected() {
return false
}
return true
})
p.messageRouter = func(message *ProducerMessage, metadata TopicMetadata) int {
return internalRouter(message, metadata.NumPartitions())
}
Expand Down Expand Up @@ -160,6 +169,15 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) {
return p, nil
}

// isProducerConnected reports whether every partition producer of p is
// currently in the producerReady state.
//
// The producers list is read through producersPtr with an atomic load, the
// same way the send path reads it, rather than touching p.producers directly.
// An entry that is nil or is not a *partitionProducer is treated as not
// connected instead of panicking on the type assertion.
func (p *producer) isProducerConnected() bool {
	ptr := atomic.LoadPointer(&p.producersPtr)
	if ptr == nil {
		// No producers list has been published yet; nothing is disconnected.
		return true
	}
	for _, candidate := range *(*[]Producer)(ptr) {
		pp, ok := candidate.(*partitionProducer)
		if !ok || pp == nil {
			return false
		}
		if pp.getProducerState() != producerReady {
			return false
		}
	}
	return true
}

func (p *producer) runBackgroundPartitionDiscovery(period time.Duration) (cancel func()) {
var wg sync.WaitGroup
stopDiscoveryCh := make(chan struct{})
Expand Down Expand Up @@ -306,6 +324,23 @@ func (p *producer) SendAsync(ctx context.Context, msg *ProducerMessage,
p.getPartition(msg).SendAsync(ctx, msg, callback)
}

// getNextConnectedPartition asks the producer's message router for
// alternative partitions until it finds one whose partition producer is in
// the producerReady state.
//
// startPartition is the partition that was picked first and found not ready.
// maxRetry is the maximum number of additional router invocations; a value
// <= 0 returns startPartition unchanged.
//
// The search stops early and returns the router's choice when the router
// hands back the same partition that was just tried (for example with
// key-based routing). When all retries are used up, the last partition that
// was tried is returned, even though it is not ready.
func getNextConnectedPartition(p *producer, msg *ProducerMessage, startPartition int, maxRetry int) int {
	current := startPartition
	for ; maxRetry > 0; maxRetry-- {
		producers := *(*[]Producer)(atomic.LoadPointer(&p.producersPtr))
		if len(producers) == 0 {
			return current
		}

		candidate := p.messageRouter(msg, p)
		// The router may already see a partition count that is larger than
		// the producers list loaded above; wrap around as getPartition does
		// instead of indexing out of range.
		if candidate >= len(producers) {
			candidate %= len(producers)
		}
		if candidate == current {
			return candidate
		}

		if producers[candidate].(*partitionProducer).getProducerState() == producerReady {
			return candidate
		}
		current = candidate
	}
	return current
}

func (p *producer) getPartition(msg *ProducerMessage) Producer {
// Since partitions can only increase, it's ok if the producers list
// is updated in between. The numPartition is updated only after the list.
Expand All @@ -316,7 +351,15 @@ func (p *producer) getPartition(msg *ProducerMessage) Producer {
// updated
partition %= len(producers)
}
return producers[partition]
producerForPartition := producers[partition].(*partitionProducer)
if producerForPartition.getProducerState() != producerReady {
nextPartition := getNextConnectedPartition(p, msg, partition, 5)
if nextPartition != partition {
return producers[nextPartition].(*partitionProducer)
}
}

return producerForPartition
}

func (p *producer) LastSequenceID() int64 {
Expand Down
62 changes: 62 additions & 0 deletions pulsar/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,68 @@ func TestFlushInPartitionedProducer(t *testing.T) {
assert.Equal(t, msgCount, numOfMessages/2)
}

// TestRoundRobinRouterPartitionedProducerProduceOnNextTopicOnFailure closes
// the partition producer for partition 0 of a partitioned topic and then
// verifies that, with MaxRetryOtherPartitions set, every message is still
// published successfully and none of them is received from partition 0.
func TestRoundRobinRouterPartitionedProducerProduceOnNextTopicOnFailure(t *testing.T) {
	topicName := "public/default/partition-ProduceOnNextTopicOnFailure" + strconv.FormatInt(time.Now().Unix(), 10)
	numberOfPartitions := 10

	// call admin api to make it partitioned
	url := adminURL + "/" + "admin/v2/persistent/" + topicName + "/partitions"
	makeHTTPCall(t, http.MethodPut, url, strconv.Itoa(numberOfPartitions))

	numOfMessages := 100
	ctx := context.Background()

	// create client connection
	client, err := NewClient(ClientOptions{
		URL: serviceURL,
	})
	if !assert.NoError(t, err) {
		return
	}
	defer client.Close()

	// create consumer
	consumer, err := client.Subscribe(ConsumerOptions{
		Topic:            topicName,
		SubscriptionName: "my-sub",
		Type:             Exclusive,
	})
	if !assert.Nil(t, err) {
		return
	}
	defer consumer.Close()

	// create producer; bail out before touching it if creation failed
	pro, err := client.CreateProducer(ProducerOptions{
		Topic:                   topicName,
		DisableBatching:         false,
		BatchingMaxMessages:     uint(5),
		MaxRetryOtherPartitions: 5,
	})
	if !assert.NoError(t, err) {
		return
	}
	defer pro.Close()

	// close the producer of partition 0 so that it is no longer ready
	firstProducer := pro.(*producer).producers[0].(*partitionProducer)
	firstProducer.Close()
	assert.NotEqual(t, firstProducer.getProducerState(), producerReady)

	prefix := "msg-"
	for i := 0; i < numOfMessages; i++ {
		messageContent := prefix + fmt.Sprintf("%d", i)
		_, err = pro.Send(ctx, &ProducerMessage{
			Payload: []byte(messageContent),
		})
		assert.Nil(t, err)
	}

	// Receive sequentially under a deadline: no goroutine is left blocked in
	// Receive if messages are missing, and the test finishes as soon as all
	// messages have arrived instead of relying on a fixed sleep.
	receiveCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	closedPartitionTopic := "persistent://" + topicName + "-partition-0"
	received := 0
	for received < numOfMessages {
		msg, err := consumer.Receive(receiveCtx)
		if !assert.NoError(t, err) {
			break
		}
		assert.NotEqual(t, msg.Topic(), closedPartitionTopic)
		received++
	}

	assert.Equal(t, numOfMessages, received, "Number of messages received")
}

func TestRoundRobinRouterPartitionedProducer(t *testing.T) {
topicName := "public/default/partition-testRoundRobinRouterPartitionedProducer"
numberOfPartitions := 5
Expand Down

0 comments on commit d96c2e2

Please sign in to comment.