diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go index 1d75a2477b..7b3b2a2b4e 100644 --- a/pulsar/consumer_multitopic.go +++ b/pulsar/consumer_multitopic.go @@ -49,7 +49,7 @@ type multiTopicConsumer struct { } func newMultiTopicConsumer(client *client, options ConsumerOptions, topics []string, - messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) (Consumer, error) { + messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) (*multiTopicConsumer, error) { mtc := &multiTopicConsumer{ client: client, options: options, @@ -128,7 +128,7 @@ func (c *multiTopicConsumer) AckID(msgID MessageID) error { mid, ok := toTrackingMessageID(msgID) if !ok { c.log.Warnf("invalid message id type %T", msgID) - return errors.New("invalid message id type in multi_consumer") + return errors.New("invalid message id type in consumer") } if mid.consumer == nil { diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go index e4d2077ac7..237f8d86fd 100644 --- a/pulsar/consumer_regex.go +++ b/pulsar/consumer_regex.go @@ -18,8 +18,6 @@ package pulsar import ( - "context" - "errors" "fmt" "regexp" "strings" @@ -37,73 +35,36 @@ const ( ) type regexConsumer struct { - client *client - dlq *dlqRouter - rlq *retryRouter - - options ConsumerOptions - - messageCh chan ConsumerMessage + *multiTopicConsumer namespace string pattern *regexp.Regexp consumersLock sync.Mutex - consumers map[string]Consumer subscribeCh chan []string unsubscribeCh chan []string - - closeOnce sync.Once - closeCh chan struct{} - - ticker *time.Ticker - - log log.Logger - - consumerName string + ticker *time.Ticker } -func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, pattern *regexp.Regexp, - msgCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) (Consumer, error) { +func newRegexConsumer(client *client, opts ConsumerOptions, tn *internal.TopicName, pattern *regexp.Regexp, + msgCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) (*regexConsumer, error) { + topics, err := topics(client, tn.Namespace, pattern) + if err != nil { + return nil, err + } + mtc, err := newMultiTopicConsumer(client, opts, topics, msgCh, dlq, rlq) + if err != nil { + return nil, err + } + rc := ®exConsumer{ - client: c, - dlq: dlq, - rlq: rlq, - options: opts, - messageCh: msgCh, + multiTopicConsumer: mtc, namespace: tn.Namespace, pattern: pattern, - consumers: make(map[string]Consumer), subscribeCh: make(chan []string, 1), unsubscribeCh: make(chan []string, 1), - - closeCh: make(chan struct{}), - - log: c.log.SubLogger(log.Fields{"topic": tn.Name}), - consumerName: opts.Name, - } - - topics, err := rc.topics() - if err != nil { - return nil, err - } - - var errs error - for ce := range subscriber(c, topics, opts, msgCh, dlq, rlq) { - if ce.err != nil { - errs = pkgerrors.Wrapf(ce.err, "unable to subscribe to topic=%s", ce.topic) - } else { - rc.consumers[ce.topic] = ce.consumer - } - } - - if errs != nil { - for _, c := range rc.consumers { - c.Close() - } - return nil, errs } // set up timer @@ -118,10 +79,6 @@ func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, p return rc, nil } -func (c *regexConsumer) Subscription() string { - return c.options.SubscriptionName -} - func (c *regexConsumer) Unsubscribe() error { var errs error c.consumersLock.Lock() @@ -137,87 +94,10 @@ func (c *regexConsumer) Unsubscribe() error { return errs } -func (c *regexConsumer) Receive(ctx context.Context) (message Message, err error) { - for { - select { - case <-c.closeCh: - return nil, newError(ConsumerClosed, "consumer closed") - case cm, ok := <-c.messageCh: - if !ok { - return nil, newError(ConsumerClosed, "consumer closed") - } - return cm.Message, nil - case <-ctx.Done(): - return nil, ctx.Err() - } - } -} - -// Chan return the messages chan to user -func (c *regexConsumer) Chan() <-chan ConsumerMessage { - return c.messageCh -} - -// Ack the consumption of a single message -func (c *regexConsumer) Ack(msg Message) error { - return c.AckID(msg.ID()) -} - func (c *regexConsumer) ReconsumeLater(msg Message, delay time.Duration) { c.log.Warnf("regexp consumer not support ReconsumeLater yet.") } -// AckID the consumption of a single message, identified by its MessageID -func (c *regexConsumer) AckID(msgID MessageID) error { - mid, ok := toTrackingMessageID(msgID) - if !ok { - c.log.Warnf("invalid message id type %T", msgID) - return errors.New("invalid message id type") - } - - if mid.consumer == nil { - c.log.Warnf("unable to ack messageID=%+v can not determine topic", msgID) - return errors.New("consumer is nil in consumer_regex") - } - - return mid.Ack() -} - -func (c *regexConsumer) Nack(msg Message) { - if c.options.EnableDefaultNackBackoffPolicy || c.options.NackBackoffPolicy != nil { - msgID := msg.ID() - mid, ok := toTrackingMessageID(msgID) - if !ok { - c.log.Warnf("invalid message id type %T", msgID) - return - } - - if mid.consumer == nil { - c.log.Warnf("unable to nack messageID=%+v can not determine topic", msgID) - return - } - mid.NackByMsg(msg) - return - } - - c.NackID(msg.ID()) -} - -func (c *regexConsumer) NackID(msgID MessageID) { - mid, ok := toTrackingMessageID(msgID) - if !ok { - c.log.Warnf("invalid message id type %T", msgID) - return - } - - if mid.consumer == nil { - c.log.Warnf("unable to nack messageID=%+v can not determine topic", msgID) - return - } - - mid.Nack() -} - func (c *regexConsumer) Close() { c.closeOnce.Do(func() { c.ticker.Stop() @@ -248,11 +128,6 @@ func (c *regexConsumer) SeekByTime(time time.Time) error { return newError(SeekFailed, "seek command not allowed for regex consumer") } -// Name returns the name of consumer. -func (c *regexConsumer) Name() string { - return c.consumerName -} - func (c *regexConsumer) closed() bool { select { case <-c.closeCh: @@ -285,7 +160,7 @@ func (c *regexConsumer) monitor() { } func (c *regexConsumer) discover() { - topics, err := c.topics() + topics, err := topics(c.client, c.namespace, c.pattern) if err != nil { c.log.WithError(err).Errorf("Failed to discover topics") return @@ -360,13 +235,13 @@ func (c *regexConsumer) unsubscribe(topics []string) { } } -func (c *regexConsumer) topics() ([]string, error) { - topics, err := c.client.lookupService.GetTopicsOfNamespace(c.namespace, internal.Persistent) +func topics(client *client, namespace string, pattern *regexp.Regexp) ([]string, error) { + topics, err := client.lookupService.GetTopicsOfNamespace(namespace, internal.Persistent) if err != nil { return nil, err } - filtered := filterTopics(topics, c.pattern) + filtered := filterTopics(topics, pattern) return filtered, nil } @@ -376,6 +251,7 @@ type consumerError struct { consumer Consumer } +// create consumers for all partitions of topics func subscriber(c *client, topics []string, opts ConsumerOptions, ch chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) <-chan consumerError { consumerErrorCh := make(chan consumerError, len(topics)) diff --git a/pulsar/consumer_regex_test.go b/pulsar/consumer_regex_test.go index 9cb600fe3a..8fa4cbe723 100644 --- a/pulsar/consumer_regex_test.go +++ b/pulsar/consumer_regex_test.go @@ -156,13 +156,11 @@ func runRegexConsumerDiscoverPatternAll(t *testing.T, c Client, namespace string dlq, _ := newDlqRouter(c.(*client), nil, log.DefaultNopLogger()) rlq, _ := newRetryRouter(c.(*client), nil, false, log.DefaultNopLogger()) - consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq) + rc, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq) if err != nil { t.Fatal(err) } - defer consumer.Close() - - rc := consumer.(*regexConsumer) + defer rc.Close() // trigger discovery rc.discover() @@ -194,13 +192,11 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace string dlq, _ := newDlqRouter(c.(*client), nil, log.DefaultNopLogger()) rlq, _ := newRetryRouter(c.(*client), nil, false, log.DefaultNopLogger()) - consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq) + rc, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq) if err != nil { t.Fatal(err) } - defer consumer.Close() - - rc := consumer.(*regexConsumer) + defer rc.Close() // trigger discovery rc.discover() diff --git a/pulsar/helper_for_test.go b/pulsar/helper_for_test.go index d6a4f0042d..00f41625d3 100644 --- a/pulsar/helper_for_test.go +++ b/pulsar/helper_for_test.go @@ -171,7 +171,7 @@ func retryAssert(t assert.TestingT, times int, milliseconds int, update func(), for i := 0; i < times; i++ { time.Sleep(time.Duration(milliseconds) * time.Millisecond) update() - if assert(nil) { + if assert(t) { break } }