From aac6b5742167b90e20f06efeffa841e036ebae0e Mon Sep 17 00:00:00 2001 From: ming luo Date: Fri, 17 Nov 2023 14:56:59 -0500 Subject: [PATCH 1/4] retry producer creation upon error --- pulsar/producer_partition.go | 44 +++++++++++++++++++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 1b79053e38..e5cb745658 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -197,7 +197,49 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions } else { p.userProvidedProducerName = false } - err := p.grabCnx() + // retry to create producer when failed with maxRetry + var maxRetry int + if p.options.MaxReconnectToBroker == nil { + maxRetry = -1 + } else { + maxRetry = int(*p.options.MaxReconnectToBroker) + } + + var delayReconnectTime time.Duration + defaultBackoff := internal.DefaultBackoff{} + + var err error + for maxRetry != 0 { + if p.options.BackoffPolicy == nil { + delayReconnectTime = defaultBackoff.Next() + } else { + delayReconnectTime = p.options.BackoffPolicy.Next() + } + + atomic.AddUint64(&p.epoch, 1) + err = p.grabCnx() + if err == nil { + break + } + p.log.WithError(err).Error("Failed to create producer at newPartitionProducer") + errMsg := err.Error() + if strings.Contains(errMsg, errTopicNotFount) { + // when topic is not found, do not attempt to reconnect + p.log.Warn("Failed to create producer due to Topic Not Found") + break + } + + if strings.Contains(errMsg, "TopicTerminatedError") { + p.log.Info("Topic was terminated, failing pending messages, will not create producer") + break + } + + if maxRetry > 0 { + maxRetry-- + } + logger.WithError(err).Error("Failed to create producer at newPartitionProducer retry to create producer", delayReconnectTime) + time.Sleep(delayReconnectTime) + } if err != nil { p.batchFlushTicker.Stop() logger.WithError(err).Error("Failed to create producer at newPartitionProducer") From a84c97d67e116f9c4bc51cca87fda46189aa43c0 Mon Sep 17 00:00:00 2001 From: ming luo Date: Fri, 12 Jan 2024 13:59:22 -0500 Subject: [PATCH 2/4] use the new error evaluation pe review comments --- pulsar/producer_partition.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index e5cb745658..f0ed6a2ff6 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -223,13 +223,13 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions } p.log.WithError(err).Error("Failed to create producer at newPartitionProducer") errMsg := err.Error() - if strings.Contains(errMsg, errTopicNotFount) { + if errors.Is(err, ErrTopicNotfound) // when topic is not found, do not attempt to reconnect p.log.Warn("Failed to create producer due to Topic Not Found") break } - if strings.Contains(errMsg, "TopicTerminatedError") { + if errors.Is(err, ErrTopicTerminated) p.log.Info("Topic was terminated, failing pending messages, will not create producer") break } From d945b469618fb048f76aaf3ce4a85d81275dbb57 Mon Sep 17 00:00:00 2001 From: ming luo Date: Tue, 16 Jan 2024 15:24:48 -0500 Subject: [PATCH 3/4] fix error --- pulsar/producer_partition.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index f0ed6a2ff6..c019a8d131 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -222,14 +222,13 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions break } p.log.WithError(err).Error("Failed to create producer at newPartitionProducer") - errMsg := err.Error() - if errors.Is(err, ErrTopicNotfound) + if errors.Is(err, ErrTopicNotfound) { // when topic is not found, do not attempt to reconnect p.log.Warn("Failed to create producer due to Topic Not Found") break } - if errors.Is(err, ErrTopicTerminated) + if errors.Is(err, ErrTopicTerminated) { p.log.Info("Topic was terminated, failing pending messages, will not create producer") break } From f58809891a2c88b9be988353f9fac657a07ed1b6 Mon Sep 17 00:00:00 2001 From: ming luo Date: Tue, 16 Jan 2024 16:34:16 -0500 Subject: [PATCH 4/4] fix lint --- pulsar/producer_partition.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index c019a8d131..f139ff17a1 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -236,7 +236,8 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions if maxRetry > 0 { maxRetry-- } - logger.WithError(err).Error("Failed to create producer at newPartitionProducer retry to create producer", delayReconnectTime) + logger.WithError(err). + Error("Failed to create producer at newPartitionProducer retry to create producer") time.Sleep(delayReconnectTime) } if err != nil {