Skip to content

Commit

Permalink
rewrite regexConsumer and fix unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
labuladong committed Sep 16, 2022
1 parent edd5c71 commit c1c811f
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 155 deletions.
4 changes: 2 additions & 2 deletions pulsar/consumer_multitopic.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type multiTopicConsumer struct {
}

func newMultiTopicConsumer(client *client, options ConsumerOptions, topics []string,
messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) (Consumer, error) {
messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) (*multiTopicConsumer, error) {
mtc := &multiTopicConsumer{
client: client,
options: options,
Expand Down Expand Up @@ -128,7 +128,7 @@ func (c *multiTopicConsumer) AckID(msgID MessageID) error {
mid, ok := toTrackingMessageID(msgID)
if !ok {
c.log.Warnf("invalid message id type %T", msgID)
return errors.New("invalid message id type in multi_consumer")
return errors.New("invalid message id type in consumer")
}

if mid.consumer == nil {
Expand Down
162 changes: 19 additions & 143 deletions pulsar/consumer_regex.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
package pulsar

import (
"context"
"errors"
"fmt"
"regexp"
"strings"
Expand All @@ -37,73 +35,36 @@ const (
)

type regexConsumer struct {
client *client
dlq *dlqRouter
rlq *retryRouter

options ConsumerOptions

messageCh chan ConsumerMessage
*multiTopicConsumer

namespace string
pattern *regexp.Regexp

consumersLock sync.Mutex
consumers map[string]Consumer
subscribeCh chan []string
unsubscribeCh chan []string

closeOnce sync.Once
closeCh chan struct{}

ticker *time.Ticker

log log.Logger

consumerName string
ticker *time.Ticker
}

func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, pattern *regexp.Regexp,
msgCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) (Consumer, error) {
func newRegexConsumer(client *client, opts ConsumerOptions, tn *internal.TopicName, pattern *regexp.Regexp,
msgCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) (*regexConsumer, error) {
topics, err := topics(client, tn.Namespace, pattern)
if err != nil {
return nil, err
}
mtc, err := newMultiTopicConsumer(client, opts, topics, msgCh, dlq, rlq)
if err != nil {
return nil, err
}

rc := &regexConsumer{
client: c,
dlq: dlq,
rlq: rlq,
options: opts,
messageCh: msgCh,
multiTopicConsumer: mtc,

namespace: tn.Namespace,
pattern: pattern,

consumers: make(map[string]Consumer),
subscribeCh: make(chan []string, 1),
unsubscribeCh: make(chan []string, 1),

closeCh: make(chan struct{}),

log: c.log.SubLogger(log.Fields{"topic": tn.Name}),
consumerName: opts.Name,
}

topics, err := rc.topics()
if err != nil {
return nil, err
}

var errs error
for ce := range subscriber(c, topics, opts, msgCh, dlq, rlq) {
if ce.err != nil {
errs = pkgerrors.Wrapf(ce.err, "unable to subscribe to topic=%s", ce.topic)
} else {
rc.consumers[ce.topic] = ce.consumer
}
}

if errs != nil {
for _, c := range rc.consumers {
c.Close()
}
return nil, errs
}

// set up timer
Expand All @@ -118,10 +79,6 @@ func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, p
return rc, nil
}

func (c *regexConsumer) Subscription() string {
return c.options.SubscriptionName
}

func (c *regexConsumer) Unsubscribe() error {
var errs error
c.consumersLock.Lock()
Expand All @@ -137,87 +94,10 @@ func (c *regexConsumer) Unsubscribe() error {
return errs
}

func (c *regexConsumer) Receive(ctx context.Context) (message Message, err error) {
for {
select {
case <-c.closeCh:
return nil, newError(ConsumerClosed, "consumer closed")
case cm, ok := <-c.messageCh:
if !ok {
return nil, newError(ConsumerClosed, "consumer closed")
}
return cm.Message, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
}

// Chan return the messages chan to user
func (c *regexConsumer) Chan() <-chan ConsumerMessage {
return c.messageCh
}

// Ack the consumption of a single message
func (c *regexConsumer) Ack(msg Message) error {
return c.AckID(msg.ID())
}

func (c *regexConsumer) ReconsumeLater(msg Message, delay time.Duration) {
c.log.Warnf("regexp consumer not support ReconsumeLater yet.")
}

// AckID the consumption of a single message, identified by its MessageID
func (c *regexConsumer) AckID(msgID MessageID) error {
mid, ok := toTrackingMessageID(msgID)
if !ok {
c.log.Warnf("invalid message id type %T", msgID)
return errors.New("invalid message id type")
}

if mid.consumer == nil {
c.log.Warnf("unable to ack messageID=%+v can not determine topic", msgID)
return errors.New("consumer is nil in consumer_regex")
}

return mid.Ack()
}

func (c *regexConsumer) Nack(msg Message) {
if c.options.EnableDefaultNackBackoffPolicy || c.options.NackBackoffPolicy != nil {
msgID := msg.ID()
mid, ok := toTrackingMessageID(msgID)
if !ok {
c.log.Warnf("invalid message id type %T", msgID)
return
}

if mid.consumer == nil {
c.log.Warnf("unable to nack messageID=%+v can not determine topic", msgID)
return
}
mid.NackByMsg(msg)
return
}

c.NackID(msg.ID())
}

func (c *regexConsumer) NackID(msgID MessageID) {
mid, ok := toTrackingMessageID(msgID)
if !ok {
c.log.Warnf("invalid message id type %T", msgID)
return
}

if mid.consumer == nil {
c.log.Warnf("unable to nack messageID=%+v can not determine topic", msgID)
return
}

mid.Nack()
}

func (c *regexConsumer) Close() {
c.closeOnce.Do(func() {
c.ticker.Stop()
Expand Down Expand Up @@ -248,11 +128,6 @@ func (c *regexConsumer) SeekByTime(time time.Time) error {
return newError(SeekFailed, "seek command not allowed for regex consumer")
}

// Name returns the name of consumer.
func (c *regexConsumer) Name() string {
return c.consumerName
}

func (c *regexConsumer) closed() bool {
select {
case <-c.closeCh:
Expand Down Expand Up @@ -285,7 +160,7 @@ func (c *regexConsumer) monitor() {
}

func (c *regexConsumer) discover() {
topics, err := c.topics()
topics, err := topics(c.client, c.namespace, c.pattern)
if err != nil {
c.log.WithError(err).Errorf("Failed to discover topics")
return
Expand Down Expand Up @@ -360,13 +235,13 @@ func (c *regexConsumer) unsubscribe(topics []string) {
}
}

func (c *regexConsumer) topics() ([]string, error) {
topics, err := c.client.lookupService.GetTopicsOfNamespace(c.namespace, internal.Persistent)
func topics(client *client, namespace string, pattern *regexp.Regexp) ([]string, error) {
topics, err := client.lookupService.GetTopicsOfNamespace(namespace, internal.Persistent)
if err != nil {
return nil, err
}

filtered := filterTopics(topics, c.pattern)
filtered := filterTopics(topics, pattern)
return filtered, nil
}

Expand All @@ -376,6 +251,7 @@ type consumerError struct {
consumer Consumer
}

// create consumers for all partitions of topics
func subscriber(c *client, topics []string, opts ConsumerOptions, ch chan ConsumerMessage,
dlq *dlqRouter, rlq *retryRouter) <-chan consumerError {
consumerErrorCh := make(chan consumerError, len(topics))
Expand Down
15 changes: 6 additions & 9 deletions pulsar/consumer_regex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,11 @@ func runRegexConsumerDiscoverPatternAll(t *testing.T, c Client, namespace string

dlq, _ := newDlqRouter(c.(*client), nil, log.DefaultNopLogger())
rlq, _ := newRetryRouter(c.(*client), nil, false, log.DefaultNopLogger())
consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq)
rc, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq)
if err != nil {
t.Fatal(err)
}
defer consumer.Close()

rc := consumer.(*regexConsumer)
defer rc.Close()

// trigger discovery
rc.discover()
Expand All @@ -180,7 +178,8 @@ func runRegexConsumerDiscoverPatternAll(t *testing.T, c Client, namespace string
retryAssert(t, 5, 2000, func() {
consumers = cloneConsumers(rc)
}, func(x assert.TestingT) bool {
return assert.Equal(x, 1, len(consumers))
_, ok := consumers["persistent://"+topic]
return assert.Equal(x, true, ok)
})
}

Expand All @@ -194,13 +193,11 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace string

dlq, _ := newDlqRouter(c.(*client), nil, log.DefaultNopLogger())
rlq, _ := newRetryRouter(c.(*client), nil, false, log.DefaultNopLogger())
consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq)
rc, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq)
if err != nil {
t.Fatal(err)
}
defer consumer.Close()

rc := consumer.(*regexConsumer)
defer rc.Close()

// trigger discovery
rc.discover()
Expand Down
2 changes: 1 addition & 1 deletion pulsar/helper_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func retryAssert(t assert.TestingT, times int, milliseconds int, update func(),
for i := 0; i < times; i++ {
time.Sleep(time.Duration(milliseconds) * time.Millisecond)
update()
if assert(nil) {
if assert(t) {
break
}
}
Expand Down

0 comments on commit c1c811f

Please sign in to comment.