From 7cc808dd85483160c7806788ce083263576b80fc Mon Sep 17 00:00:00 2001 From: David Teutli Date: Mon, 1 Dec 2025 11:56:07 -0700 Subject: [PATCH 1/7] feat(tube-3309): add initial groundwork for rebalancing of topics with no config files --- cmd/topicctl/subcmd/rebalance.go | 75 ++++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/cmd/topicctl/subcmd/rebalance.go b/cmd/topicctl/subcmd/rebalance.go index 2654c0d1..c3024293 100644 --- a/cmd/topicctl/subcmd/rebalance.go +++ b/cmd/topicctl/subcmd/rebalance.go @@ -7,6 +7,7 @@ import ( "os/signal" "path/filepath" "strconv" + "strings" "syscall" "time" @@ -32,6 +33,7 @@ type rebalanceCmdConfig struct { brokersToRemove []int brokerThrottleMBsOverride int dryRun bool + generateConfig bool partitionBatchSizeOverride int pathPrefix string sleepLoopDuration time.Duration @@ -85,6 +87,12 @@ func init() { 0*time.Second, "Interval of time to show progress during rebalance", ) + rebalanceCmd.Flags().BoolVar( + &rebalanceConfig.generateConfig, + "generate-config", + false, + "Generate temporary config file(s) for the rebalance of configless topic(s)", + ) addSharedConfigOnlyFlags(rebalanceCmd, &rebalanceConfig.shared) RootCmd.AddCommand(rebalanceCmd) @@ -151,10 +159,69 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { return err } + tmpDir := "" + existingConfigFiles := make(map[string]struct{}) + if rebalanceConfig.generateConfig { + // 0) Make set of existing topic files + for _, topicFile := range topicFiles { + _, topicName := filepath.Split(topicFile) + existingConfigFiles[topicName] = struct{}{} + log.Infof("Topic file: %v", topicName) + } + + // 1) Create tmp directory for generated config files + tmpDir = rebalanceConfig.pathPrefix + if rebalanceConfig.pathPrefix[len(rebalanceConfig.pathPrefix) - 1] != '/' { + tmpDir += "/" + } + tmpDir += "tmp/" + err = os.Mkdir(tmpDir, 0755) + if err != nil { + // it will error if tmp dir already exists + return err + } + log.Infof("tmp output path: %v", tmpDir) + + // 2) Bootstrap topics + cliRunner := cli.NewCLIRunner(adminClient, log.Infof, false) + cliRunner.BootstrapTopics( + ctx, + []string{}, // args + clusterConfig, + ".*", // match + ".^", // exclude + tmpDir, // output dir + false, // overwrite + false, // allow internal topics + ) + + // 3) Re-invetory config files + topicFiles, err = getAllFiles(topicConfigDir) + if err != nil { + return err + } + for _, topicFile := range topicFiles { + log.Infof("Topic file: %v", topicFile) + } + } + // iterate through each topic config and initiate rebalance topicConfigs := []config.TopicConfig{} topicErrorDict := make(map[string]error) for _, topicFile := range topicFiles { + // skip generated config if it already existed + if rebalanceConfig.generateConfig { + configPath, configName := filepath.Split(topicFile) + if strings.HasSuffix(configPath, "tmp/") { + _, found := existingConfigFiles[configName] + if found { + log.Infof("skipping existing file...") + continue + } + } + } + log.Infof("processing...") + // do not consider invalid topic yaml files for rebalance topicConfigs, err = config.LoadTopicsFile(topicFile) if err != nil { @@ -231,6 +298,14 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { } } + // clean up generated config files + if rebalanceConfig.generateConfig { + err = os.RemoveAll(tmpDir) + if err != nil { + return err + } + } + log.Infof("Rebalance complete! %d topics rebalanced successfully, %d topics had errors", successTopics, errorTopics) return nil } From 0db6a82dfd64eeb590ac49048cba29df655c0374 Mon Sep 17 00:00:00 2001 From: David Teutli Date: Mon, 1 Dec 2025 18:49:20 -0700 Subject: [PATCH 2/7] feat(tube-3309): use topic name instead of filename to keep track of existing generated files; wrap topic file and config traversal in reusable function --- cmd/topicctl/subcmd/rebalance.go | 79 +++++++++++++++++++------------- 1 file changed, 47 insertions(+), 32 deletions(-) diff --git a/cmd/topicctl/subcmd/rebalance.go b/cmd/topicctl/subcmd/rebalance.go index c3024293..c5b26221 100644 --- a/cmd/topicctl/subcmd/rebalance.go +++ b/cmd/topicctl/subcmd/rebalance.go @@ -162,14 +162,16 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { tmpDir := "" existingConfigFiles := make(map[string]struct{}) if rebalanceConfig.generateConfig { - // 0) Make set of existing topic files - for _, topicFile := range topicFiles { - _, topicName := filepath.Split(topicFile) - existingConfigFiles[topicName] = struct{}{} - log.Infof("Topic file: %v", topicName) + // make set of existing files + err := processTopicFiles(topicFiles, func(topicConfig config.TopicConfig, topicFile string) error { + existingConfigFiles[topicConfig.Meta.Name] = struct{}{} + return nil + }) + if err != nil { + return err } - // 1) Create tmp directory for generated config files + // create temp dir tmpDir = rebalanceConfig.pathPrefix if rebalanceConfig.pathPrefix[len(rebalanceConfig.pathPrefix) - 1] != '/' { tmpDir += "/" @@ -177,51 +179,34 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { tmpDir += "tmp/" err = os.Mkdir(tmpDir, 0755) if err != nil { - // it will error if tmp dir already exists return err } log.Infof("tmp output path: %v", tmpDir) - // 2) Bootstrap topics + // generate (bootstrap) config files cliRunner := cli.NewCLIRunner(adminClient, log.Infof, false) cliRunner.BootstrapTopics( ctx, - []string{}, // args + []string{}, clusterConfig, - ".*", // match - ".^", // exclude - tmpDir, // output dir - false, // overwrite - false, // allow internal topics + ".*", + ".^", + tmpDir, + false, + false, ) - // 3) Re-invetory config files + // re-invetory config files to take into account newly generated ones topicFiles, err = getAllFiles(topicConfigDir) if err != nil { return err } - for _, topicFile := range topicFiles { - log.Infof("Topic file: %v", topicFile) - } } // iterate through each topic config and initiate rebalance topicConfigs := []config.TopicConfig{} topicErrorDict := make(map[string]error) for _, topicFile := range topicFiles { - // skip generated config if it already existed - if rebalanceConfig.generateConfig { - configPath, configName := filepath.Split(topicFile) - if strings.HasSuffix(configPath, "tmp/") { - _, found := existingConfigFiles[configName] - if found { - log.Infof("skipping existing file...") - continue - } - } - } - log.Infof("processing...") - // do not consider invalid topic yaml files for rebalance topicConfigs, err = config.LoadTopicsFile(topicFile) if err != nil { @@ -298,7 +283,7 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { } } - // clean up generated config files + // clean up any generated config files if rebalanceConfig.generateConfig { err = os.RemoveAll(tmpDir) if err != nil { @@ -441,3 +426,33 @@ func getAllFiles(dir string) ([]string, error) { return files, err } + +func processTopicFiles(topicFiles []string, operation func(topicConfig config.TopicConfig, topicFile string) error) error { + for _, topicFile := range topicFiles { + // do not consider invalid topic yaml files for rebalance + topicConfigs, err := config.LoadTopicsFile(topicFile) + if err != nil { + log.Errorf("Invalid topic yaml file: %s", topicFile) + continue + } + + for _, topicConfig := range topicConfigs { + err := operation(topicConfig, topicFile) + if err != nil { + return fmt.Errorf("error during operationg on config %d (%s): %w", 0, topicConfig.Meta.Name, err) + } + } + } + return nil +} + +func topicConfigExists(topicFilepath string, existingFiles map[string]struct{}, name string) bool { + configPath, _ := filepath.Split(topicFilepath) + if strings.HasSuffix(configPath, "tmp/") { + _, found := existingFiles[name] + if found { + return true + } + } + return false +} From f8de11723af49b5c563c3393fcdac1a20959d6c9 Mon Sep 17 00:00:00 2001 From: David Teutli Date: Mon, 1 Dec 2025 18:52:44 -0700 Subject: [PATCH 3/7] feat(tube-3309): make use of wrapper processTopicFiles function for main process --- cmd/topicctl/subcmd/rebalance.go | 83 ++++++++++++++++---------------- 1 file changed, 41 insertions(+), 42 deletions(-) diff --git a/cmd/topicctl/subcmd/rebalance.go b/cmd/topicctl/subcmd/rebalance.go index c5b26221..08427c90 100644 --- a/cmd/topicctl/subcmd/rebalance.go +++ b/cmd/topicctl/subcmd/rebalance.go @@ -204,54 +204,53 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { } // iterate through each topic config and initiate rebalance - topicConfigs := []config.TopicConfig{} topicErrorDict := make(map[string]error) - for _, topicFile := range topicFiles { - // do not consider invalid topic yaml files for rebalance - topicConfigs, err = config.LoadTopicsFile(topicFile) - if err != nil { - log.Errorf("Invalid topic yaml file: %s", topicFile) - continue + processed := processTopicFiles(topicFiles, func(topicConfig config.TopicConfig, topicFile string) error { + // skip generated config if it already existed + if rebalanceConfig.generateConfig && topicConfigExists(topicFile, existingConfigFiles, topicConfig.Meta.Name) { + log.Infof("skipping generated config for topic %v", topicConfig.Meta.Name) + return nil + } + + // topic config should be consistent with the cluster config + if err := config.CheckConsistency(topicConfig.Meta, clusterConfig); err != nil { + log.Errorf("topic file: %s inconsistent with cluster: %s", topicFile, clusterConfigPath) + return nil } - for _, topicConfig := range topicConfigs { - // topic config should be consistent with the cluster config - if err := config.CheckConsistency(topicConfig.Meta, clusterConfig); err != nil { - log.Errorf("topic file: %s inconsistent with cluster: %s", topicFile, clusterConfigPath) - continue - } - - log.Infof( - "Rebalancing topic %s from config file %s with cluster config %s", - topicConfig.Meta.Name, - topicFile, - clusterConfigPath, - ) - - topicErrorDict[topicConfig.Meta.Name] = nil - rebalanceTopicProgressConfig := util.RebalanceTopicProgressConfig{ - TopicName: topicConfig.Meta.Name, - ClusterName: clusterConfig.Meta.Name, - ClusterEnvironment: clusterConfig.Meta.Environment, - ToRemove: rebalanceConfig.brokersToRemove, - RebalanceError: false, - } - if err := rebalanceApplyTopic(ctx, topicConfig, clusterConfig, adminClient); err != nil { - topicErrorDict[topicConfig.Meta.Name] = err - rebalanceTopicProgressConfig.RebalanceError = true - log.Errorf("topic: %s rebalance failed with error: %v", topicConfig.Meta.Name, err) - } + log.Infof( + "Rebalancing topic %s from config file %s with cluster config %s", + topicConfig.Meta.Name, + topicFile, + clusterConfigPath, + ) + topicErrorDict[topicConfig.Meta.Name] = nil + rebalanceTopicProgressConfig := util.RebalanceTopicProgressConfig{ + TopicName: topicConfig.Meta.Name, + ClusterName: clusterConfig.Meta.Name, + ClusterEnvironment: clusterConfig.Meta.Environment, + ToRemove: rebalanceConfig.brokersToRemove, + RebalanceError: false, + } + if err := rebalanceApplyTopic(ctx, topicConfig, clusterConfig, adminClient); err != nil { + topicErrorDict[topicConfig.Meta.Name] = err + rebalanceTopicProgressConfig.RebalanceError = true + log.Errorf("topic: %s rebalance failed with error: %v", topicConfig.Meta.Name, err) + } - // show topic final progress - if rebalanceCtxStruct.Enabled { - progressStr, err := util.StructToStr(rebalanceTopicProgressConfig) - if err != nil { - log.Errorf("progress struct to string error: %+v", err) - } else { - log.Infof("Rebalance Progress: %s", progressStr) - } + // show topic final progress + if rebalanceCtxStruct.Enabled { + progressStr, err := util.StructToStr(rebalanceTopicProgressConfig) + if err != nil { + log.Errorf("progress struct to string error: %+v", err) + } else { + log.Infof("Rebalance Progress: %s", progressStr) } } + return nil + }) + if processed != nil { + return processed } // audit at the end of all topic rebalances From 67c15bb756b6df156ca4743d033fc82456aadd46 Mon Sep 17 00:00:00 2001 From: David Teutli Date: Tue, 2 Dec 2025 15:07:39 -0700 Subject: [PATCH 4/7] feat(tube-3309): update logic to skip existing config files --- cmd/topicctl/subcmd/rebalance.go | 30 +++++++++--------------------- 1 file changed, 9 insertions(+), 21 deletions(-) diff --git a/cmd/topicctl/subcmd/rebalance.go b/cmd/topicctl/subcmd/rebalance.go index 08427c90..5499ee1b 100644 --- a/cmd/topicctl/subcmd/rebalance.go +++ b/cmd/topicctl/subcmd/rebalance.go @@ -160,11 +160,11 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { } tmpDir := "" - existingConfigFiles := make(map[string]struct{}) + existingConfigFiles := []string{} if rebalanceConfig.generateConfig { // make set of existing files err := processTopicFiles(topicFiles, func(topicConfig config.TopicConfig, topicFile string) error { - existingConfigFiles[topicConfig.Meta.Name] = struct{}{} + existingConfigFiles = append(existingConfigFiles, topicConfig.Meta.Name) return nil }) if err != nil { @@ -183,14 +183,19 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { } log.Infof("tmp output path: %v", tmpDir) - // generate (bootstrap) config files + topicsToExclude := ".^" + if len(existingConfigFiles) > 0 { + topicsToExclude = strings.Join(existingConfigFiles, "|") + } + + // generate missing config files cliRunner := cli.NewCLIRunner(adminClient, log.Infof, false) cliRunner.BootstrapTopics( ctx, []string{}, clusterConfig, ".*", - ".^", + topicsToExclude, tmpDir, false, false, @@ -206,12 +211,6 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { // iterate through each topic config and initiate rebalance topicErrorDict := make(map[string]error) processed := processTopicFiles(topicFiles, func(topicConfig config.TopicConfig, topicFile string) error { - // skip generated config if it already existed - if rebalanceConfig.generateConfig && topicConfigExists(topicFile, existingConfigFiles, topicConfig.Meta.Name) { - log.Infof("skipping generated config for topic %v", topicConfig.Meta.Name) - return nil - } - // topic config should be consistent with the cluster config if err := config.CheckConsistency(topicConfig.Meta, clusterConfig); err != nil { log.Errorf("topic file: %s inconsistent with cluster: %s", topicFile, clusterConfigPath) @@ -444,14 +443,3 @@ func processTopicFiles(topicFiles []string, operation func(topicConfig config.To } return nil } - -func topicConfigExists(topicFilepath string, existingFiles map[string]struct{}, name string) bool { - configPath, _ := filepath.Split(topicFilepath) - if strings.HasSuffix(configPath, "tmp/") { - _, found := existingFiles[name] - if found { - return true - } - } - return false -} From 481d8b7905b64f49516fc0b00e7c945af2753dbe Mon Sep 17 00:00:00 2001 From: David T Date: Tue, 2 Dec 2025 15:09:33 -0700 Subject: [PATCH 5/7] Update cmd/topicctl/subcmd/rebalance.go Co-authored-by: Peter Dannemann <28637185+petedannemann@users.noreply.github.com> --- cmd/topicctl/subcmd/rebalance.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/topicctl/subcmd/rebalance.go b/cmd/topicctl/subcmd/rebalance.go index 5499ee1b..2ebdd761 100644 --- a/cmd/topicctl/subcmd/rebalance.go +++ b/cmd/topicctl/subcmd/rebalance.go @@ -201,7 +201,7 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { false, ) - // re-invetory config files to take into account newly generated ones + // re-inventory config files to take into account newly generated ones topicFiles, err = getAllFiles(topicConfigDir) if err != nil { return err From e630982013a373ff17693a5428a1742609eed2ee Mon Sep 17 00:00:00 2001 From: David Teutli Date: Thu, 4 Dec 2025 10:07:03 -0700 Subject: [PATCH 6/7] update readme, name of flag, and logic to skip existing files --- README.md | 2 ++ cmd/topicctl/subcmd/rebalance.go | 60 +++++++++++++------------------- 2 files changed, 26 insertions(+), 36 deletions(-) diff --git a/README.md b/README.md index b109d416..5e89b282 100644 --- a/README.md +++ b/README.md @@ -457,6 +457,8 @@ To rebalance **all** topics in a cluster, use the `rebalance` subcommand, which function on all qualifying topics. It will inventory all topic configs found at `--path-prefix` for a cluster specified by `--cluster-config`. +To rebalance topics in a cluster that's missing any topic configs, use the `rebalance` subcommand with the `--bootstrap-missing-configs` flag. This will temporarily bootstrap any missing topic configs at `--path-prefix`. + This subcommand will not rebalance a topic if: 1. the topic config is inconsistent with the cluster config (name, region, environment etc...) diff --git a/cmd/topicctl/subcmd/rebalance.go b/cmd/topicctl/subcmd/rebalance.go index 5499ee1b..c704d575 100644 --- a/cmd/topicctl/subcmd/rebalance.go +++ b/cmd/topicctl/subcmd/rebalance.go @@ -7,7 +7,6 @@ import ( "os/signal" "path/filepath" "strconv" - "strings" "syscall" "time" @@ -33,7 +32,7 @@ type rebalanceCmdConfig struct { brokersToRemove []int brokerThrottleMBsOverride int dryRun bool - generateConfig bool + bootstrapMissingConfigs bool partitionBatchSizeOverride int pathPrefix string sleepLoopDuration time.Duration @@ -88,10 +87,10 @@ func init() { "Interval of time to show progress during rebalance", ) rebalanceCmd.Flags().BoolVar( - &rebalanceConfig.generateConfig, - "generate-config", + &rebalanceConfig.bootstrapMissingConfigs, + "bootstrap-missing-configs", false, - "Generate temporary config file(s) for the rebalance of configless topic(s)", + "Bootstrap temporary topic config(s) for the rebalance of configless topic(s)", ) addSharedConfigOnlyFlags(rebalanceCmd, &rebalanceConfig.shared) @@ -159,49 +158,32 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { return err } - tmpDir := "" - existingConfigFiles := []string{} - if rebalanceConfig.generateConfig { + existingConfigFiles := make(map[string]struct{}) + if rebalanceConfig.bootstrapMissingConfigs { // make set of existing files err := processTopicFiles(topicFiles, func(topicConfig config.TopicConfig, topicFile string) error { - existingConfigFiles = append(existingConfigFiles, topicConfig.Meta.Name) + _, topicFilename := filepath.Split(topicFile) + existingConfigFiles[topicFilename] = struct{}{} return nil }) if err != nil { return err } - // create temp dir - tmpDir = rebalanceConfig.pathPrefix - if rebalanceConfig.pathPrefix[len(rebalanceConfig.pathPrefix) - 1] != '/' { - tmpDir += "/" - } - tmpDir += "tmp/" - err = os.Mkdir(tmpDir, 0755) - if err != nil { - return err - } - log.Infof("tmp output path: %v", tmpDir) - - topicsToExclude := ".^" - if len(existingConfigFiles) > 0 { - topicsToExclude = strings.Join(existingConfigFiles, "|") - } - - // generate missing config files + // bootstrap missing config files cliRunner := cli.NewCLIRunner(adminClient, log.Infof, false) cliRunner.BootstrapTopics( ctx, []string{}, clusterConfig, ".*", - topicsToExclude, - tmpDir, + ".^", + rebalanceConfig.pathPrefix, false, false, ) - // re-invetory config files to take into account newly generated ones + // re-inventory topic configs to take into account bootstrapped ones topicFiles, err = getAllFiles(topicConfigDir) if err != nil { return err @@ -281,11 +263,17 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { } } - // clean up any generated config files - if rebalanceConfig.generateConfig { - err = os.RemoveAll(tmpDir) - if err != nil { - return err + // clean up any bootstrapped topic configs + if rebalanceConfig.bootstrapMissingConfigs { + for _, topicFile := range topicFiles { + _, topicFilename := filepath.Split(topicFile) + if _, found := existingConfigFiles[topicFilename]; found { + continue + } + err := os.Remove(topicFile) + if err != nil { + log.Errorf("error deleting temporary file %s: %v", topicFile, err) + } } } @@ -437,7 +425,7 @@ func processTopicFiles(topicFiles []string, operation func(topicConfig config.To for _, topicConfig := range topicConfigs { err := operation(topicConfig, topicFile) if err != nil { - return fmt.Errorf("error during operationg on config %d (%s): %w", 0, topicConfig.Meta.Name, err) + return fmt.Errorf("error during operation on config %d (%s): %w", 0, topicConfig.Meta.Name, err) } } } From d5dfccc27e83513b3c6b9da002b3b29dc3f23afd Mon Sep 17 00:00:00 2001 From: David T Date: Thu, 4 Dec 2025 10:34:54 -0700 Subject: [PATCH 7/7] Update README.md Co-authored-by: Peter Dannemann <28637185+petedannemann@users.noreply.github.com> --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 5e89b282..0dcd7731 100644 --- a/README.md +++ b/README.md @@ -457,7 +457,7 @@ To rebalance **all** topics in a cluster, use the `rebalance` subcommand, which function on all qualifying topics. It will inventory all topic configs found at `--path-prefix` for a cluster specified by `--cluster-config`. -To rebalance topics in a cluster that's missing any topic configs, use the `rebalance` subcommand with the `--bootstrap-missing-configs` flag. This will temporarily bootstrap any missing topic configs at `--path-prefix`. +To rebalance topics in a cluster that exist without topic configuration files, use the `rebalance` subcommand with the `--bootstrap-missing-configs` flag. This will temporarily bootstrap any missing topic configs at `--path-prefix`. This can also be used to use topicctl as a topic rebalancing tool, without using its topic configuration management features This subcommand will not rebalance a topic if: