Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

topicctl get action to fetch controllerid and clusterid #176

Merged
merged 5 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions cmd/topicctl/subcmd/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ func init() {
getCmd.AddCommand(
balanceCmd(),
brokersCmd(),
controllerCmd(),
clusterIDCmd(),
configCmd(),
groupsCmd(),
lagsCmd(),
Expand Down Expand Up @@ -134,6 +136,48 @@ func brokersCmd() *cobra.Command {
}
}

func controllerCmd() *cobra.Command {
return &cobra.Command{
Use: "controllerid",
Short: "Displays active controller broker id.",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
sess := session.Must(session.NewSession())

adminClient, err := getConfig.shared.getAdminClient(ctx, sess, true)
if err != nil {
return err
}
defer adminClient.Close()

cliRunner := cli.NewCLIRunner(adminClient, log.Infof, !noSpinner)
return cliRunner.GetControllerID(ctx, getConfig.full)
},
}
}

func clusterIDCmd() *cobra.Command {
return &cobra.Command{
Use: "clusterid",
Short: "Displays cluster id.",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
sess := session.Must(session.NewSession())

adminClient, err := getConfig.shared.getAdminClient(ctx, sess, true)
if err != nil {
return err
}
defer adminClient.Close()

cliRunner := cli.NewCLIRunner(adminClient, log.Infof, !noSpinner)
return cliRunner.GetClusterID(ctx, getConfig.full)
},
}
}

