From d4efbf463a0908698eb84922eb1061a442ec2c74 Mon Sep 17 00:00:00 2001 From: Ben McKerry <110857332+bmckerry@users.noreply.github.com> Date: Thu, 1 May 2025 11:51:33 -0400 Subject: [PATCH 1/6] fix: don't error on static assignment if rebalancing should take place --- pkg/apply/assigners/evaluate.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pkg/apply/assigners/evaluate.go b/pkg/apply/assigners/evaluate.go index 0881ee42..37d8adca 100644 --- a/pkg/apply/assigners/evaluate.go +++ b/pkg/apply/assigners/evaluate.go @@ -31,10 +31,15 @@ func EvaluateAssignments( if err != nil { return false, err } - return reflect.DeepEqual( + if !reflect.DeepEqual( replicas, placementConfig.StaticAssignments, - ), nil + ) { + log.Info( + "Current assignment does not match static assignment, rebalancing will take place", + ) + } + return true, nil case config.PlacementStrategyStaticInRack: if !(minRacks == 1 && maxRacks == 1) { return false, nil From 3c081f15b81238289673e5487de0a85d361461dd Mon Sep 17 00:00:00 2001 From: Ben McKerry <110857332+bmckerry@users.noreply.github.com> Date: Thu, 1 May 2025 13:36:20 -0400 Subject: [PATCH 2/6] update evaluate tests --- pkg/apply/assigners/evaluate_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/apply/assigners/evaluate_test.go b/pkg/apply/assigners/evaluate_test.go index 35091210..4f1e802e 100644 --- a/pkg/apply/assigners/evaluate_test.go +++ b/pkg/apply/assigners/evaluate_test.go @@ -26,7 +26,7 @@ func TestEvaluateAssignmentsNonStatic(t *testing.T) { }, expectedResults: map[config.PlacementStrategy]bool{ config.PlacementStrategyAny: true, - config.PlacementStrategyStatic: false, + config.PlacementStrategyStatic: true, config.PlacementStrategyStaticInRack: false, config.PlacementStrategyBalancedLeaders: false, config.PlacementStrategyInRack: false, @@ -72,7 +72,7 @@ func TestEvaluateAssignmentsNonStatic(t *testing.T) { }, expectedResults: map[config.PlacementStrategy]bool{ config.PlacementStrategyAny: true, - config.PlacementStrategyStatic: false, + config.PlacementStrategyStatic: true, config.PlacementStrategyStaticInRack: true, config.PlacementStrategyBalancedLeaders: false, config.PlacementStrategyInRack: true, @@ -86,7 +86,7 @@ func TestEvaluateAssignmentsNonStatic(t *testing.T) { }, expectedResults: map[config.PlacementStrategy]bool{ config.PlacementStrategyAny: true, - config.PlacementStrategyStatic: false, + config.PlacementStrategyStatic: true, config.PlacementStrategyStaticInRack: false, config.PlacementStrategyBalancedLeaders: false, config.PlacementStrategyInRack: false, @@ -103,7 +103,7 @@ func TestEvaluateAssignmentsNonStatic(t *testing.T) { }, expectedResults: map[config.PlacementStrategy]bool{ config.PlacementStrategyAny: true, - config.PlacementStrategyStatic: false, + config.PlacementStrategyStatic: true, config.PlacementStrategyStaticInRack: false, config.PlacementStrategyBalancedLeaders: true, config.PlacementStrategyInRack: true, @@ -116,7 +116,7 @@ func TestEvaluateAssignmentsNonStatic(t *testing.T) { }, expectedResults: map[config.PlacementStrategy]bool{ config.PlacementStrategyAny: true, - config.PlacementStrategyStatic: false, + config.PlacementStrategyStatic: true, config.PlacementStrategyStaticInRack: false, config.PlacementStrategyBalancedLeaders: false, config.PlacementStrategyInRack: true, From 4020cb9aa6059de0350edf8f2f61db8a2378f972 Mon Sep 17 00:00:00 2001 From: Ben McKerry <110857332+bmckerry@users.noreply.github.com> Date: Thu, 1 May 2025 14:12:16 -0400 Subject: [PATCH 3/6] more logging --- pkg/apply/apply.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/apply/apply.go b/pkg/apply/apply.go index ff6104fc..df668717 100644 --- a/pkg/apply/apply.go +++ b/pkg/apply/apply.go @@ -983,6 +983,10 @@ func (t *TopicApplier) updatePlacementHelper( case config.PlacementStrategyCrossRack: assigner = assigners.NewCrossRackAssigner(t.brokers, picker) case config.PlacementStrategyStatic: + log.Infof("Using static assignment") + log.Infof("Desired assignments: %v", admin.ReplicasToAssignments( + t.topicConfig.Spec.PlacementConfig.StaticAssignments, + )) assigner = &assigners.StaticAssigner{ Assignments: admin.ReplicasToAssignments( t.topicConfig.Spec.PlacementConfig.StaticAssignments, From 56f664169bcb402c1e221b6189adf7400d7f42e9 Mon Sep 17 00:00:00 2001 From: Ben McKerry <110857332+bmckerry@users.noreply.github.com> Date: Thu, 1 May 2025 14:55:05 -0400 Subject: [PATCH 4/6] handle static errors in frequecy.go instead of evaluate.go --- pkg/apply/assigners/evaluate.go | 9 ++------- pkg/apply/rebalancers/frequency.go | 9 ++++++++- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/pkg/apply/assigners/evaluate.go b/pkg/apply/assigners/evaluate.go index 37d8adca..0881ee42 100644 --- a/pkg/apply/assigners/evaluate.go +++ b/pkg/apply/assigners/evaluate.go @@ -31,15 +31,10 @@ func EvaluateAssignments( if err != nil { return false, err } - if !reflect.DeepEqual( + return reflect.DeepEqual( replicas, placementConfig.StaticAssignments, - ) { - log.Info( - "Current assignment does not match static assignment, rebalancing will take place", - ) - } - return true, nil + ), nil case config.PlacementStrategyStaticInRack: if !(minRacks == 1 && maxRacks == 1) { return false, nil diff --git a/pkg/apply/rebalancers/frequency.go b/pkg/apply/rebalancers/frequency.go index bda4e91b..27f5b025 100644 --- a/pkg/apply/rebalancers/frequency.go +++ b/pkg/apply/rebalancers/frequency.go @@ -8,6 +8,7 @@ import ( "github.com/segmentio/topicctl/pkg/apply/assigners" "github.com/segmentio/topicctl/pkg/apply/pickers" "github.com/segmentio/topicctl/pkg/config" + log "github.com/sirupsen/logrus" ) // FrequencyRebalancer is a Rebalancer that rebalances to achieve in-topic balance among @@ -67,7 +68,13 @@ func (f *FrequencyRebalancer) Rebalance( if err != nil { return nil, err } else if !ok { - return nil, fmt.Errorf("starting assignments on topic %s do not satisfy placement config - assignments: %#v", topic, curr) + // If we're doing a static rebalance, we don't expect the existing assignments to + // satify the placement config + if f.placementConfig.Strategy == config.PlacementStrategyStatic { + log.Info("Current partition assignment does not match static assignment, rebalancing will take place") + } else { + return nil, fmt.Errorf("starting assignments on topic %s do not satisfy placement config - assignments: %#v", topic, curr) + } } desired := admin.CopyAssignments(curr) From 496650dda29a66e998f997294a4d2391b43dcc47 Mon Sep 17 00:00:00 2001 From: Ben McKerry <110857332+bmckerry@users.noreply.github.com> Date: Thu, 1 May 2025 14:58:58 -0400 Subject: [PATCH 5/6] Revert "update evaluate tests" This reverts commit 3c081f15b81238289673e5487de0a85d361461dd. --- pkg/apply/assigners/evaluate_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/apply/assigners/evaluate_test.go b/pkg/apply/assigners/evaluate_test.go index 4f1e802e..35091210 100644 --- a/pkg/apply/assigners/evaluate_test.go +++ b/pkg/apply/assigners/evaluate_test.go @@ -26,7 +26,7 @@ func TestEvaluateAssignmentsNonStatic(t *testing.T) { }, expectedResults: map[config.PlacementStrategy]bool{ config.PlacementStrategyAny: true, - config.PlacementStrategyStatic: true, + config.PlacementStrategyStatic: false, config.PlacementStrategyStaticInRack: false, config.PlacementStrategyBalancedLeaders: false, config.PlacementStrategyInRack: false, @@ -72,7 +72,7 @@ func TestEvaluateAssignmentsNonStatic(t *testing.T) { }, expectedResults: map[config.PlacementStrategy]bool{ config.PlacementStrategyAny: true, - config.PlacementStrategyStatic: true, + config.PlacementStrategyStatic: false, config.PlacementStrategyStaticInRack: true, config.PlacementStrategyBalancedLeaders: false, config.PlacementStrategyInRack: true, @@ -86,7 +86,7 @@ func TestEvaluateAssignmentsNonStatic(t *testing.T) { }, expectedResults: map[config.PlacementStrategy]bool{ config.PlacementStrategyAny: true, - config.PlacementStrategyStatic: true, + config.PlacementStrategyStatic: false, config.PlacementStrategyStaticInRack: false, config.PlacementStrategyBalancedLeaders: false, config.PlacementStrategyInRack: false, @@ -103,7 +103,7 @@ func TestEvaluateAssignmentsNonStatic(t *testing.T) { }, expectedResults: map[config.PlacementStrategy]bool{ config.PlacementStrategyAny: true, - config.PlacementStrategyStatic: true, + config.PlacementStrategyStatic: false, config.PlacementStrategyStaticInRack: false, config.PlacementStrategyBalancedLeaders: true, config.PlacementStrategyInRack: true, @@ -116,7 +116,7 @@ func TestEvaluateAssignmentsNonStatic(t *testing.T) { }, expectedResults: map[config.PlacementStrategy]bool{ config.PlacementStrategyAny: true, - config.PlacementStrategyStatic: true, + config.PlacementStrategyStatic: false, config.PlacementStrategyStaticInRack: false, config.PlacementStrategyBalancedLeaders: false, config.PlacementStrategyInRack: true, From 83194e0e1a8967715434b9b80c46413a9d8fdb62 Mon Sep 17 00:00:00 2001 From: Ben McKerry <110857332+bmckerry@users.noreply.github.com> Date: Thu, 1 May 2025 15:17:54 -0400 Subject: [PATCH 6/6] Revert "more logging" This reverts commit 4020cb9aa6059de0350edf8f2f61db8a2378f972. --- pkg/apply/apply.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pkg/apply/apply.go b/pkg/apply/apply.go index df668717..ff6104fc 100644 --- a/pkg/apply/apply.go +++ b/pkg/apply/apply.go @@ -983,10 +983,6 @@ func (t *TopicApplier) updatePlacementHelper( case config.PlacementStrategyCrossRack: assigner = assigners.NewCrossRackAssigner(t.brokers, picker) case config.PlacementStrategyStatic: - log.Infof("Using static assignment") - log.Infof("Desired assignments: %v", admin.ReplicasToAssignments( - t.topicConfig.Spec.PlacementConfig.StaticAssignments, - )) assigner = &assigners.StaticAssigner{ Assignments: admin.ReplicasToAssignments( t.topicConfig.Spec.PlacementConfig.StaticAssignments,