From 01c619c53b8554374e9be83c3384f31e28cf43ee Mon Sep 17 00:00:00 2001
From: Sri Harsha Singudasu <105226401+ssingudasu@users.noreply.github.com>
Date: Mon, 9 Oct 2023 21:19:55 +0530
Subject: [PATCH] [DP-1774] - topicctl get partitions to display under
 replicated and offline (#155)

* [DP-1774] - topicctl get partitions to display under replicated and offline

* [DP-1774] - topicctl get partitions to display under replicated and offline

* [DP-1774] - topicctl get partitions to display under replicated and offline

* [DP-1774] - topicctl get partitions to display under replicated and offline

* Minor code fixes

* Minor fixes. Modifying get metadata from all topics to nil

* Minor fixes. Modifying get metadata from all topics to nil
---
 README.md                  |   3 +-
 cmd/topicctl/subcmd/get.go |  46 +++++-
 pkg/admin/brokerclient.go  |  17 +++
 pkg/admin/client.go        |   3 +
 pkg/admin/format.go        | 199 +++++++++++++++++++++++++
 pkg/admin/types.go         | 294 +++++++++++++++++++++++++++++++++++++
 pkg/admin/zkclient.go      |  17 +++
 pkg/cli/cli.go             |  44 +++++-
 pkg/cli/repl.go            |  26 +++-
 9 files changed, 634 insertions(+), 15 deletions(-)

diff --git a/README.md b/README.md
index 4ae37db1..c70888c0 100644
--- a/README.md
+++ b/README.md
@@ -97,6 +97,7 @@ topicctl repl --cluster-config=examples/local-cluster/cluster.yaml
 ```
 get brokers
 get topics
+get partitions
 get partitions topic-default
 get offsets topic-default
 tail topic-default
@@ -171,7 +172,7 @@ resource type in the cluster. Currently, the following operations are supported:
 | `get groups` | All consumer groups in the cluster |
 | `get lags [topic] [group]` | Lag for each topic partition for a consumer group |
 | `get members [group]` | Details of each member in a consumer group |
-| `get partitions [topic]` | All partitions in a topic |
+| `get partitions [topics...]` | All partitions for the specified topics (all topics if omitted) |
 | `get offsets [topic]` | Number of messages per partition along with start and end times |
 | `get topics` | All topics in the cluster |
 
diff --git a/cmd/topicctl/subcmd/get.go b/cmd/topicctl/subcmd/get.go
index 642a9031..feaba97b 100644
--- a/cmd/topicctl/subcmd/get.go
+++ b/cmd/topicctl/subcmd/get.go
@@ -2,9 +2,11 @@ package subcmd
 
 import (
 	"context"
+	"fmt"
 	"strings"
 
 	"github.com/aws/aws-sdk-go/aws/session"
+	"github.com/segmentio/topicctl/pkg/admin"
 	"github.com/segmentio/topicctl/pkg/cli"
 	log "github.com/sirupsen/logrus"
 	"github.com/spf13/cobra"
@@ -31,6 +33,15 @@ type getCmdConfig struct {
 
 var getConfig getCmdConfig
 
+type partitionsCmdConfig struct {
+	status  admin.PartitionStatus
+	summary bool
+}
+
+var partitionsConfig partitionsCmdConfig
+
+var partitionsStatusHelpText = "Allowed values: ok, offline, under-replicated"
+
 func init() {
 	getCmd.PersistentFlags().BoolVar(
 		&getConfig.full,
@@ -211,10 +222,10 @@ func membersCmd() *cobra.Command {
 }
 
 func partitionsCmd() *cobra.Command {
-	return &cobra.Command{
-		Use:   "partitions [topic]",
-		Short: "Displays partition information for the specified topic.",
-		Args:  cobra.ExactArgs(1),
+	partitionsCommand := &cobra.Command{
+		Use:   "partitions [topics...]",
+		Short: "Displays partition information for the specified topics (all topics if none are given).",
+		Args:  cobra.MinimumNArgs(0),
 		RunE: func(cmd *cobra.Command, args []string) error {
 			ctx := context.Background()
 			sess := session.Must(session.NewSession())
@@ -225,10 +236,35 @@ func partitionsCmd() *cobra.Command {
 			}
 			defer adminClient.Close()
 
+			topics := []string{}
+			for _, arg := range args {
+				topics = append(topics, arg)
+			}
+
 			cliRunner := cli.NewCLIRunner(adminClient, log.Infof, !noSpinner)
-			return cliRunner.GetPartitions(ctx, args[0])
+			return cliRunner.GetPartitions(
+				ctx,
+				topics,
+				partitionsConfig.status,
+				partitionsConfig.summary,
+			)
 		},
 	}
+
+	partitionsCommand.Flags().Var(
+		&partitionsConfig.status,
+		"status",
+		fmt.Sprintf("partition status\n%s", partitionsStatusHelpText),
+	)
+
+	partitionsCommand.Flags().BoolVar(
+		&partitionsConfig.summary,
+		"summary",
+		false,
+		"Display a summary of partitions",
+	)
+
+	return partitionsCommand
 }
 
 func offsetsCmd() *cobra.Command {
diff --git a/pkg/admin/brokerclient.go b/pkg/admin/brokerclient.go
index 4bc4063b..8cbaeb8b 100644
--- a/pkg/admin/brokerclient.go
+++ b/pkg/admin/brokerclient.go
@@ -690,3 +690,20 @@ func configEntriesToAPIConfigs(
 
 	return apiConfigs
 }
+
+func (c *BrokerAdminClient) GetAllTopicsMetadata(
+	ctx context.Context,
+) (*kafka.MetadataResponse, error) {
+	client := c.GetConnector().KafkaClient
+	req := kafka.MetadataRequest{
+		Topics: nil,
+	}
+
+	log.Debugf("Metadata request: %+v", req)
+	metadata, err := client.Metadata(ctx, &req)
+	if err != nil {
+		return nil, fmt.Errorf("Error fetching all topics metadata: %+v", err)
+	}
+
+	return metadata, nil
+}
diff --git a/pkg/admin/client.go b/pkg/admin/client.go
index 733a9ab0..d5ae81d5 100644
--- a/pkg/admin/client.go
+++ b/pkg/admin/client.go
@@ -38,6 +38,9 @@ type Client interface {
 		detailed bool,
 	) (TopicInfo, error)
 
+	// GetAllTopicsMetadata performs a kafka-go metadata call to fetch information for all topics
+	GetAllTopicsMetadata(ctx context.Context) (*kafka.MetadataResponse, error)
+
 	// UpdateTopicConfig updates the configuration for the argument topic. It returns the config
 	// keys that were updated.
 	UpdateTopicConfig(
diff --git a/pkg/admin/format.go b/pkg/admin/format.go
index c6b9c64f..d809b01c 100644
--- a/pkg/admin/format.go
+++ b/pkg/admin/format.go
@@ -504,6 +504,205 @@ func FormatTopicPartitions(partitions []PartitionInfo, brokers []BrokerInfo) str
 	return string(bytes.TrimRight(buf.Bytes(), "\n"))
 }
 
+// FormatTopicsPartitionsSummary creates a pretty table summarizing the
+// partition statuses for each topic.
+func FormatTopicsPartitionsSummary(
+	topicsPartitionsStatusSummary map[string]map[PartitionStatus][]int,
+) string {
+	buf := &bytes.Buffer{}
+
+	headers := []string{
+		"Topic",
+		"Status",
+		"Count",
+		"IDs",
+	}
+	columnAlignment := []int{
+		tablewriter.ALIGN_LEFT,
+		tablewriter.ALIGN_LEFT,
+		tablewriter.ALIGN_LEFT,
+		tablewriter.ALIGN_LEFT,
+	}
+
+	table := tablewriter.NewWriter(buf)
+	table.SetHeader(headers)
+	table.SetAutoWrapText(true)
+	table.SetColumnAlignment(columnAlignment)
+	table.SetBorders(
+		tablewriter.Border{
+			Left:   false,
+			Top:    true,
+			Right:  false,
+			Bottom: true,
+		},
+	)
+
+	topicNames := []string{}
+	tableData := make(map[string][][]string)
+	for topicName, partitionsStatusSummary := range topicsPartitionsStatusSummary {
+		topicTableRows := [][]string{}
+
+		for partitionStatus, partitionStatusIDs := range partitionsStatusSummary {
+			topicTableRows = append(topicTableRows, []string{
+				topicName,
+				string(partitionStatus),
+				fmt.Sprintf("%d", len(partitionStatusIDs)),
+				fmt.Sprintf("%+v", partitionStatusIDs),
+			})
+		}
+
+		// sort the topicTableRows by partition status
+		statusSort := func(i, j int) bool {
+			// the second element in each row is the stringified PartitionStatus
+			return topicTableRows[i][1] < topicTableRows[j][1]
+		}
+
+		sort.Slice(topicTableRows, statusSort)
+
+		tableData[topicName] = topicTableRows
+		topicNames = append(topicNames, topicName)
+	}
+
+	sort.Strings(topicNames)
+	for _, topicName := range topicNames {
+		_, exists := tableData[topicName]
+		if exists {
+			for _, topicTableRow := range tableData[topicName] {
+				table.Append(topicTableRow)
+			}
+		}
+	}
+
+	table.Render()
+	return string(bytes.TrimRight(buf.Bytes(), "\n"))
+}
+
+// FormatTopicsPartitions creates a pretty table with detailed information on
+// all partitions for each topic.
+func FormatTopicsPartitions(
+	topicsPartitionsStatusInfo map[string][]PartitionStatusInfo,
+	brokers []BrokerInfo,
+) string {
+	buf := &bytes.Buffer{}
+
+	headers := []string{
+		"Topic",
+		"ID",
+		"Leader",
+		"ISR",
+		"Replicas",
+		"Distinct\nRacks",
+		"Racks",
+		"Status",
+	}
+	columnAlignment := []int{
+		tablewriter.ALIGN_LEFT,
+		tablewriter.ALIGN_LEFT,
+		tablewriter.ALIGN_LEFT,
+		tablewriter.ALIGN_LEFT,
+		tablewriter.ALIGN_LEFT,
+		tablewriter.ALIGN_LEFT,
+		tablewriter.ALIGN_LEFT,
+		tablewriter.ALIGN_LEFT,
+	}
+
+	table := tablewriter.NewWriter(buf)
+	table.SetHeader(headers)
+	table.SetAutoWrapText(false)
+	table.SetColumnAlignment(columnAlignment)
+	table.SetBorders(
+		tablewriter.Border{
+			Left:   false,
+			Top:    true,
+			Right:  false,
+			Bottom: true,
+		},
+	)
+
+	topicNames := []string{}
+	brokerRacks := BrokerRacks(brokers)
+	tableData := make(map[string][][]string)
+	for topicName, partitionsStatusInfo := range topicsPartitionsStatusInfo {
+		topicTableRows := [][]string{}
+		for _, partitionStatusInfo := range partitionsStatusInfo {
+			racks := partitionStatusInfo.Racks(brokerRacks)
+
+			distinctRacks := make(map[string]int)
+			for _, rack := range racks {
+				distinctRacks[rack]++
+			}
+
+			partitionIsrs := []int{}
+			for _, partitionStatusIsr := range partitionStatusInfo.Partition.Isr {
+				partitionIsrs = append(partitionIsrs, partitionStatusIsr.ID)
+			}
+
+			partitionReplicas := []int{}
+			for _, partitionReplica := range partitionStatusInfo.Partition.Replicas {
+				partitionReplicas = append(partitionReplicas, partitionReplica.ID)
+			}
+
+			inSync := true
+			if partitionStatusInfo.Status != Ok {
+				inSync = false
+			}
+
+			correctLeader := true
+			if partitionStatusInfo.LeaderState != CorrectLeader {
+				correctLeader = false
+			}
+
+			var statusPrinter func(f string, a ...interface{}) string
+			if !util.InTerminal() || inSync {
+				statusPrinter = fmt.Sprintf
+			} else {
+				statusPrinter = color.New(color.FgRed).SprintfFunc()
+			}
+
+			var statePrinter func(f string, a ...interface{}) string
+			if !util.InTerminal() || correctLeader {
+				statePrinter = fmt.Sprintf
+			} else {
+				statePrinter = color.New(color.FgCyan).SprintfFunc()
+			}
+
+			leaderStateString := fmt.Sprintf("%d", partitionStatusInfo.Partition.Leader.ID)
+			if !correctLeader {
+				leaderStateString = fmt.Sprintf("%d %s", partitionStatusInfo.Partition.Leader.ID,
+					statePrinter("(%s)", string(partitionStatusInfo.LeaderState)),
+				)
+			}
+
+			topicTableRows = append(topicTableRows, []string{
+				topicName,
+				fmt.Sprintf("%d", partitionStatusInfo.Partition.ID),
+				leaderStateString,
+				fmt.Sprintf("%+v", partitionIsrs),
+				fmt.Sprintf("%+v", partitionReplicas),
+				fmt.Sprintf("%d", len(distinctRacks)),
+				fmt.Sprintf("%+v", racks),
+				statusPrinter("%s", string(partitionStatusInfo.Status)),
+			})
+		}
+
+		tableData[topicName] = topicTableRows
+		topicNames = append(topicNames, topicName)
+	}
+
+	sort.Strings(topicNames)
+	for _, topicName := range topicNames {
+		_, exists := tableData[topicName]
+		if exists {
+			for _, topicTableRow := range tableData[topicName] {
+				table.Append(topicTableRow)
+			}
+		}
+	}
+
+	table.Render()
+	return string(bytes.TrimRight(buf.Bytes(), "\n"))
+}
+
 // FormatConfig creates a pretty table with all of the keys and values in a topic or
 // broker config.
 func FormatConfig(configMap map[string]string) string {
diff --git a/pkg/admin/types.go b/pkg/admin/types.go
index 0221a39b..eb39517c 100644
--- a/pkg/admin/types.go
+++ b/pkg/admin/types.go
@@ -6,9 +6,12 @@ import (
 	"reflect"
 	"sort"
 	"strconv"
+	"strings"
 	"time"
 
+	"github.com/segmentio/kafka-go"
 	"github.com/segmentio/topicctl/pkg/util"
+	log "github.com/sirupsen/logrus"
 )
 
 const (
@@ -136,6 +139,32 @@ type zkChangeNotification struct {
 	EntityPath string `json:"entity_path"`
 }
 
+type PartitionStatus string
+
+const (
+	Ok              PartitionStatus = "OK"
+	Offline         PartitionStatus = "Offline"
+	UnderReplicated PartitionStatus = "Under-replicated"
+)
+
+type PartitionLeaderState string
+
+const (
+	CorrectLeader PartitionLeaderState = "OK"
+	WrongLeader   PartitionLeaderState = "Wrong"
+)
+
+type PartitionStatusInfo struct {
+	Topic       string
+	Partition   kafka.Partition
+	Status      PartitionStatus
+	LeaderState PartitionLeaderState
+}
+
+const (
+	ListenerNotFoundError kafka.Error = 72 // Kafka protocol error LISTENER_NOT_FOUND
+)
+
 // Addr returns the address of the current BrokerInfo.
 func (b BrokerInfo) Addr() string {
 	return fmt.Sprintf("%s:%d", b.Host, b.Port)
@@ -457,6 +486,20 @@ func (p PartitionInfo) Racks(brokerRacks map[int]string) ([]string, error) {
 	return racks, nil
 }
 
+// Racks returns a slice of all racks for the partition replicas.
+func (p PartitionStatusInfo) Racks(brokerRacks map[int]string) []string {
+	racks := []string{}
+
+	for _, replica := range p.Partition.Replicas {
+		rack, ok := brokerRacks[replica.ID]
+		if ok {
+			racks = append(racks, rack)
+		}
+	}
+
+	return racks
+}
+
 // NumRacks returns the number of distinct racks in the partition.
 func (p PartitionInfo) NumRacks(brokerRacks map[int]string) (int, error) {
 	racksMap := map[string]struct{}{}
@@ -792,3 +835,254 @@ func NewLeaderPartitions(
 
 	return newLeaderPartitions
 }
+
+// StringToPartitionStatus checks whether a string is a valid PartitionStatus
+func StringToPartitionStatus(status string) (PartitionStatus, bool) {
+	switch strings.ToLower(status) {
+	case strings.ToLower(string(Ok)):
+		return Ok, true
+	case strings.ToLower(string(Offline)):
+		return Offline, true
+	case strings.ToLower(string(UnderReplicated)):
+		return UnderReplicated, true
+	default:
+		return PartitionStatus(status), false
+	}
+}
+
+// Set is used by Cobra to set the value of a variable from a Cobra flag.
+func (p *PartitionStatus) Set(v string) error {
+	ps, ok := StringToPartitionStatus(v)
+	if !ok {
+		return errors.New("Allowed values: ok, offline, under-replicated")
+	}
+	*p = ps
+	return nil
+}
+
+// String is used by Cobra in help text.
+func (p *PartitionStatus) String() string {
+	return string(*p)
+}
+
+// Type is used by Cobra in help text.
+func (p *PartitionStatus) Type() string {
+	return "PartitionStatus"
+}
+
+// GetTopicsPartitionsStatusInfo returns the partition status info for the specified topics
+func GetTopicsPartitionsStatusInfo(
+	metadata *kafka.MetadataResponse,
+	topics []string,
+	status PartitionStatus,
+) map[string][]PartitionStatusInfo {
+	topicsPartitionsStatusInfo := make(map[string][]PartitionStatusInfo)
+
+	filterTopics := GetValidTopicNamesFromMetadata(topics, metadata)
+	for _, topicMetadata := range metadata.Topics {
+		if topicMetadata.Error != nil {
+			log.Errorf("Topic: %s metadata error: %v", topicMetadata.Name, topicMetadata.Error)
+			continue
+		}
+
+		if len(topics) != 0 && !filterTopics[topicMetadata.Name] {
+			continue
+		}
+
+		partitionsStatusInfo := []PartitionStatusInfo{}
+		for _, partition := range topicMetadata.Partitions {
+			partitionStatus := GetPartitionStatus(partition)
+			log.Debugf("Topic: %s, Partition: %d status is: %v",
+				topicMetadata.Name,
+				partition.ID,
+				partitionStatus,
+			)
+
+			// The kafka-go metadata call reports a broker ID of 0 for brokers
+			// that are not found
+			//
+			// i.e.
+			// - if a replica is missing for a partition, we get a broker ID of 0
+			// - if an ISR is missing for a partition, we still get a broker ID of 0
+			// - if a leader is missing for a partition, we still get a broker ID of 0 (an offline partition)
+			//
+			// This can be confusing to kafka-go users, since valid broker IDs can start from 0
+			//
+			// However, Burrow's metadata call reports the broker ID of missing partitions as -1
+			//
+			// For user readability, we modify from 0 to -1:
+			// - any ISR broker ID that does not have a valid Host or Port
+			// - any Replica broker ID that does not have a valid Host or Port
+			// - the (offline) Leader broker ID that does not have a valid Host or Port
+			//
+			switch partitionStatus {
+			case Ok, UnderReplicated, Offline:
+				if partitionStatus != Ok {
+					if partitionStatus == Offline {
+						partition.Leader.ID = -1
+					}
+
+					for i := range partition.Isr {
+						if partition.Isr[i].Host == "" && partition.Isr[i].Port == 0 {
+							partition.Isr[i].ID = -1
+						}
+					}
+
+					for i := range partition.Replicas {
+						if partition.Replicas[i].Host == "" && partition.Replicas[i].Port == 0 {
+							partition.Replicas[i].ID = -1
+						}
+					}
+				}
+
+				leaderState := WrongLeader
+				// check whether the preferred leader (the first valid replica ID) is the actual leader
+				firstReplicaID := -1
+				for _, replica := range partition.Replicas {
+					if replica.ID == -1 {
+						continue
+					}
+
+					firstReplicaID = replica.ID
+					break
+				}
+
+				if len(partition.Replicas) > 0 &&
+					partitionStatus != Offline &&
+					partition.Leader.ID == firstReplicaID {
+					leaderState = CorrectLeader
+				}
+
+				if status == PartitionStatus("") || status == partitionStatus {
+					partitionsStatusInfo = append(partitionsStatusInfo, PartitionStatusInfo{
+						Topic:       topicMetadata.Name,
+						Partition:   partition,
+						Status:      partitionStatus,
+						LeaderState: leaderState,
+					})
+				}
+			default:
+				log.Errorf("Unrecognized partition status: %v", partitionStatus)
+			}
+		}
+
+		if len(partitionsStatusInfo) > 0 {
+			topicsPartitionsStatusInfo[topicMetadata.Name] = partitionsStatusInfo
+		}
+	}
+
+	return topicsPartitionsStatusInfo
+}
+
+// GetTopicsPartitionsStatusSummary returns per-topic partition IDs grouped by status, plus totals for each status
+func GetTopicsPartitionsStatusSummary(
+	metadata *kafka.MetadataResponse,
+	topics []string,
+	status PartitionStatus,
+) (map[string]map[PartitionStatus][]int, int, int, int) {
+	okCount := 0
+	offlineCount := 0
+	underReplicatedCount := 0
+	topicsPartitionsStatusSummary := make(map[string]map[PartitionStatus][]int)
+
+	filterTopics := GetValidTopicNamesFromMetadata(topics, metadata)
+	for _, topicMetadata := range metadata.Topics {
+
+		if topicMetadata.Error != nil {
+			log.Errorf("Topic: %s metadata error: %v", topicMetadata.Name, topicMetadata.Error)
+			continue
+		}
+
+		if len(topics) != 0 && !filterTopics[topicMetadata.Name] {
+			continue
+		}
+
+		_, exists := topicsPartitionsStatusSummary[topicMetadata.Name]
+		if !exists {
+			topicsPartitionsStatusSummary[topicMetadata.Name] = make(map[PartitionStatus][]int)
+
+			if status == "" {
+				topicsPartitionsStatusSummary[topicMetadata.Name][Ok] = []int{}
+				topicsPartitionsStatusSummary[topicMetadata.Name][UnderReplicated] = []int{}
+				topicsPartitionsStatusSummary[topicMetadata.Name][Offline] = []int{}
+			} else {
+				topicsPartitionsStatusSummary[topicMetadata.Name][status] = []int{}
+			}
+		}
+
+		for _, partition := range topicMetadata.Partitions {
+			partitionStatus := GetPartitionStatus(partition)
+
+			if status == PartitionStatus("") || status == partitionStatus {
+				switch partitionStatus {
+				case Ok:
+					okCount++
+				case Offline:
+					offlineCount++
+				case UnderReplicated:
+					underReplicatedCount++
+				default:
+					// unrecognized partition status
+				}
+
+				topicsPartitionsStatusSummary[topicMetadata.Name][partitionStatus] = append(
+					topicsPartitionsStatusSummary[topicMetadata.Name][partitionStatus],
+					partition.ID,
+				)
+			}
+		}
+	}
+
+	return topicsPartitionsStatusSummary, okCount, offlineCount, underReplicatedCount
+}
+
+// GetPartitionStatus returns the status of a partition:
+// - ok
+// - offline
+// - under-replicated
+//
+// NOTE: a partition is
+// 1. offline if a ListenerNotFound error is observed for the partition leader
+// 2. under-replicated if the number of ISRs is less than the number of replicas
+func GetPartitionStatus(partition kafka.Partition) PartitionStatus {
+	if partition.Leader.Host == "" && partition.Leader.Port == 0 &&
+		ListenerNotFoundError.Error() == partition.Error.Error() {
+		return Offline
+	} else if len(partition.Isr) < len(partition.Replicas) {
+		return UnderReplicated
+	}
+
+	return Ok
+}
+
+// GetValidTopicNamesFromMetadata returns, for a given list of topics, the set of those that exist in the cluster
+func GetValidTopicNamesFromMetadata(
+	topics []string,
+	metadata *kafka.MetadataResponse,
+) map[string]bool {
+	validTopics := make(map[string]bool)
+	allTopicNamesSet := GetAllTopicNamesFromMetadata(metadata)
+
+	for _, topic := range topics {
+		_, exists := allTopicNamesSet[topic]
+		if exists {
+			validTopics[topic] = true
+		} else {
+			log.Errorf("Ignoring topic %s: not found in the Kafka cluster", topic)
+		}
+	}
+
+	return validTopics
+}
+
+func GetAllTopicNamesFromMetadata(
+	metadata *kafka.MetadataResponse,
+) map[string]bool {
+	topicsSet := make(map[string]bool)
+
+	for _, topicMetadata := range metadata.Topics {
+		topicsSet[topicMetadata.Name] = true
+	}
+
+	return topicsSet
+}
diff --git a/pkg/admin/zkclient.go b/pkg/admin/zkclient.go
index 575dee8e..93cd1407 100644
--- a/pkg/admin/zkclient.go
+++ b/pkg/admin/zkclient.go
@@ -1076,3 +1076,20 @@ func updateConfig(
 	configMap["config"] = configKVMap
 	return updatedKeys, nil
 }
+
+func (c *ZKAdminClient) GetAllTopicsMetadata(
+	ctx context.Context,
+) (*kafka.MetadataResponse, error) {
+	client := c.GetConnector().KafkaClient
+	req := kafka.MetadataRequest{
+		Topics: nil,
+	}
+
+	log.Debugf("Metadata request: %+v", req)
+	metadata, err := client.Metadata(ctx, &req)
+	if err != nil {
+		return nil, fmt.Errorf("Error fetching all topics metadata: %+v", err)
+	}
+
+	return metadata, nil
+}
diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go
index 7b0542c7..342d7335 100644
--- a/pkg/cli/cli.go
+++ b/pkg/cli/cli.go
@@ -400,25 +400,57 @@ func (c *CLIRunner) GetMemberLags(
 
 // GetPartitions fetches the details of each partition in a topic and prints out a summary for
 // user inspection.
-func (c *CLIRunner) GetPartitions(ctx context.Context, topic string) error {
+func (c *CLIRunner) GetPartitions(
+	ctx context.Context,
+	topics []string,
+	status admin.PartitionStatus,
+	summary bool,
+) error {
 	c.startSpinner()
 
-	topicInfo, err := c.adminClient.GetTopic(ctx, topic, true)
+	metadata, err := c.adminClient.GetAllTopicsMetadata(ctx)
 	if err != nil {
 		c.stopSpinner()
 		return err
 	}
 
 	brokers, err := c.adminClient.GetBrokers(ctx, nil)
-	c.stopSpinner()
 	if err != nil {
+		c.stopSpinner()
 		return err
 	}
 
+	if !summary {
+		topicsPartitionsStatusInfo := admin.GetTopicsPartitionsStatusInfo(metadata, topics, status)
+		c.stopSpinner()
+
+		c.printer(
+			"Partitions:\n%s",
+			admin.FormatTopicsPartitions(topicsPartitionsStatusInfo, brokers),
+		)
+
+		return nil
+	}
+
+	statusSummary, okCount, offlineCount, underReplicatedCount := admin.GetTopicsPartitionsStatusSummary(metadata,
+		topics,
+		status,
+	)
+	c.stopSpinner()
+
 	c.printer(
-		"Partitions for topic %s:\n%s",
-		topic,
-		admin.FormatTopicPartitions(topicInfo.Partitions, brokers),
+		"Partitions Summary:\n%s",
+		admin.FormatTopicsPartitionsSummary(statusSummary),
+	)
+
+	c.printer(
+		"Found %d %v partitions, %d %v partitions, and %d %v partitions",
+		okCount,
+		admin.Ok,
+		underReplicatedCount,
+		admin.UnderReplicated,
+		offlineCount,
+		admin.Offline,
 	)
 
 	return nil
diff --git a/pkg/cli/repl.go b/pkg/cli/repl.go
index 77096191..da053b63 100644
--- a/pkg/cli/repl.go
+++ b/pkg/cli/repl.go
@@ -300,11 +300,31 @@ func (r *Repl) executor(in string) {
 			return
 		}
 	case "partitions":
-		if err := command.checkArgs(3, 3, nil); err != nil {
+		//
+		// from the terminal, topicctl get partitions can take more than one argument
+		//
+		// from the repl, filtering on multiple topics can get tricky.
+		// the current repl implementation takes only a fixed number of words (command.args),
+		// so in the repl we make get partitions work with only one
+		// argument (topic), with PartitionStatus as "" implying all statuses
+		//
+		// repl get partitions expects a minimum of 3 arguments and a maximum of 4
+		// repl> get partitions <topic> -> this works
+		// repl> get partitions <topic> --summary -> this works
+		// repl> get partitions <topic1> <topic2> -> this works, but only for <topic1>
+		// repl> get partitions <topic1> <topic2> --summary -> this will not work
+		//
+		if err := command.checkArgs(3, 4, map[string]struct{}{"summary": {}}); err != nil {
 			log.Errorf("Error: %+v", err)
 			return
 		}
-		if err := r.cliRunner.GetPartitions(ctx, command.args[2]); err != nil {
+
+		if err := r.cliRunner.GetPartitions(
+			ctx,
+			[]string{command.args[2]},
+			admin.PartitionStatus(""),
+			command.getBoolValue("summary"),
+		); err != nil {
 			log.Errorf("Error: %+v", err)
 			return
 		}
@@ -457,7 +477,7 @@ func helpTable() string {
 			"Get the members of a consumer group",
 		},
 		{
-			"  get partitions [topic]",
+			"  get partitions [topic] [--summary]",
 			"Get all partitions for a topic",
 		},
 		{
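---

For reviewers, a quick sketch of how the new flags combine from the terminal. These invocations are illustrative only: the `--status` and `--summary` flags and the variadic topic arguments come from this patch, the cluster-config path is the one used in the repo's README examples, and `topic-other` is a hypothetical topic name.

```bash
# all partitions across all topics
topicctl get partitions --cluster-config=examples/local-cluster/cluster.yaml

# only under-replicated partitions for two specific topics
topicctl get partitions topic-default topic-other \
    --status=under-replicated \
    --cluster-config=examples/local-cluster/cluster.yaml

# per-topic partition IDs and counts grouped by status
topicctl get partitions --summary \
    --cluster-config=examples/local-cluster/cluster.yaml
```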