From df8a2d145c96c624cffc0424d76ba19d963807f4 Mon Sep 17 00:00:00 2001 From: labuladong Date: Mon, 19 Sep 2022 10:16:12 +0800 Subject: [PATCH 1/2] rewrite regexConsumer --- pulsar/consumer_multitopic.go | 4 +- pulsar/consumer_regex.go | 166 ++++------------------------------ pulsar/consumer_regex_test.go | 12 +-- pulsar/helper_for_test.go | 2 +- 4 files changed, 26 insertions(+), 158 deletions(-) diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go index 380dd75379..3e4dbdfea3 100644 --- a/pulsar/consumer_multitopic.go +++ b/pulsar/consumer_multitopic.go @@ -49,7 +49,7 @@ type multiTopicConsumer struct { } func newMultiTopicConsumer(client *client, options ConsumerOptions, topics []string, - messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) (Consumer, error) { + messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) (*multiTopicConsumer, error) { mtc := &multiTopicConsumer{ client: client, options: options, @@ -128,7 +128,7 @@ func (c *multiTopicConsumer) AckID(msgID MessageID) error { mid, ok := toTrackingMessageID(msgID) if !ok { c.log.Warnf("invalid message id type %T", msgID) - return errors.New("invalid message id type in multi_consumer") + return errors.New("invalid message id type in consumer") } if mid.consumer == nil { diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go index c55a1c1143..237f8d86fd 100644 --- a/pulsar/consumer_regex.go +++ b/pulsar/consumer_regex.go @@ -18,8 +18,6 @@ package pulsar import ( - "context" - "errors" "fmt" "regexp" "strings" @@ -37,73 +35,36 @@ const ( ) type regexConsumer struct { - client *client - dlq *dlqRouter - rlq *retryRouter - - options ConsumerOptions - - messageCh chan ConsumerMessage + *multiTopicConsumer namespace string pattern *regexp.Regexp consumersLock sync.Mutex - consumers map[string]Consumer subscribeCh chan []string unsubscribeCh chan []string - - closeOnce sync.Once - closeCh chan struct{} - - ticker *time.Ticker - - log log.Logger - - consumerName string + ticker *time.Ticker } -func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, pattern *regexp.Regexp, - msgCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) (Consumer, error) { +func newRegexConsumer(client *client, opts ConsumerOptions, tn *internal.TopicName, pattern *regexp.Regexp, + msgCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) (*regexConsumer, error) { + topics, err := topics(client, tn.Namespace, pattern) + if err != nil { + return nil, err + } + mtc, err := newMultiTopicConsumer(client, opts, topics, msgCh, dlq, rlq) + if err != nil { + return nil, err + } + rc := ®exConsumer{ - client: c, - dlq: dlq, - rlq: rlq, - options: opts, - messageCh: msgCh, + multiTopicConsumer: mtc, namespace: tn.Namespace, pattern: pattern, - consumers: make(map[string]Consumer), subscribeCh: make(chan []string, 1), unsubscribeCh: make(chan []string, 1), - - closeCh: make(chan struct{}), - - log: c.log.SubLogger(log.Fields{"topic": tn.Name}), - consumerName: opts.Name, - } - - topics, err := rc.topics() - if err != nil { - return nil, err - } - - var errs error - for ce := range subscriber(c, topics, opts, msgCh, dlq, rlq) { - if ce.err != nil { - errs = pkgerrors.Wrapf(ce.err, "unable to subscribe to topic=%s", ce.topic) - } else { - rc.consumers[ce.topic] = ce.consumer - } - } - - if errs != nil { - for _, c := range rc.consumers { - c.Close() - } - return nil, errs } // set up timer @@ -118,10 +79,6 @@ func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, p return rc, nil } -func (c *regexConsumer) Subscription() string { - return c.options.SubscriptionName -} - func (c *regexConsumer) Unsubscribe() error { var errs error c.consumersLock.Lock() @@ -137,91 +94,10 @@ func (c *regexConsumer) Unsubscribe() error { return errs } -func (c *regexConsumer) Receive(ctx context.Context) (message Message, err error) { - for { - select { - case <-c.closeCh: - return nil, newError(ConsumerClosed, "consumer closed") - case cm, ok := <-c.messageCh: - if !ok { - return nil, newError(ConsumerClosed, "consumer closed") - } - return cm.Message, nil - case <-ctx.Done(): - return nil, ctx.Err() - } - } -} - -// Chan return the messages chan to user -func (c *regexConsumer) Chan() <-chan ConsumerMessage { - return c.messageCh -} - -// Ack the consumption of a single message -func (c *regexConsumer) Ack(msg Message) error { - return c.AckID(msg.ID()) -} - func (c *regexConsumer) ReconsumeLater(msg Message, delay time.Duration) { c.log.Warnf("regexp consumer not support ReconsumeLater yet.") } -// AckID the consumption of a single message, identified by its MessageID -func (c *regexConsumer) AckID(msgID MessageID) error { - mid, ok := toTrackingMessageID(msgID) - if !ok { - c.log.Warnf("invalid message id type %T", msgID) - return errors.New("invalid message id type") - } - - if mid.consumer == nil { - c.log.Warnf("unable to ack messageID=%+v can not determine topic", msgID) - return errors.New("consumer is nil in consumer_regex") - } - - if c.options.AckWithResponse { - return mid.AckWithResponse() - } - - return mid.Ack() -} - -func (c *regexConsumer) Nack(msg Message) { - if c.options.EnableDefaultNackBackoffPolicy || c.options.NackBackoffPolicy != nil { - msgID := msg.ID() - mid, ok := toTrackingMessageID(msgID) - if !ok { - c.log.Warnf("invalid message id type %T", msgID) - return - } - - if mid.consumer == nil { - c.log.Warnf("unable to nack messageID=%+v can not determine topic", msgID) - return - } - mid.NackByMsg(msg) - return - } - - c.NackID(msg.ID()) -} - -func (c *regexConsumer) NackID(msgID MessageID) { - mid, ok := toTrackingMessageID(msgID) - if !ok { - c.log.Warnf("invalid message id type %T", msgID) - return - } - - if mid.consumer == nil { - c.log.Warnf("unable to nack messageID=%+v can not determine topic", msgID) - return - } - - mid.Nack() -} - func (c *regexConsumer) Close() { c.closeOnce.Do(func() { c.ticker.Stop() @@ -252,11 +128,6 @@ func (c *regexConsumer) SeekByTime(time time.Time) error { return newError(SeekFailed, "seek command not allowed for regex consumer") } -// Name returns the name of consumer. -func (c *regexConsumer) Name() string { - return c.consumerName -} - func (c *regexConsumer) closed() bool { select { case <-c.closeCh: @@ -289,7 +160,7 @@ func (c *regexConsumer) monitor() { } func (c *regexConsumer) discover() { - topics, err := c.topics() + topics, err := topics(c.client, c.namespace, c.pattern) if err != nil { c.log.WithError(err).Errorf("Failed to discover topics") return @@ -364,13 +235,13 @@ func (c *regexConsumer) unsubscribe(topics []string) { } } -func (c *regexConsumer) topics() ([]string, error) { - topics, err := c.client.lookupService.GetTopicsOfNamespace(c.namespace, internal.Persistent) +func topics(client *client, namespace string, pattern *regexp.Regexp) ([]string, error) { + topics, err := client.lookupService.GetTopicsOfNamespace(namespace, internal.Persistent) if err != nil { return nil, err } - filtered := filterTopics(topics, c.pattern) + filtered := filterTopics(topics, pattern) return filtered, nil } @@ -380,6 +251,7 @@ type consumerError struct { consumer Consumer } +// create consumers for all partitions of topics func subscriber(c *client, topics []string, opts ConsumerOptions, ch chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) <-chan consumerError { consumerErrorCh := make(chan consumerError, len(topics)) diff --git a/pulsar/consumer_regex_test.go b/pulsar/consumer_regex_test.go index 9cb600fe3a..8fa4cbe723 100644 --- a/pulsar/consumer_regex_test.go +++ b/pulsar/consumer_regex_test.go @@ -156,13 +156,11 @@ func runRegexConsumerDiscoverPatternAll(t *testing.T, c Client, namespace string dlq, _ := newDlqRouter(c.(*client), nil, log.DefaultNopLogger()) rlq, _ := newRetryRouter(c.(*client), nil, false, log.DefaultNopLogger()) - consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq) + rc, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq) if err != nil { t.Fatal(err) } - defer consumer.Close() - - rc := consumer.(*regexConsumer) + defer rc.Close() // trigger discovery rc.discover() @@ -194,13 +192,11 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace string dlq, _ := newDlqRouter(c.(*client), nil, log.DefaultNopLogger()) rlq, _ := newRetryRouter(c.(*client), nil, false, log.DefaultNopLogger()) - consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq) + rc, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq) if err != nil { t.Fatal(err) } - defer consumer.Close() - - rc := consumer.(*regexConsumer) + defer rc.Close() // trigger discovery rc.discover() diff --git a/pulsar/helper_for_test.go b/pulsar/helper_for_test.go index d6a4f0042d..00f41625d3 100644 --- a/pulsar/helper_for_test.go +++ b/pulsar/helper_for_test.go @@ -171,7 +171,7 @@ func retryAssert(t assert.TestingT, times int, milliseconds int, update func(), for i := 0; i < times; i++ { time.Sleep(time.Duration(milliseconds) * time.Millisecond) update() - if assert(nil) { + if assert(t) { break } } From 4e9e33c9fdc43a07ccc4226d4e1661adfabe6c52 Mon Sep 17 00:00:00 2001 From: labuladong Date: Tue, 11 Oct 2022 12:31:42 +0800 Subject: [PATCH 2/2] make `topics` as method of `client` --- pulsar/client_impl.go | 11 +++++++++++ pulsar/consumer_regex.go | 14 ++------------ 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go index 712e197205..4375843536 100644 --- a/pulsar/client_impl.go +++ b/pulsar/client_impl.go @@ -20,6 +20,7 @@ package pulsar import ( "fmt" "net/url" + "regexp" "time" "github.com/prometheus/client_golang/prometheus" @@ -229,3 +230,13 @@ func (c *client) Close() { c.cnxPool.Close() c.lookupService.Close() } + +func (c *client) topics(namespace string, pattern *regexp.Regexp) ([]string, error) { + topics, err := c.lookupService.GetTopicsOfNamespace(namespace, internal.Persistent) + if err != nil { + return nil, err + } + + filtered := filterTopics(topics, pattern) + return filtered, nil +} diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go index 237f8d86fd..f4cf407955 100644 --- a/pulsar/consumer_regex.go +++ b/pulsar/consumer_regex.go @@ -48,7 +48,7 @@ type regexConsumer struct { func newRegexConsumer(client *client, opts ConsumerOptions, tn *internal.TopicName, pattern *regexp.Regexp, msgCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) (*regexConsumer, error) { - topics, err := topics(client, tn.Namespace, pattern) + topics, err := client.topics(tn.Namespace, pattern) if err != nil { return nil, err } @@ -160,7 +160,7 @@ func (c *regexConsumer) monitor() { } func (c *regexConsumer) discover() { - topics, err := topics(c.client, c.namespace, c.pattern) + topics, err := c.client.topics(c.namespace, c.pattern) if err != nil { c.log.WithError(err).Errorf("Failed to discover topics") return @@ -235,16 +235,6 @@ func (c *regexConsumer) unsubscribe(topics []string) { } } -func topics(client *client, namespace string, pattern *regexp.Regexp) ([]string, error) { - topics, err := client.lookupService.GetTopicsOfNamespace(namespace, internal.Persistent) - if err != nil { - return nil, err - } - - filtered := filterTopics(topics, pattern) - return filtered, nil -} - type consumerError struct { err error topic string