From 1a5179091afe38d96f0bb13eb009002800087fad Mon Sep 17 00:00:00 2001 From: labuladong Date: Tue, 11 Oct 2022 12:31:42 +0800 Subject: [PATCH] make `topics` as method of `client` --- pulsar/client_impl.go | 11 +++++++++++ pulsar/consumer_regex.go | 14 ++------------ 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go index e7fa642aa5..1a4d7625e4 100644 --- a/pulsar/client_impl.go +++ b/pulsar/client_impl.go @@ -20,6 +20,7 @@ package pulsar import ( "fmt" "net/url" + "regexp" "time" "github.com/prometheus/client_golang/prometheus" @@ -227,3 +228,13 @@ func (c *client) Close() { c.cnxPool.Close() c.lookupService.Close() } + +func (c *client) topics(namespace string, pattern *regexp.Regexp) ([]string, error) { + topics, err := c.lookupService.GetTopicsOfNamespace(namespace, internal.Persistent) + if err != nil { + return nil, err + } + + filtered := filterTopics(topics, pattern) + return filtered, nil +} diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go index 237f8d86fd..f4cf407955 100644 --- a/pulsar/consumer_regex.go +++ b/pulsar/consumer_regex.go @@ -48,7 +48,7 @@ type regexConsumer struct { func newRegexConsumer(client *client, opts ConsumerOptions, tn *internal.TopicName, pattern *regexp.Regexp, msgCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) (*regexConsumer, error) { - topics, err := topics(client, tn.Namespace, pattern) + topics, err := client.topics(tn.Namespace, pattern) if err != nil { return nil, err } @@ -160,7 +160,7 @@ func (c *regexConsumer) monitor() { } func (c *regexConsumer) discover() { - topics, err := topics(c.client, c.namespace, c.pattern) + topics, err := c.client.topics(c.namespace, c.pattern) if err != nil { c.log.WithError(err).Errorf("Failed to discover topics") return @@ -235,16 +235,6 @@ func (c *regexConsumer) unsubscribe(topics []string) { } } -func topics(client *client, namespace string, pattern *regexp.Regexp) ([]string, error) { - topics, err := client.lookupService.GetTopicsOfNamespace(namespace, internal.Persistent) - if err != nil { - return nil, err - } - - filtered := filterTopics(topics, pattern) - return filtered, nil -} - type consumerError struct { err error topic string