diff --git a/cmd/topicctl/subcmd/get.go b/cmd/topicctl/subcmd/get.go index 60e458a5..a3d86252 100644 --- a/cmd/topicctl/subcmd/get.go +++ b/cmd/topicctl/subcmd/get.go @@ -60,6 +60,8 @@ func init() { getCmd.AddCommand( balanceCmd(), brokersCmd(), + controllerCmd(), + clusterIDCmd(), configCmd(), groupsCmd(), lagsCmd(), @@ -134,6 +136,48 @@ func brokersCmd() *cobra.Command { } } +func controllerCmd() *cobra.Command { + return &cobra.Command{ + Use: "controllerid", + Short: "Displays active controller broker id.", + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, args []string) error { + ctx := context.Background() + sess := session.Must(session.NewSession()) + + adminClient, err := getConfig.shared.getAdminClient(ctx, sess, true) + if err != nil { + return err + } + defer adminClient.Close() + + cliRunner := cli.NewCLIRunner(adminClient, log.Infof, !noSpinner) + return cliRunner.GetControllerID(ctx, getConfig.full) + }, + } +} + +func clusterIDCmd() *cobra.Command { + return &cobra.Command{ + Use: "clusterid", + Short: "Displays cluster id.", + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, args []string) error { + ctx := context.Background() + sess := session.Must(session.NewSession()) + + adminClient, err := getConfig.shared.getAdminClient(ctx, sess, true) + if err != nil { + return err + } + defer adminClient.Close() + + cliRunner := cli.NewCLIRunner(adminClient, log.Infof, !noSpinner) + return cliRunner.GetClusterID(ctx, getConfig.full) + }, + } +} + func configCmd() *cobra.Command { return &cobra.Command{ Use: "config [broker or topic]", diff --git a/pkg/admin/brokerclient.go b/pkg/admin/brokerclient.go index 5df6a706..6cf1305a 100644 --- a/pkg/admin/brokerclient.go +++ b/pkg/admin/brokerclient.go @@ -232,6 +232,19 @@ func (c *BrokerAdminClient) GetBrokers(ctx context.Context, ids []int) ( return brokerInfos, nil } +// GetControllerID gets ID of the active controller broker +func (c *BrokerAdminClient) GetControllerID(ctx context.Context) ( + int, + error, +) { + metadataResp, err := c.getMetadata(ctx, nil) + if err != nil { + return -1, err + } + + return metadataResp.Controller.ID, nil +} + // GetBrokerIDs get the IDs of all brokers in the cluster. func (c *BrokerAdminClient) GetBrokerIDs(ctx context.Context) ([]int, error) { resp, err := c.getMetadata(ctx, nil) diff --git a/pkg/admin/brokerclient_test.go b/pkg/admin/brokerclient_test.go index 763110b6..8691195a 100644 --- a/pkg/admin/brokerclient_test.go +++ b/pkg/admin/brokerclient_test.go @@ -13,6 +13,37 @@ import ( "github.com/stretchr/testify/require" ) +func TestBrokerClientControllerID(t *testing.T) { + if !util.CanTestBrokerAdmin() { + t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN is not set") + } + + ctx := context.Background() + client, err := NewBrokerAdminClient( + ctx, + BrokerAdminClientConfig{ + ConnectorConfig: ConnectorConfig{ + BrokerAddr: util.TestKafkaAddr(), + }, + }, + ) + require.NoError(t, err) + + brokerIDs, err := client.GetBrokerIDs(ctx) + require.NoError(t, err) + assert.Equal( + t, + []int{1, 2, 3, 4, 5, 6}, + brokerIDs, + ) + + controllerID, err := client.GetControllerID(ctx) + require.NoError(t, err) + assert.Condition(t, func() bool { + return controllerID >= 1 && controllerID <= 6 + }, fmt.Sprintf("Received %d, Broker Controller ID should be between 1 and 6.", controllerID)) +} + func TestBrokerClientGetClusterID(t *testing.T) { if !util.CanTestBrokerAdmin() { t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN is not set") diff --git a/pkg/admin/client.go b/pkg/admin/client.go index 964292e4..919a282c 100644 --- a/pkg/admin/client.go +++ b/pkg/admin/client.go @@ -15,6 +15,9 @@ type Client interface { // GetBrokers gets information about all brokers in the cluster. GetBrokers(ctx context.Context, ids []int) ([]BrokerInfo, error) + // GetControllerID get the active controller broker ID in the cluster. + GetControllerID(ctx context.Context) (int, error) + // GetBrokerIDs get the IDs of all brokers in the cluster. GetBrokerIDs(ctx context.Context) ([]int, error) diff --git a/pkg/admin/format.go b/pkg/admin/format.go index 03921237..8d2263f9 100644 --- a/pkg/admin/format.go +++ b/pkg/admin/format.go @@ -108,6 +108,64 @@ func FormatBrokers(brokers []BrokerInfo, full bool) string { return string(bytes.TrimRight(buf.Bytes(), "\n")) } +// FormatControllerID creates a pretty table for controller broker. +func FormatControllerID(brokerID int) string { + buf := &bytes.Buffer{} + table := tablewriter.NewWriter(buf) + headers := []string{"Active Controller"} + table.SetHeader(headers) + + table.SetColumnAlignment( + []int{ + tablewriter.ALIGN_LEFT, + }, + ) + table.SetBorders( + tablewriter.Border{ + Left: false, + Top: true, + Right: false, + Bottom: true, + }, + ) + + table.Append([]string{ + fmt.Sprintf("%d", brokerID), + }) + + table.Render() + return string(bytes.TrimRight(buf.Bytes(), "\n")) +} + +// FormatClusterID creates a pretty table for cluster ID. +func FormatClusterID(clusterID string) string { + buf := &bytes.Buffer{} + table := tablewriter.NewWriter(buf) + headers := []string{"Kafka Cluster ID"} + table.SetHeader(headers) + + table.SetColumnAlignment( + []int{ + tablewriter.ALIGN_LEFT, + }, + ) + table.SetBorders( + tablewriter.Border{ + Left: false, + Top: true, + Right: false, + Bottom: true, + }, + ) + + table.Append([]string{ + clusterID, + }) + + table.Render() + return string(bytes.TrimRight(buf.Bytes(), "\n")) +} + // FormatBrokerReplicas creates a pretty table that shows how many replicas are in each // position (i.e., leader, second, third) by broker across all topics. Useful for showing // total-topic balance. diff --git a/pkg/admin/types.go b/pkg/admin/types.go index 9d25b85b..88ed16d7 100644 --- a/pkg/admin/types.go +++ b/pkg/admin/types.go @@ -356,6 +356,12 @@ type zkClusterID struct { ID string `json:"id"` } +type zkControllerInfo struct { + Version int `json:"version"` + BrokerID int `json:"brokerid"` + Timestamp string `json:"timestamp"` +} + type zkBrokerInfo struct { Endpoints []string `json:"endpoints"` Host string `json:"host"` diff --git a/pkg/admin/zkclient.go b/pkg/admin/zkclient.go index 6d18e881..11829027 100644 --- a/pkg/admin/zkclient.go +++ b/pkg/admin/zkclient.go @@ -24,6 +24,7 @@ const ( assignmentPath = "/admin/reassign_partitions" electionPath = "/admin/preferred_replica_election" brokersPath = "/brokers/ids" + controllerPath = "/controller" topicsPath = "/brokers/topics" clusterIDPath = "/cluster/id" brokerConfigsPath = "/config/brokers" @@ -293,6 +294,28 @@ func (c *ZKAdminClient) GetBrokerIDs(ctx context.Context) ([]int, error) { return brokerIDs, nil } +// GetControllerID gets ID of the active controller broker +func (c *ZKAdminClient) GetControllerID( + ctx context.Context, +) (int, error) { + zkControllerInfo := zkControllerInfo{} + zkControllerPath := c.zNode(controllerPath) + + _, err := c.zkClient.GetJSON( + ctx, + zkControllerPath, + &zkControllerInfo, + ) + if err != nil { + return -1, fmt.Errorf("Error getting zookeeper path %s: %+v", + zkControllerPath, + err, + ) + } + + return zkControllerInfo.BrokerID, nil +} + // GetConnector returns the Connector instance associated with this client. func (c *ZKAdminClient) GetConnector() *Connector { return c.Connector diff --git a/pkg/admin/zkclient_test.go b/pkg/admin/zkclient_test.go index 3301a737..44e80cd1 100644 --- a/pkg/admin/zkclient_test.go +++ b/pkg/admin/zkclient_test.go @@ -15,6 +15,53 @@ import ( "github.com/stretchr/testify/require" ) +func TestZkClientControllerID(t *testing.T) { + zkConn, _, err := szk.Connect( + []string{util.TestZKAddr()}, + 5*time.Second, + ) + require.NoError(t, err) + require.NotNil(t, zkConn) + defer zkConn.Close() + + clusterName := testClusterID("clusterID") + zk.CreateNodes( + t, + zkConn, + []zk.PathTuple{ + { + Path: fmt.Sprintf("/%s", clusterName), + Obj: nil, + }, + { + Path: fmt.Sprintf("/%s/controller", clusterName), + Obj: map[string]interface{}{ + "version": 1, + "brokerid": 3, + "timestamp": "1589603217000", + }, + }, + }, + ) + + ctx := context.Background() + adminClient, err := NewZKAdminClient( + ctx, + ZKAdminClientConfig{ + ZKAddrs: []string{util.TestZKAddr()}, + ZKPrefix: clusterName, + BootstrapAddrs: []string{util.TestKafkaAddr()}, + ReadOnly: true, + }, + ) + require.NoError(t, err) + defer adminClient.Close() + + controllerID, err := adminClient.GetControllerID(ctx) + assert.NoError(t, err) + assert.Equal(t, 3, controllerID) +} + func TestZkClientGetClusterID(t *testing.T) { zkConn, _, err := szk.Connect( []string{util.TestZKAddr()}, diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go index 783963b8..19b41067 100644 --- a/pkg/cli/cli.go +++ b/pkg/cli/cli.go @@ -79,6 +79,34 @@ func (c *CLIRunner) GetBrokers(ctx context.Context, full bool) error { return nil } +// Get active controller broker ID +func (c *CLIRunner) GetControllerID(ctx context.Context, full bool) error { + c.startSpinner() + + brokerID, err := c.adminClient.GetControllerID(ctx) + c.stopSpinner() + if err != nil { + return err + } + + c.printer("Broker ID:\n%s", admin.FormatControllerID(brokerID)) + return nil +} + +// Get cluster ID +func (c *CLIRunner) GetClusterID(ctx context.Context, full bool) error { + c.startSpinner() + + clusterID, err := c.adminClient.GetClusterID(ctx) + c.stopSpinner() + if err != nil { + return err + } + + c.printer("Cluster ID:\n%s", admin.FormatClusterID(clusterID)) + return nil +} + // ApplyTopic does an apply run according to the spec in the argument config. func (c *CLIRunner) ApplyTopic( ctx context.Context, diff --git a/pkg/version/version.go b/pkg/version/version.go index bf486b52..6877c606 100644 --- a/pkg/version/version.go +++ b/pkg/version/version.go @@ -1,4 +1,4 @@ package version // Version is the current topicctl version. -const Version = "1.13.0" +const Version = "1.14.0"