Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue 1075][pulsaradmin] Reorganize pulsaradmin codebase #1085

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 0 additions & 34 deletions pulsaradmin/alias.go

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package utils
package pulsaradmin

type AllocatorStats struct {
NumDirectArenas int `json:"numDirectArenas"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,58 +15,56 @@
// specific language governing permissions and limitations
// under the License.

package admin

import (
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
)
package pulsaradmin

// BrokerStats is admin interface for broker stats management
type BrokerStats interface {
// GetMetrics returns Monitoring metrics
GetMetrics() ([]utils.Metrics, error)
GetMetrics() ([]Metrics, error)

// GetMBeans requests JSON string server mbean dump
GetMBeans() ([]utils.Metrics, error)
GetMBeans() ([]Metrics, error)

// GetTopics returns JSON string topics stats
GetTopics() (string, error)

// GetLoadReport returns load report of broker
GetLoadReport() (*utils.LocalBrokerData, error)
GetLoadReport() (*LocalBrokerData, error)

// GetAllocatorStats returns stats from broker
GetAllocatorStats(allocatorName string) (*utils.AllocatorStats, error)
GetAllocatorStats(allocatorName string) (*AllocatorStats, error)
}

type brokerStats struct {
pulsar *pulsarClient
basePath string
pulsar *pulsarClient
basePath string
apiVersion APIVersion
}

// BrokerStats is used to access the broker stats endpoints
func (c *pulsarClient) BrokerStats() BrokerStats {
return &brokerStats{
pulsar: c,
basePath: "/broker-stats",
pulsar: c,
basePath: "/broker-stats",
apiVersion: c.apiProfile.BrokerStats,
}
}

func (bs *brokerStats) GetMetrics() ([]utils.Metrics, error) {
endpoint := bs.pulsar.endpoint(bs.basePath, "/metrics")
var response []utils.Metrics
err := bs.pulsar.Client.Get(endpoint, &response)
func (bs *brokerStats) GetMetrics() ([]Metrics, error) {
endpoint := bs.pulsar.endpoint(bs.apiVersion, bs.basePath, "/metrics")
var response []Metrics
err := bs.pulsar.restClient.Get(endpoint, &response)
if err != nil {
return nil, err
}

return response, nil
}

func (bs *brokerStats) GetMBeans() ([]utils.Metrics, error) {
endpoint := bs.pulsar.endpoint(bs.basePath, "/mbeans")
var response []utils.Metrics
err := bs.pulsar.Client.Get(endpoint, &response)
func (bs *brokerStats) GetMBeans() ([]Metrics, error) {
endpoint := bs.pulsar.endpoint(bs.apiVersion, bs.basePath, "/mbeans")
var response []Metrics
err := bs.pulsar.restClient.Get(endpoint, &response)
if err != nil {
return nil, err
}
Expand All @@ -75,29 +73,29 @@ func (bs *brokerStats) GetMBeans() ([]utils.Metrics, error) {
}

func (bs *brokerStats) GetTopics() (string, error) {
endpoint := bs.pulsar.endpoint(bs.basePath, "/topics")
buf, err := bs.pulsar.Client.GetWithQueryParams(endpoint, nil, nil, false)
endpoint := bs.pulsar.endpoint(bs.apiVersion, bs.basePath, "/topics")
buf, err := bs.pulsar.restClient.GetWithQueryParams(endpoint, nil, nil, false)
if err != nil {
return "", err
}

return string(buf), nil
}

func (bs *brokerStats) GetLoadReport() (*utils.LocalBrokerData, error) {
endpoint := bs.pulsar.endpoint(bs.basePath, "/load-report")
response := utils.NewLocalBrokerData()
err := bs.pulsar.Client.Get(endpoint, &response)
func (bs *brokerStats) GetLoadReport() (*LocalBrokerData, error) {
endpoint := bs.pulsar.endpoint(bs.apiVersion, bs.basePath, "/load-report")
response := NewLocalBrokerData()
err := bs.pulsar.restClient.Get(endpoint, &response)
if err != nil {
return nil, nil
}
return &response, nil
}

func (bs *brokerStats) GetAllocatorStats(allocatorName string) (*utils.AllocatorStats, error) {
endpoint := bs.pulsar.endpoint(bs.basePath, "/allocator-stats", allocatorName)
var allocatorStats utils.AllocatorStats
err := bs.pulsar.Client.Get(endpoint, &allocatorStats)
func (bs *brokerStats) GetAllocatorStats(allocatorName string) (*AllocatorStats, error) {
endpoint := bs.pulsar.endpoint(bs.apiVersion, bs.basePath, "/allocator-stats", allocatorName)
var allocatorStats AllocatorStats
err := bs.pulsar.restClient.Get(endpoint, &allocatorStats)
if err != nil {
return nil, err
}
Expand Down
62 changes: 31 additions & 31 deletions pulsaradmin/pkg/admin/brokers.go → pulsaradmin/api_brokers.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@
// specific language governing permissions and limitations
// under the License.

package admin
package pulsaradmin

import (
"fmt"
"net/url"
"strings"

"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
)

// Brokers is admin interface for brokers management
Expand All @@ -34,7 +32,7 @@ type Brokers interface {
GetDynamicConfigurationNames() ([]string, error)

// GetOwnedNamespaces returns the map of owned namespaces and their status from a single broker in the cluster
GetOwnedNamespaces(cluster, brokerURL string) (map[string]utils.NamespaceOwnershipStatus, error)
GetOwnedNamespaces(cluster, brokerURL string) (map[string]NamespaceOwnershipStatus, error)

// UpdateDynamicConfiguration updates dynamic configuration value in to Zk that triggers watch on
// brokers and all brokers can update {@link ServiceConfiguration} value locally
Expand All @@ -48,7 +46,7 @@ type Brokers interface {
GetRuntimeConfigurations() (map[string]string, error)

// GetInternalConfigurationData returns the internal configuration data
GetInternalConfigurationData() (*utils.InternalConfigurationData, error)
GetInternalConfigurationData() (*InternalConfigurationData, error)

// GetAllDynamicConfigurations returns values of all overridden dynamic-configs
GetAllDynamicConfigurations() (map[string]string, error)
Expand All @@ -58,42 +56,44 @@ type Brokers interface {
}

type broker struct {
pulsar *pulsarClient
basePath string
pulsar *pulsarClient
basePath string
apiVersion APIVersion
}

// Brokers is used to access the brokers endpoints
func (c *pulsarClient) Brokers() Brokers {
return &broker{
pulsar: c,
basePath: "/brokers",
pulsar: c,
basePath: "/brokers",
apiVersion: c.apiProfile.Brokers,
}
}

func (b *broker) GetActiveBrokers(cluster string) ([]string, error) {
endpoint := b.pulsar.endpoint(b.basePath, cluster)
endpoint := b.pulsar.endpoint(b.apiVersion, b.basePath, cluster)
var res []string
err := b.pulsar.Client.Get(endpoint, &res)
err := b.pulsar.restClient.Get(endpoint, &res)
if err != nil {
return nil, err
}
return res, nil
}

func (b *broker) GetDynamicConfigurationNames() ([]string, error) {
endpoint := b.pulsar.endpoint(b.basePath, "/configuration/")
endpoint := b.pulsar.endpoint(b.apiVersion, b.basePath, "/configuration/")
var res []string
err := b.pulsar.Client.Get(endpoint, &res)
err := b.pulsar.restClient.Get(endpoint, &res)
if err != nil {
return nil, err
}
return res, nil
}

func (b *broker) GetOwnedNamespaces(cluster, brokerURL string) (map[string]utils.NamespaceOwnershipStatus, error) {
endpoint := b.pulsar.endpoint(b.basePath, cluster, brokerURL, "ownedNamespaces")
var res map[string]utils.NamespaceOwnershipStatus
err := b.pulsar.Client.Get(endpoint, &res)
func (b *broker) GetOwnedNamespaces(cluster, brokerURL string) (map[string]NamespaceOwnershipStatus, error) {
endpoint := b.pulsar.endpoint(b.apiVersion, b.basePath, cluster, brokerURL, "ownedNamespaces")
var res map[string]NamespaceOwnershipStatus
err := b.pulsar.restClient.Get(endpoint, &res)
if err != nil {
return nil, err
}
Expand All @@ -102,49 +102,49 @@ func (b *broker) GetOwnedNamespaces(cluster, brokerURL string) (map[string]utils

func (b *broker) UpdateDynamicConfiguration(configName, configValue string) error {
value := url.QueryEscape(configValue)
endpoint := b.pulsar.endpoint(b.basePath, "/configuration/", configName, value)
return b.pulsar.Client.Post(endpoint, nil)
endpoint := b.pulsar.endpoint(b.apiVersion, b.basePath, "/configuration/", configName, value)
return b.pulsar.restClient.Post(endpoint, nil)
}

func (b *broker) DeleteDynamicConfiguration(configName string) error {
endpoint := b.pulsar.endpoint(b.basePath, "/configuration/", configName)
return b.pulsar.Client.Delete(endpoint)
endpoint := b.pulsar.endpoint(b.apiVersion, b.basePath, "/configuration/", configName)
return b.pulsar.restClient.Delete(endpoint)
}

func (b *broker) GetRuntimeConfigurations() (map[string]string, error) {
endpoint := b.pulsar.endpoint(b.basePath, "/configuration/", "runtime")
endpoint := b.pulsar.endpoint(b.apiVersion, b.basePath, "/configuration/", "runtime")
var res map[string]string
err := b.pulsar.Client.Get(endpoint, &res)
err := b.pulsar.restClient.Get(endpoint, &res)
if err != nil {
return nil, err
}
return res, nil
}

func (b *broker) GetInternalConfigurationData() (*utils.InternalConfigurationData, error) {
endpoint := b.pulsar.endpoint(b.basePath, "/internal-configuration")
var res utils.InternalConfigurationData
err := b.pulsar.Client.Get(endpoint, &res)
func (b *broker) GetInternalConfigurationData() (*InternalConfigurationData, error) {
endpoint := b.pulsar.endpoint(b.apiVersion, b.basePath, "/internal-configuration")
var res InternalConfigurationData
err := b.pulsar.restClient.Get(endpoint, &res)
if err != nil {
return nil, err
}
return &res, nil
}

func (b *broker) GetAllDynamicConfigurations() (map[string]string, error) {
endpoint := b.pulsar.endpoint(b.basePath, "/configuration/", "values")
endpoint := b.pulsar.endpoint(b.apiVersion, b.basePath, "/configuration/", "values")
var res map[string]string
err := b.pulsar.Client.Get(endpoint, &res)
err := b.pulsar.restClient.Get(endpoint, &res)
if err != nil {
return nil, err
}
return res, nil
}

func (b *broker) HealthCheck() error {
endpoint := b.pulsar.endpoint(b.basePath, "/health")
endpoint := b.pulsar.endpoint(b.apiVersion, b.basePath, "/health")

buf, err := b.pulsar.Client.GetWithQueryParams(endpoint, nil, nil, false)
buf, err := b.pulsar.restClient.GetWithQueryParams(endpoint, nil, nil, false)
if err != nil {
return err
}
Expand Down
Loading