From d1378c015256052e6090445431a24b28815e0d31 Mon Sep 17 00:00:00 2001 From: Sri Harsha Singudasu <105226401+ssingudasu@users.noreply.github.com> Date: Tue, 11 Jul 2023 13:49:58 -0700 Subject: [PATCH] topicctl new action: rebalance (#142) * topicctl new action: rebalance * topicctl new action: rebalance * topicctl new action: rebalance * topicctl new action: rebalance * topicctl new action: rebalance * topicctl new action: rebalance * topicctl new action: rebalance * topicctl new action: rebalance * topicctl new action: rebalance * topicctl new action: rebalance * topicctl new action: rebalance * topicctl new action: rebalance * topicctl new action: rebalance * topicctl new action: rebalance * topicctl new action: rebalance * topicctl new action: rebalance * README updated with subcomand rebalance information * Updated topicctl version * README updated with subcomand rebalance information * README updated with subcomand rebalance information * README updated with subcommand rebalance information --- README.md | 33 ++- cmd/topicctl/subcmd/rebalance.go | 363 +++++++++++++++++++++++++++++++ pkg/apply/apply.go | 32 +++ pkg/util/progress.go | 71 ++++++ pkg/util/strings.go | 15 +- pkg/version/version.go | 2 +- 6 files changed, 509 insertions(+), 7 deletions(-) create mode 100644 cmd/topicctl/subcmd/rebalance.go create mode 100644 pkg/util/progress.go diff --git a/README.md b/README.md index 90ee83d9..9f75c88f 100644 --- a/README.md +++ b/README.md @@ -175,6 +175,18 @@ resource type in the cluster. Currently, the following operations are supported: | `get offsets [topic]` | Number of messages per partition along with start and end times | | `get topics` | All topics in the cluster | +#### rebalance + +``` +topicctl rebalance [flags] +``` + +The `apply` subcommand, when used with the `--rebalance` flag, rebalances the specified topics across a cluster. 
+ +The `rebalance` subcommand, on the other hand, performs a rebalance for **all** the topics defined at a given topic prefix path. + +See the [rebalancing](#rebalancing) section below for more information on rebalancing. + #### repl ``` @@ -372,22 +384,33 @@ Note that these all try to achieve in-topic balance, and only vary in the case o Thus, the placements won't be significantly different in most cases. In the future, we may add pickers that allow for some in-topic imbalance, e.g. to correct a -cluster-wide broker inbalance. +cluster-wide broker imbalance. #### Rebalancing -If `apply` is run with the `--rebalance` flag set, then `topicctl` will do a full broker rebalance +If `apply` is run with the `--rebalance` flag, then `topicctl` will rebalance specified topics after the usual apply steps. This process will check the balance of the brokers for each index -position (i.e., first, second, third, etc.) in each partition and make replacements if there +position (i.e., first, second, third, etc.) for each partition and make replacements if there are any brokers that are significantly over- or under-represented. -The rebalance process can optionally remove brokers from a topic too. To use this feature, set the +The rebalance process can optionally remove brokers from a topic. To use this feature, set the `--to-remove` flag. Note that this flag has no effect unless `--rebalance` is also set. Rebalancing is not done by default on all apply runs because it can be fairly disruptive and -generally shouldn't be necessary unless the topic started off in an inbalanced state or there +generally shouldn't be necessary unless the topic started off in an imbalanced state or there has been a change in the number of brokers. +To rebalance **all** topics in a cluster, use the `rebalance` subcommand, which will perform the `apply --rebalance` +function on all qualifying topics. 
It will inventory all topic configs found at `--path-prefix` for a cluster +specified by `--cluster-config`. + +This subcommand will not rebalance a topic if: + +1. the topic config is inconsistent with the cluster config (name, region, environment etc...) +1. the partition count of a topic in the kafka cluster does not match the topic partition setting in the topic config +1. a topic's `retention.ms` in the kafka cluster does not match the topic's `retentionMinutes` setting in the topic config +1. a topic does not exist in the kafka cluster + ## Tool safety The `bootstrap`, `get`, `repl`, and `tail` subcommands are read-only and should never make diff --git a/cmd/topicctl/subcmd/rebalance.go b/cmd/topicctl/subcmd/rebalance.go new file mode 100644 index 00000000..7da0096f --- /dev/null +++ b/cmd/topicctl/subcmd/rebalance.go @@ -0,0 +1,363 @@ +package subcmd + +import ( + "context" + "fmt" + "github.com/spf13/cobra" + "os" + "os/signal" + "path/filepath" + "strconv" + "syscall" + "time" + + "github.com/segmentio/topicctl/pkg/admin" + "github.com/segmentio/topicctl/pkg/apply" + "github.com/segmentio/topicctl/pkg/cli" + "github.com/segmentio/topicctl/pkg/config" + "github.com/segmentio/topicctl/pkg/util" + log "github.com/sirupsen/logrus" +) + +var rebalanceCmd = &cobra.Command{ + Use: "rebalance", + Short: "rebalance all topics for a given topic prefix path", + PreRunE: rebalancePreRun, + RunE: rebalanceRun, +} + +type rebalanceCmdConfig struct { + brokersToRemove []int + brokerThrottleMBsOverride int + dryRun bool + partitionBatchSizeOverride int + pathPrefix string + sleepLoopDuration time.Duration + showProgressInterval time.Duration + + shared sharedOptions +} + +var rebalanceConfig rebalanceCmdConfig + +func init() { + rebalanceCmd.Flags().IntSliceVar( + &rebalanceConfig.brokersToRemove, + "to-remove", + []int{}, + "Brokers to remove; only applies if rebalance is also set", + ) + rebalanceCmd.Flags().IntVar( + &rebalanceConfig.brokerThrottleMBsOverride, + 
"broker-throttle-mb", + 0, + "Broker throttle override (MB/sec)", + ) + rebalanceCmd.Flags().BoolVar( + &rebalanceConfig.dryRun, + "dry-run", + false, + "Do a dry-run", + ) + rebalanceCmd.Flags().IntVar( + &rebalanceConfig.partitionBatchSizeOverride, + "partition-batch-size", + 0, + "Partition batch size override", + ) + rebalanceCmd.Flags().StringVar( + &rebalanceConfig.pathPrefix, + "path-prefix", + os.Getenv("TOPICCTL_APPLY_PATH_PREFIX"), + "Prefix for topic config paths", + ) + rebalanceCmd.Flags().DurationVar( + &rebalanceConfig.sleepLoopDuration, + "sleep-loop-duration", + 10*time.Second, + "Amount of time to wait between partition checks", + ) + rebalanceCmd.Flags().DurationVar( + &rebalanceConfig.showProgressInterval, + "show-progress-interval", + 0*time.Second, + "Interval of time to show progress during rebalance", + ) + + addSharedConfigOnlyFlags(rebalanceCmd, &rebalanceConfig.shared) + RootCmd.AddCommand(rebalanceCmd) +} + +func rebalancePreRun(cmd *cobra.Command, args []string) error { + if rebalanceConfig.shared.clusterConfig == "" || rebalanceConfig.pathPrefix == "" { + return fmt.Errorf("Requires args --cluster-config & --path-prefix (or) env variables TOPICCTL_CLUSTER_CONFIG & TOPICCTL_APPLY_PATH_PREFIX") + } + + return nil +} + +func rebalanceRun(cmd *cobra.Command, args []string) error { + ctx := context.Background() + rebalanceCtxStruct, err := getRebalanceCtxStruct(&rebalanceConfig) + if err != nil { + return err + } + ctx = context.WithValue(ctx, "progress", rebalanceCtxStruct) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) + go func() { + // for an interrupt, cancel context and exit program to end all topic rebalances + <-sigChan + cancel() + os.Exit(1) + }() + + clusterConfigPath := rebalanceConfig.shared.clusterConfig + topicConfigDir := rebalanceConfig.pathPrefix + clusterConfig, err := config.LoadClusterFile(clusterConfigPath, 
rebalanceConfig.shared.expandEnv) + if err != nil { + return err + } + + adminClient, err := clusterConfig.NewAdminClient(ctx, + nil, + rebalanceConfig.dryRun, + rebalanceConfig.shared.saslUsername, + rebalanceConfig.shared.saslPassword, + ) + if err != nil { + log.Fatal(err) + } + defer adminClient.Close() + + // get all topic configs from --path-prefix i.e topics folder + // we perform a recursive on the --path-prefix because there can be nested directories with + // more topics for the --cluster-config + // + // NOTE: a topic file is ignored for rebalance if + // - a file is not a valid topic yaml file + // - any topic config is not consistent with cluster config + log.Infof("Getting all topic configs from path prefix %v", topicConfigDir) + topicFiles, err := getAllFiles(topicConfigDir) + if err != nil { + return err + } + + // iterate through each topic config and initiate rebalance + topicConfigs := []config.TopicConfig{} + topicErrorDict := make(map[string]error) + for _, topicFile := range topicFiles { + // do not consider invalid topic yaml files for rebalance + topicConfigs, err = config.LoadTopicsFile(topicFile) + if err != nil { + log.Errorf("Invalid topic yaml file: %s", topicFile) + continue + } + + for _, topicConfig := range topicConfigs { + // topic config should be consistent with the cluster config + if err := config.CheckConsistency(topicConfig, clusterConfig); err != nil { + log.Errorf("topic file: %s inconsistent with cluster: %s", topicFile, clusterConfigPath) + continue + } + + log.Infof( + "Rebalancing topic %s from config file %s with cluster config %s", + topicConfig.Meta.Name, + topicFile, + clusterConfigPath, + ) + + topicErrorDict[topicConfig.Meta.Name] = nil + rebalanceTopicProgressConfig := util.RebalanceTopicProgressConfig{ + TopicName: topicConfig.Meta.Name, + ClusterName: clusterConfig.Meta.Name, + ClusterEnvironment: clusterConfig.Meta.Environment, + ToRemove: rebalanceConfig.brokersToRemove, + RebalanceError: false, + } + if err 
:= rebalanceApplyTopic(ctx, topicConfig, clusterConfig, adminClient); err != nil { + topicErrorDict[topicConfig.Meta.Name] = err + rebalanceTopicProgressConfig.RebalanceError = true + log.Errorf("topic: %s rebalance failed with error: %v", topicConfig.Meta.Name, err) + } + + // show topic final progress + if rebalanceCtxStruct.Enabled { + progressStr, err := util.StructToStr(rebalanceTopicProgressConfig) + if err != nil { + log.Errorf("progress struct to string error: %+v", err) + } else { + log.Infof("Rebalance Progress: %s", progressStr) + } + } + } + } + + // audit at the end of all topic rebalances + successTopics := 0 + errorTopics := 0 + for thisTopicName, thisTopicError := range topicErrorDict { + if thisTopicError != nil { + errorTopics += 1 + log.Errorf("topic: %s rebalance failed with error: %v", thisTopicName, thisTopicError) + } else { + log.Infof("topic: %s rebalance is successful", thisTopicName) + successTopics += 1 + } + } + + // show overall rebalance summary report + if rebalanceCtxStruct.Enabled { + progressStr, err := util.StructToStr(util.RebalanceProgressConfig{ + SuccessTopics: successTopics, + ErrorTopics: errorTopics, + ClusterName: clusterConfig.Meta.Name, + ClusterEnvironment: clusterConfig.Meta.Environment, + ToRemove: rebalanceConfig.brokersToRemove, + }) + if err != nil { + log.Errorf("progress struct to string error: %+v", err) + } else { + log.Infof("Rebalance Progress: %s", progressStr) + } + } + + log.Infof("Rebalance complete! 
%d topics rebalanced successfully, %d topics had errors", successTopics, errorTopics) + return nil +} + +// Check whether a topic is a candidate for action rebalance by comparing the +// settings (partitions, retention time) in the topic config with the settings of the topic in the cluster +func rebalanceTopicCheck( + topicConfig config.TopicConfig, + topicInfo admin.TopicInfo, +) error { + log.Debugf("Check topic partitions...") + if len(topicInfo.Partitions) != topicConfig.Spec.Partitions { + return fmt.Errorf("Topic partitions in kafka: %d does not match with topic config: %d", + len(topicInfo.Partitions), + topicConfig.Spec.Partitions, + ) + } + + log.Debugf("Check topic retention.ms...") + topicInfoRetentionMs := topicInfo.Config["retention.ms"] + topicConfigRetentionMs := strconv.Itoa(topicConfig.Spec.RetentionMinutes * 60000) + if topicInfoRetentionMs == "" { + topicInfoRetentionMs = strconv.Itoa(0) + } + if topicInfoRetentionMs != topicConfigRetentionMs { + return fmt.Errorf("Topic retention in kafka: %s does not match with topic config: %s", + topicInfoRetentionMs, + topicConfigRetentionMs, + ) + } + + return nil +} + +// Perform rebalance on a topic. 
returns error if unsuccessful +// topic will not be rebalanced if +// - partitions of a topic in kafka cluster does not match with topic partition setting in topic config +// - retention.ms of a topic in kafka cluster does not match with topic retentionMinutes setting in topic config +// +// to ensure there are no disruptions to kafka cluster +// +// NOTE: topic that is not present in kafka cluster will not be applied +func rebalanceApplyTopic( + ctx context.Context, + topicConfig config.TopicConfig, + clusterConfig config.ClusterConfig, + adminClient admin.Client, +) error { + topicConfig.SetDefaults() + topicInfo, err := adminClient.GetTopic(ctx, topicConfig.Meta.Name, true) + if err != nil { + if err == admin.ErrTopicDoesNotExist { + return fmt.Errorf("Topic: %s does not exist in Kafka cluster", topicConfig.Meta.Name) + } + return err + } + log.Debugf("topicInfo from kafka: %+v", topicInfo) + + if err := rebalanceTopicCheck(topicConfig, topicInfo); err != nil { + return err + } + + retentionDropStepDuration, err := clusterConfig.GetDefaultRetentionDropStepDuration() + if err != nil { + return err + } + + applierConfig := apply.TopicApplierConfig{ + BrokerThrottleMBsOverride: rebalanceConfig.brokerThrottleMBsOverride, + BrokersToRemove: rebalanceConfig.brokersToRemove, + ClusterConfig: clusterConfig, + DryRun: rebalanceConfig.dryRun, + PartitionBatchSizeOverride: rebalanceConfig.partitionBatchSizeOverride, + Rebalance: true, // to enforce action: rebalance + AutoContinueRebalance: true, // to continue without prompts + RetentionDropStepDuration: retentionDropStepDuration, // not needed for rebalance + SkipConfirm: true, // to enforce action: rebalance + SleepLoopDuration: rebalanceConfig.sleepLoopDuration, + TopicConfig: topicConfig, + } + + cliRunner := cli.NewCLIRunner(adminClient, log.Infof, false) + if err := cliRunner.ApplyTopic(ctx, applierConfig); err != nil { + return err + } + + return nil +} + +// build ctx map for rebalance progress +func 
getRebalanceCtxStruct(rebalanceConfig *rebalanceCmdConfig) (util.RebalanceCtxStruct, error) { + rebalanceCtxStruct := util.RebalanceCtxStruct{ + Enabled: true, + Interval: rebalanceConfig.showProgressInterval, + } + + zeroDur, _ := time.ParseDuration("0s") + if rebalanceConfig.showProgressInterval == zeroDur { + rebalanceCtxStruct.Enabled = false + log.Infof("--progress-interval is 0s. Not showing progress...") + } else if rebalanceConfig.showProgressInterval < zeroDur { + return rebalanceCtxStruct, fmt.Errorf("--show-progress-interval should be > 0s") + } + + if rebalanceConfig.dryRun { + rebalanceCtxStruct.Enabled = false + log.Infof("--dry-run enabled. Not showing progress...") + return rebalanceCtxStruct, nil + } + + return rebalanceCtxStruct, nil +} + +// get all files for a given dir path +func getAllFiles(dir string) ([]string, error) { + var files []string + + err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + + if !info.IsDir() { + files = append(files, path) + } + + return nil + }) + + if err != nil { + return nil, err + } + + return files, err +} diff --git a/pkg/apply/apply.go b/pkg/apply/apply.go index 2e2fd91a..6d55048a 100644 --- a/pkg/apply/apply.go +++ b/pkg/apply/apply.go @@ -876,6 +876,29 @@ func (t *TopicApplier) updatePlacementRunner( roundLabel, ) + // at the moment show-progress option is available only with action: rebalance + showProgress := false + var stopChan chan bool + rebalanceCtxStruct, ok := ctx.Value("progress").(util.RebalanceCtxStruct) + if ok && rebalanceCtxStruct.Enabled { + stopChan = make(chan bool) + showProgress = true + + go util.ShowProgress( + ctx, + util.RebalanceRoundProgressConfig{ + CurrRound: round, + TotalRounds: numRounds, + TopicName: t.topicName, + ClusterName: t.clusterConfig.Meta.Name, + ClusterEnvironment: t.clusterConfig.Meta.Environment, + ToRemove: t.config.BrokersToRemove, + }, + rebalanceCtxStruct.Interval, + stopChan, + ) + } + err := 
t.updatePartitionsIteration( ctx, currDiffAssignments[i:end], @@ -884,6 +907,10 @@ func (t *TopicApplier) updatePlacementRunner( roundLabel, ) if err != nil { + // error handler. stop showing progress for this iteration + if showProgress { + stopChan <- true + } return err } @@ -895,6 +922,11 @@ func (t *TopicApplier) updatePlacementRunner( return errors.New("Stopping because of user response") } } + + // stop showing progress for this iteration + if showProgress { + stopChan <- true + } } topicInfo, err := t.adminClient.GetTopic(ctx, t.topicName, true) diff --git a/pkg/util/progress.go b/pkg/util/progress.go new file mode 100644 index 00000000..ee559fc2 --- /dev/null +++ b/pkg/util/progress.go @@ -0,0 +1,71 @@ +package util + +import ( + "context" + log "github.com/sirupsen/logrus" + "time" +) + +// Rebalance topic progress Config +type RebalanceTopicProgressConfig struct { + TopicName string `json:"topic"` + ClusterName string `json:"cluster"` + ClusterEnvironment string `json:"environment"` + ToRemove []int `json:"to_remove"` + RebalanceError bool `json:"rebalance_error"` +} + +// Rebalance overall progress Config +type RebalanceProgressConfig struct { + SuccessTopics int `json:"success_topics"` + ErrorTopics int `json:"error_topics"` + ClusterName string `json:"cluster"` + ClusterEnvironment string `json:"environment"` + ToRemove []int `json:"to_remove"` +} + +// Rebalance Topic Round progress Config +type RebalanceRoundProgressConfig struct { + TopicName string `json:"topic"` + ClusterName string `json:"cluster"` + ClusterEnvironment string `json:"environment"` + ToRemove []int `json:"to_remove"` + CurrRound int `json:"round"` + TotalRounds int `json:"total_rounds"` +} + +// Rebalance context struct +type RebalanceCtxStruct struct { + Enabled bool `json:"enabled"` + Interval time.Duration `json:"interval"` +} + +// shows progress of a config repeatedly during an interval +func ShowProgress( + ctx context.Context, + progressConfig interface{}, + interval 
time.Duration, + stopChan chan bool, +) { + progressStr, err := StructToStr(progressConfig) + if err != nil { + log.Errorf("progress struct to string error: %+v", err) + } else { + // print first before ticker starts + log.Infof("Rebalance Progress: %s", progressStr) + } + + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if err == nil { + log.Infof("Rebalance Progress: %s", progressStr) + } + case <-stopChan: + return + } + } +} diff --git a/pkg/util/strings.go b/pkg/util/strings.go index 36aca352..00a323df 100644 --- a/pkg/util/strings.go +++ b/pkg/util/strings.go @@ -1,6 +1,9 @@ package util -import "fmt" +import ( + "encoding/json" + "fmt" +) // TruncateStringSuffix truncates a string by replacing the trailing characters with // "..." if needed. @@ -26,3 +29,13 @@ func TruncateStringMiddle(input string, maxLen int, suffixLen int) (string, int) numOmitted := len(input) - len(prefix) - len(suffix) return fmt.Sprintf("%s...%s", prefix, suffix), numOmitted } + +// Convert any struct to json string +func StructToStr(inputStruct interface{}) (string, error) { + jsonBytes, err := json.Marshal(inputStruct) + if err != nil { + return "{}", err + } + + return string(jsonBytes), nil +} diff --git a/pkg/version/version.go b/pkg/version/version.go index c080775a..97ac83b0 100644 --- a/pkg/version/version.go +++ b/pkg/version/version.go @@ -1,4 +1,4 @@ package version // Version is the current topicctl version. -const Version = "1.9.1" +const Version = "1.10.1"