func configCmd() *cobra.Command {
return &cobra.Command{
Use: "config [broker or topic]",
Expand Down
13 changes: 13 additions & 0 deletions pkg/admin/brokerclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,19 @@ func (c *BrokerAdminClient) GetBrokers(ctx context.Context, ids []int) (
return brokerInfos, nil
}

// GetControllerID gets ID of the active controller broker
func (c *BrokerAdminClient) GetControllerID(ctx context.Context) (
int,
error,
) {
metadataResp, err := c.getMetadata(ctx, nil)
if err != nil {
return -1, err
}

return metadataResp.Controller.ID, nil
}

// GetBrokerIDs get the IDs of all brokers in the cluster.
func (c *BrokerAdminClient) GetBrokerIDs(ctx context.Context) ([]int, error) {
resp, err := c.getMetadata(ctx, nil)
Expand Down
31 changes: 31 additions & 0 deletions pkg/admin/brokerclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,37 @@ import (
"github.com/stretchr/testify/require"
)

func TestBrokerClientControllerID(t *testing.T) {
if !util.CanTestBrokerAdmin() {
t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN is not set")
}

ctx := context.Background()
client, err := NewBrokerAdminClient(
ctx,
BrokerAdminClientConfig{
ConnectorConfig: ConnectorConfig{
BrokerAddr: util.TestKafkaAddr(),
},
},
)
require.NoError(t, err)

brokerIDs, err := client.GetBrokerIDs(ctx)
require.NoError(t, err)
assert.Equal(
t,
[]int{1, 2, 3, 4, 5, 6},
brokerIDs,
)

controllerID, err := client.GetControllerID(ctx)
require.NoError(t, err)
assert.Condition(t, func() bool {
return controllerID >= 1 && controllerID <= 6
ssingudasu marked this conversation as resolved.
Show resolved Hide resolved
}, fmt.Sprintf("Received %d, Broker Controller ID should be between 1 and 6.", controllerID))
}

func TestBrokerClientGetClusterID(t *testing.T) {
if !util.CanTestBrokerAdmin() {
t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN is not set")
Expand Down
3 changes: 3 additions & 0 deletions pkg/admin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ type Client interface {
// GetBrokers gets information about all brokers in the cluster.
GetBrokers(ctx context.Context, ids []int) ([]BrokerInfo, error)

// GetControllerID get the active controller broker ID in the cluster.
GetControllerID(ctx context.Context) (int, error)

// GetBrokerIDs get the IDs of all brokers in the cluster.
GetBrokerIDs(ctx context.Context) ([]int, error)

Expand Down
58 changes: 58 additions & 0 deletions pkg/admin/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,64 @@ func FormatBrokers(brokers []BrokerInfo, full bool) string {
return string(bytes.TrimRight(buf.Bytes(), "\n"))
}

// FormatControllerID creates a pretty table for controller broker.
func FormatControllerID(brokerID int) string {
buf := &bytes.Buffer{}
table := tablewriter.NewWriter(buf)
headers := []string{"Active Controller"}
table.SetHeader(headers)

table.SetColumnAlignment(
[]int{
tablewriter.ALIGN_LEFT,
},
)
table.SetBorders(
tablewriter.Border{
Left: false,
Top: true,
Right: false,
Bottom: true,
},
)

table.Append([]string{
fmt.Sprintf("%d", brokerID),
})

table.Render()
return string(bytes.TrimRight(buf.Bytes(), "\n"))
}

// FormatClusterID creates a pretty table for cluster ID.
func FormatClusterID(clusterID string) string {
buf := &bytes.Buffer{}
table := tablewriter.NewWriter(buf)
headers := []string{"Kafka Cluster ID"}
table.SetHeader(headers)

table.SetColumnAlignment(
[]int{
tablewriter.ALIGN_LEFT,
},
)
table.SetBorders(
tablewriter.Border{
Left: false,
Top: true,
Right: false,
Bottom: true,
},
)

table.Append([]string{
clusterID,
})

table.Render()
return string(bytes.TrimRight(buf.Bytes(), "\n"))
}

// FormatBrokerReplicas creates a pretty table that shows how many replicas are in each
// position (i.e., leader, second, third) by broker across all topics. Useful for showing
// total-topic balance.
Expand Down
6 changes: 6 additions & 0 deletions pkg/admin/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,12 @@ type zkClusterID struct {
ID string `json:"id"`
}

type zkControllerInfo struct {
Version int `json:"version"`
BrokerID int `json:"brokerid"`
Timestamp string `json:"timestamp"`
}

type zkBrokerInfo struct {
Endpoints []string `json:"endpoints"`
Host string `json:"host"`
Expand Down
23 changes: 23 additions & 0 deletions pkg/admin/zkclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const (
assignmentPath = "/admin/reassign_partitions"
electionPath = "/admin/preferred_replica_election"
brokersPath = "/brokers/ids"
controllerPath = "/controller"
topicsPath = "/brokers/topics"
clusterIDPath = "/cluster/id"
brokerConfigsPath = "/config/brokers"
Expand Down Expand Up @@ -293,6 +294,28 @@ func (c *ZKAdminClient) GetBrokerIDs(ctx context.Context) ([]int, error) {
return brokerIDs, nil
}

// GetControllerID gets ID of the active controller broker
func (c *ZKAdminClient) GetControllerID(
ctx context.Context,
) (int, error) {
zkControllerInfo := zkControllerInfo{}
zkControllerPath := c.zNode(controllerPath)

_, err := c.zkClient.GetJSON(
ctx,
zkControllerPath,
&zkControllerInfo,
)
if err != nil {
return -1, fmt.Errorf("Error getting zookeeper path %s: %+v",
zkControllerPath,
err,
)
}

return zkControllerInfo.BrokerID, nil
}

// GetConnector returns the Connector instance associated with this client.
func (c *ZKAdminClient) GetConnector() *Connector {
return c.Connector
Expand Down
47 changes: 47 additions & 0 deletions pkg/admin/zkclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,53 @@ import (
"github.com/stretchr/testify/require"
)

func TestZkClientControllerID(t *testing.T) {
zkConn, _, err := szk.Connect(
[]string{util.TestZKAddr()},
5*time.Second,
)
require.NoError(t, err)
require.NotNil(t, zkConn)
defer zkConn.Close()

clusterName := testClusterID("clusterID")
zk.CreateNodes(
t,
zkConn,
[]zk.PathTuple{
{
Path: fmt.Sprintf("/%s", clusterName),
Obj: nil,
},
{
Path: fmt.Sprintf("/%s/controller", clusterName),
Obj: map[string]interface{}{
"version": 1,
"brokerid": 3,
"timestamp": "1589603217000",
},
},
},
)

ctx := context.Background()
adminClient, err := NewZKAdminClient(
ctx,
ZKAdminClientConfig{
ZKAddrs: []string{util.TestZKAddr()},
ZKPrefix: clusterName,
BootstrapAddrs: []string{util.TestKafkaAddr()},
ReadOnly: true,
},
)
require.NoError(t, err)
defer adminClient.Close()

controllerID, err := adminClient.GetControllerID(ctx)
assert.NoError(t, err)
assert.Equal(t, 3, controllerID)
}

func TestZkClientGetClusterID(t *testing.T) {
zkConn, _, err := szk.Connect(
[]string{util.TestZKAddr()},
Expand Down
28 changes: 28 additions & 0 deletions pkg/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,34 @@ func (c *CLIRunner) GetBrokers(ctx context.Context, full bool) error {
return nil
}

// Get active controller broker ID
func (c *CLIRunner) GetControllerID(ctx context.Context, full bool) error {
c.startSpinner()

brokerID, err := c.adminClient.GetControllerID(ctx)
c.stopSpinner()
if err != nil {
return err
}

c.printer("Broker ID:\n%s", admin.FormatControllerID(brokerID))
return nil
}

// Get cluster ID
func (c *CLIRunner) GetClusterID(ctx context.Context, full bool) error {
c.startSpinner()

clusterID, err := c.adminClient.GetClusterID(ctx)
c.stopSpinner()
if err != nil {
return err
}

c.printer("Cluster ID:\n%s", admin.FormatClusterID(clusterID))
return nil
}

// ApplyTopic does an apply run according to the spec in the argument config.
func (c *CLIRunner) ApplyTopic(
ctx context.Context,
Expand Down
2 changes: 1 addition & 1 deletion pkg/version/version.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package version

// Version is the current topicctl version.
const Version = "1.13.0"
const Version = "1.14.0"
Loading