From d96c2e2e1a03ed497528247322b63be78d810ab5 Mon Sep 17 00:00:00 2001 From: Prashant Kumar Date: Thu, 3 Nov 2022 16:18:36 -0700 Subject: [PATCH] Retry producing on next partition if possible when a partition is not connected --- pulsar/default_router.go | 4 +- pulsar/default_router_bench_test.go | 6 ++- pulsar/default_router_test.go | 29 +++++++++++--- pulsar/producer.go | 7 ++++ pulsar/producer_impl.go | 47 +++++++++++++++++++++- pulsar/producer_test.go | 62 +++++++++++++++++++++++++++++ 6 files changed, 144 insertions(+), 11 deletions(-) diff --git a/pulsar/default_router.go b/pulsar/default_router.go index 6945ff195a..a1550d8255 100644 --- a/pulsar/default_router.go +++ b/pulsar/default_router.go @@ -41,7 +41,7 @@ func NewDefaultRouter( maxBatchingMessages uint, maxBatchingSize uint, maxBatchingDelay time.Duration, - disableBatching bool) func(*ProducerMessage, uint32) int { + shouldBatch func() bool) func(*ProducerMessage, uint32) int { state := &defaultRouter{ currentPartitionCursor: rand.Uint32(), lastBatchTimestamp: time.Now().UnixNano(), @@ -65,7 +65,7 @@ func NewDefaultRouter( } // If there's no key, we do round-robin across partition. If no batching go to next partition. 
- if disableBatching { + if !shouldBatch() { p := int(state.currentPartitionCursor % numPartitions) atomic.AddUint32(&state.currentPartitionCursor, 1) return p diff --git a/pulsar/default_router_bench_test.go b/pulsar/default_router_bench_test.go index d0c1cb9595..d3ddecba86 100644 --- a/pulsar/default_router_bench_test.go +++ b/pulsar/default_router_bench_test.go @@ -60,5 +60,9 @@ func newBenchDefaultRouter() func(*ProducerMessage, uint32) int { maxBatchingSize = 524288 maxBatchingDelay = 100 * time.Millisecond ) - return NewDefaultRouter(internal.JavaStringHash, maxBatchingMessages, maxBatchingSize, maxBatchingDelay, false) + return NewDefaultRouter(internal.JavaStringHash, + maxBatchingMessages, maxBatchingSize, + maxBatchingDelay, func() bool { + return true + }) } diff --git a/pulsar/default_router_test.go b/pulsar/default_router_test.go index 3c42e66d88..90d5ef37d1 100644 --- a/pulsar/default_router_test.go +++ b/pulsar/default_router_test.go @@ -28,7 +28,9 @@ import ( const oneHourPublishMaxDelay = time.Hour func TestDefaultRouterRoutingBecauseBatchingDisabled(t *testing.T) { - router := NewDefaultRouter(internal.JavaStringHash, 20, 100, oneHourPublishMaxDelay, true) + router := NewDefaultRouter(internal.JavaStringHash, 20, 100, oneHourPublishMaxDelay, func() bool { + return false + }) const numPartitions = uint32(3) p1 := router(&ProducerMessage{ Payload: []byte("message 1"), @@ -47,7 +49,10 @@ func TestDefaultRouterRoutingBecauseBatchingDisabled(t *testing.T) { func TestDefaultRouterRoutingBecauseMaxPublishDelayReached(t *testing.T) { maxPublishDelay := time.Nanosecond * 10 - router := NewDefaultRouter(internal.JavaStringHash, 10, 100, maxPublishDelay, false) + router := NewDefaultRouter(internal.JavaStringHash, 10, 100, + maxPublishDelay, func() bool { + return true + }) const numPartitions = uint32(3) p1 := router(&ProducerMessage{ Payload: []byte("message 1"), @@ -67,7 +72,9 @@ func TestDefaultRouterRoutingBecauseMaxPublishDelayReached(t *testing.T) { } 
func TestDefaultRouterRoutingBecauseMaxNumberOfMessagesReached(t *testing.T) { - router := NewDefaultRouter(internal.JavaStringHash, 2, 100, oneHourPublishMaxDelay, false) + router := NewDefaultRouter(internal.JavaStringHash, 2, 100, oneHourPublishMaxDelay, func() bool { + return true + }) const numPartitions = uint32(3) p1 := router(&ProducerMessage{ Payload: []byte("message 1"), @@ -90,7 +97,11 @@ func TestDefaultRouterRoutingBecauseMaxNumberOfMessagesReached(t *testing.T) { } func TestDefaultRouterRoutingBecauseMaxVolumeReached(t *testing.T) { - router := NewDefaultRouter(internal.JavaStringHash, 10, 10, oneHourPublishMaxDelay, false) + router := NewDefaultRouter(internal.JavaStringHash, + 10, 10, + oneHourPublishMaxDelay, func() bool { + return true + }) const numPartitions = uint32(3) p1 := router(&ProducerMessage{ Payload: []byte("message 1"), @@ -108,7 +119,10 @@ func TestDefaultRouterRoutingBecauseMaxVolumeReached(t *testing.T) { } func TestDefaultRouterNoRoutingBecausePartitionKeyIsSpecified(t *testing.T) { - router := NewDefaultRouter(internal.JavaStringHash, 1, 1, 0, false) + router := NewDefaultRouter(internal.JavaStringHash, 1, 1, + 0, func() bool { + return true + }) p1 := router(&ProducerMessage{ Key: "my-key", Payload: []byte("message 1"), @@ -124,7 +138,10 @@ func TestDefaultRouterNoRoutingBecausePartitionKeyIsSpecified(t *testing.T) { func TestDefaultRouterNoRoutingBecauseOnlyOnePartition(t *testing.T) { - router := NewDefaultRouter(internal.JavaStringHash, 1, 10, oneHourPublishMaxDelay, false) + router := NewDefaultRouter(internal.JavaStringHash, 1, 10, + oneHourPublishMaxDelay, func() bool { + return true + }) // partition index should not change regardless of the batching settings p1 := router(&ProducerMessage{ diff --git a/pulsar/producer.go b/pulsar/producer.go index d088fb2d60..a90d40eec3 100644 --- a/pulsar/producer.go +++ b/pulsar/producer.go @@ -132,6 +132,13 @@ type ProducerOptions struct { // Setting `DisableBatching: true` will make 
the producer to send messages individually DisableBatching bool + // MaxRetryOtherPartitions makes a partitioned producer attempt, if possible, to produce a message on + // another available partition when the currently picked partition is not available for some reason. + // The next available partition is chosen by the same routing policy the client is configured with. + // The value is how many partitions should be tried before bailing out and falling back to the + // old behaviour. + MaxRetryOtherPartitions int + // BatchingMaxPublishDelay specifies the time period within which the messages sent will be batched (default: 10ms) // if batch messages are enabled. If set to a non zero value, messages will be queued until this time // interval or until diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go index 3c45b597d0..27cb978561 100644 --- a/pulsar/producer_impl.go +++ b/pulsar/producer_impl.go @@ -116,7 +116,16 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) { options.BatchingMaxMessages, options.BatchingMaxSize, options.BatchingMaxPublishDelay, - options.DisableBatching) + func() bool { + if options.DisableBatching { + return false + } + + if options.MaxRetryOtherPartitions > 0 && !p.isProducerConnected() { + return false + } + return true + }) p.messageRouter = func(message *ProducerMessage, metadata TopicMetadata) int { return internalRouter(message, metadata.NumPartitions()) } @@ -160,6 +169,15 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) { return p, nil } +func (p *producer) isProducerConnected() bool { + for _, partitionedP := range p.producers { + if partitionedP.(*partitionProducer).getProducerState() != producerReady { + return false + } + } + return true +} + func (p *producer) runBackgroundPartitionDiscovery(period time.Duration) (cancel func()) { var wg sync.WaitGroup stopDiscoveryCh := make(chan struct{}) @@ -306,6 +324,23 @@ func (p *producer) SendAsync(ctx
context.Context, msg *ProducerMessage, p.getPartition(msg).SendAsync(ctx, msg, callback) } +func getNextConnectedPartition(p *producer, msg *ProducerMessage, startPartition int, maxRetry int) int { + if maxRetry == 0 { + return startPartition + } + partition := p.messageRouter(msg, p) + if partition == startPartition { + return partition + } + producers := *(*[]Producer)(atomic.LoadPointer(&p.producersPtr)) + producerForPartition := producers[partition].(*partitionProducer) + if producerForPartition.getProducerState() == producerReady { + return partition + } + maxRetry-- + return getNextConnectedPartition(p, msg, partition, maxRetry) +} + func (p *producer) getPartition(msg *ProducerMessage) Producer { // Since partitions can only increase, it's ok if the producers list // is updated in between. The numPartition is updated only after the list. @@ -316,7 +351,15 @@ func (p *producer) getPartition(msg *ProducerMessage) Producer { // updated partition %= len(producers) } - return producers[partition] + producerForPartition := producers[partition].(*partitionProducer) + if producerForPartition.getProducerState() != producerReady { + nextPartition := getNextConnectedPartition(p, msg, partition, 5) + if nextPartition != partition { + return producers[nextPartition].(*partitionProducer) + } + } + + return producerForPartition } func (p *producer) LastSequenceID() int64 { diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index f193ffd4ec..71ea2cee86 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -443,6 +443,68 @@ func TestFlushInPartitionedProducer(t *testing.T) { assert.Equal(t, msgCount, numOfMessages/2) } +func TestRoundRobinRouterPartitionedProducerProduceOnNextTopicOnFailure(t *testing.T) { + topicName := "public/default/partition-ProduceOnNextTopicOnFailure" + strconv.FormatInt(time.Now().Unix(), 10) + numberOfPartitions := 10 + + // call admin api to make it partitioned + url := adminURL + "/" + "admin/v2/persistent/" + topicName 
+ "/partitions" + makeHTTPCall(t, http.MethodPut, url, strconv.Itoa(numberOfPartitions)) + + numOfMessages := 100 + ctx := context.Background() + + // create client connection + client, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.NoError(t, err) + defer client.Close() + + // create consumer + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topicName, + SubscriptionName: "my-sub", + Type: Exclusive, + }) + assert.Nil(t, err) + defer consumer.Close() + + // create producer + pro, err := client.CreateProducer(ProducerOptions{ + Topic: topicName, + DisableBatching: false, + BatchingMaxMessages: uint(5), + MaxRetryOtherPartitions: 5, + }) + p := pro.(*producer).producers[0] + firstProducer := p.(*partitionProducer) + firstProducer.Close() + assert.NotEqual(t, firstProducer.getProducerState(), producerReady) + defer pro.Close() + + prefix := "msg-" + for i := 0; i < numOfMessages; i++ { + messageContent := prefix + fmt.Sprintf("%d", i) + _, err = pro.Send(ctx, &ProducerMessage{ + Payload: []byte(messageContent), + }) + assert.Nil(t, err) + } + mChan := make(chan Message, numOfMessages) + for i := 0; i < numOfMessages; i++ { + go func() { + msg, _ := consumer.Receive(ctx) + assert.NotEqual(t, msg.Topic(), "persistent://"+topicName+"-partition-0") + mChan <- msg + }() + } + + time.Sleep(1 * time.Second) + l := len(mChan) + assert.Equal(t, numOfMessages, l, "Number of messages received") +} + func TestRoundRobinRouterPartitionedProducer(t *testing.T) { topicName := "public/default/partition-testRoundRobinRouterPartitionedProducer" numberOfPartitions := 5