Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DP-1774] - topicctl get partitions to display under replicated and offline #155

Merged
merged 7 commits into from
Oct 9, 2023
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ topicctl repl --cluster-config=examples/local-cluster/cluster.yaml
```
get brokers
get topics
get partitions
get partitions topic-default
get offsets topic-default
tail topic-default
Expand Down Expand Up @@ -171,7 +172,7 @@ resource type in the cluster. Currently, the following operations are supported:
| `get groups` | All consumer groups in the cluster |
| `get lags [topic] [group]` | Lag for each topic partition for a consumer group |
| `get members [group]` | Details of each member in a consumer group |
| `get partitions [topic]` | All partitions in a topic |
| `get partitions [optional: topics]` | Get all partitions for topics |
| `get offsets [topic]` | Number of messages per partition along with start and end times |
| `get topics` | All topics in the cluster |

Expand Down
46 changes: 41 additions & 5 deletions cmd/topicctl/subcmd/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package subcmd

import (
"context"
"fmt"
"strings"

"github.com/aws/aws-sdk-go/aws/session"
"github.com/segmentio/topicctl/pkg/admin"
"github.com/segmentio/topicctl/pkg/cli"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
Expand All @@ -31,6 +33,15 @@ type getCmdConfig struct {

var getConfig getCmdConfig

type partitionsCmdConfig struct {
status admin.PartitionStatus
summary bool
}

var partitionsConfig partitionsCmdConfig

var partitionsStatusHelpText = "Allowed values: ok, offline, under-replicated"

func init() {
getCmd.PersistentFlags().BoolVar(
&getConfig.full,
Expand Down Expand Up @@ -211,10 +222,10 @@ func membersCmd() *cobra.Command {
}

func partitionsCmd() *cobra.Command {
return &cobra.Command{
Use: "partitions [topic]",
Short: "Displays partition information for the specified topic.",
Args: cobra.ExactArgs(1),
partitionsCommand := &cobra.Command{
Use: "partitions [optional: topics]",
Short: "Get all partitions information for topics",
Args: cobra.MinimumNArgs(0),
ssingudasu marked this conversation as resolved.
Show resolved Hide resolved
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
sess := session.Must(session.NewSession())
Expand All @@ -225,10 +236,35 @@ func partitionsCmd() *cobra.Command {
}
defer adminClient.Close()

topics := []string{}
for _, arg := range args {
topics = append(topics, arg)
}

cliRunner := cli.NewCLIRunner(adminClient, log.Infof, !noSpinner)
return cliRunner.GetPartitions(ctx, args[0])
return cliRunner.GetPartitions(
ctx,
topics,
partitionsConfig.status,
partitionsConfig.summary,
)
},
}

partitionsCommand.Flags().Var(
&partitionsConfig.status,
"status",
fmt.Sprintf("partition status\n%s", partitionsStatusHelpText),
)

partitionsCommand.Flags().BoolVar(
&partitionsConfig.summary,
"summary",
false,
fmt.Sprintf("Display summary of partitions"),
)

return partitionsCommand
}

func offsetsCmd() *cobra.Command {
Expand Down
17 changes: 17 additions & 0 deletions pkg/admin/brokerclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,3 +690,20 @@ func configEntriesToAPIConfigs(

return apiConfigs
}

func (c *BrokerAdminClient) GetAllTopicsMetadata(
ctx context.Context,
) (*kafka.MetadataResponse, error) {
client := c.GetConnector().KafkaClient
req := kafka.MetadataRequest{
Topics: nil,
}

log.Debugf("Metadata request: %+v", req)
metadata, err := client.Metadata(ctx, &req)
if err != nil {
return nil, fmt.Errorf("Error fetching all topics metadata: %+v", err)
}

return metadata, nil
}
3 changes: 3 additions & 0 deletions pkg/admin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ type Client interface {
detailed bool,
) (TopicInfo, error)

// GetAllTopicsMetadata performs kafka-go metadata call to get topic information
GetAllTopicsMetadata(ctx context.Context) (*kafka.MetadataResponse, error)

// UpdateTopicConfig updates the configuration for the argument topic. It returns the config
// keys that were updated.
UpdateTopicConfig(
Expand Down
199 changes: 199 additions & 0 deletions pkg/admin/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,205 @@ func FormatTopicPartitions(partitions []PartitionInfo, brokers []BrokerInfo) str
return string(bytes.TrimRight(buf.Bytes(), "\n"))
}

// FormatTopicsPartitionsSummary creates a pretty table with summary of the
// partitions for topics.
func FormatTopicsPartitionsSummary(
topicsPartitionsStatusSummary map[string]map[PartitionStatus][]int,
) string {
buf := &bytes.Buffer{}

headers := []string{
"Topic",
"Status",
"Count",
"IDs",
}
columnAligment := []int{
tablewriter.ALIGN_LEFT,
tablewriter.ALIGN_LEFT,
tablewriter.ALIGN_LEFT,
tablewriter.ALIGN_LEFT,
}

table := tablewriter.NewWriter(buf)
table.SetHeader(headers)
table.SetAutoWrapText(true)
table.SetColumnAlignment(columnAligment)
table.SetBorders(
tablewriter.Border{
Left: false,
Top: true,
Right: false,
Bottom: true,
},
)

topicNames := []string{}
tableData := make(map[string][][]string)
for topicName, partitionsStatusSummary := range topicsPartitionsStatusSummary {
topicTableRows := [][]string{}

for partitionStatus, partitionStatusIDs := range partitionsStatusSummary {
topicTableRows = append(topicTableRows, []string{
fmt.Sprintf("%s", topicName),
fmt.Sprintf("%s", partitionStatus),
fmt.Sprintf("%d", len(partitionStatusIDs)),
fmt.Sprintf("%+v", partitionStatusIDs),
})
}

// sort the topicTableRows by partitionStatus
statusSort := func(i, j int) bool {
// second element in the row is of type PartitionStatus
return string(topicTableRows[i][1]) < string(topicTableRows[j][1])
}

sort.Slice(topicTableRows, statusSort)

tableData[topicName] = topicTableRows
topicNames = append(topicNames, topicName)
}

sort.Strings(topicNames)
for _, topicName := range topicNames {
_, exists := tableData[topicName]
if exists {
for _, topicTableRow := range tableData[topicName] {
table.Append(topicTableRow)
}
}
}

table.Render()
return string(bytes.TrimRight(buf.Bytes(), "\n"))
}

// FormatTopicsPartitions creates a pretty table with information on all of the
// partitions for topics.
func FormatTopicsPartitions(
topicsPartitionsStatusInfo map[string][]PartitionStatusInfo,
brokers []BrokerInfo,
) string {
buf := &bytes.Buffer{}

headers := []string{
"Topic",
"ID",
"Leader",
"ISR",
"Replicas",
"Distinct\nRacks",
"Racks",
"Status",
}
columnAligment := []int{
tablewriter.ALIGN_LEFT,
tablewriter.ALIGN_LEFT,
tablewriter.ALIGN_LEFT,
tablewriter.ALIGN_LEFT,
tablewriter.ALIGN_LEFT,
tablewriter.ALIGN_LEFT,
tablewriter.ALIGN_LEFT,
tablewriter.ALIGN_LEFT,
}

table := tablewriter.NewWriter(buf)
table.SetHeader(headers)
table.SetAutoWrapText(false)
table.SetColumnAlignment(columnAligment)
table.SetBorders(
tablewriter.Border{
Left: false,
Top: true,
Right: false,
Bottom: true,
},
)

topicNames := []string{}
brokerRacks := BrokerRacks(brokers)
tableData := make(map[string][][]string)
for topicName, partitionsStatusInfo := range topicsPartitionsStatusInfo {
topicTableRows := [][]string{}
for _, partitionStatusInfo := range partitionsStatusInfo {
racks := partitionStatusInfo.Racks(brokerRacks)

distinctRacks := make(map[string]int)
for _, rack := range racks {
distinctRacks[rack] += 1
}

partitionIsrs := []int{}
for _, partitionStatusIsr := range partitionStatusInfo.Partition.Isr {
partitionIsrs = append(partitionIsrs, partitionStatusIsr.ID)
}

partitionReplicas := []int{}
for _, partitionReplica := range partitionStatusInfo.Partition.Replicas {
partitionReplicas = append(partitionReplicas, partitionReplica.ID)
}

inSync := true
if partitionStatusInfo.Status != Ok {
inSync = false
}

correctLeader := true
if partitionStatusInfo.LeaderState != CorrectLeader {
correctLeader = false
}

var statusPrinter func(f string, a ...interface{}) string
if !util.InTerminal() || inSync {
statusPrinter = fmt.Sprintf
} else if !inSync {
statusPrinter = color.New(color.FgRed).SprintfFunc()
}

var statePrinter func(f string, a ...interface{}) string
if !util.InTerminal() || correctLeader {
statePrinter = fmt.Sprintf
} else if !correctLeader {
statePrinter = color.New(color.FgCyan).SprintfFunc()
}

leaderStateString := fmt.Sprintf("%d", partitionStatusInfo.Partition.Leader.ID)
if !correctLeader {
leaderStateString = fmt.Sprintf("%d %+v", partitionStatusInfo.Partition.Leader.ID,
statePrinter("(%s)", string(partitionStatusInfo.LeaderState)),
)
}

topicTableRows = append(topicTableRows, []string{
fmt.Sprintf("%s", topicName),
fmt.Sprintf("%d", partitionStatusInfo.Partition.ID),
leaderStateString,
fmt.Sprintf("%+v", partitionIsrs),
fmt.Sprintf("%+v", partitionReplicas),
fmt.Sprintf("%d", len(distinctRacks)),
fmt.Sprintf("%+v", racks),
fmt.Sprintf("%v", statusPrinter("%s", string(partitionStatusInfo.Status))),
})
}

tableData[topicName] = topicTableRows
topicNames = append(topicNames, topicName)
}

sort.Strings(topicNames)
for _, topicName := range topicNames {
_, exists := tableData[topicName]
if exists {
for _, topicTableRow := range tableData[topicName] {
table.Append(topicTableRow)
}
}
}

table.Render()
return string(bytes.TrimRight(buf.Bytes(), "\n"))
}

// FormatConfig creates a pretty table with all of the keys and values in a topic or
// broker config.
func FormatConfig(configMap map[string]string) string {
Expand Down
Loading
Loading