diff --git a/pkg/admin/types.go b/pkg/admin/types.go index 88ed16d7..dc393a24 100644 --- a/pkg/admin/types.go +++ b/pkg/admin/types.go @@ -357,9 +357,9 @@ type zkClusterID struct { } type zkControllerInfo struct { - Version int `json:"version"` - BrokerID int `json:"brokerid"` - Timestamp string `json:"timestamp"` + Version int `json:"version"` + BrokerID int `json:"brokerid"` + Timestamp string `json:"timestamp"` } type zkBrokerInfo struct { diff --git a/pkg/apply/assigners/balancer_leader.go b/pkg/apply/assigners/balancer_leader.go index e026a6e7..58cfd6e0 100644 --- a/pkg/apply/assigners/balancer_leader.go +++ b/pkg/apply/assigners/balancer_leader.go @@ -18,15 +18,15 @@ import ( // // The algorithm currently used is as follows: // -// while not balanced: -// find racks with fewest and most leaders (i.e., the overrepresented and underrepresented) -// improve balance by doing a single leader replacement: -// use the picker to rank the partitions that have an overrepresented leader -// for each leader: -// for each partition with the leader: -// swap the leader with one of its followers if possible, then stop -// otherwise, use the picker to replace the leader in the top-ranked partition with -// a new broker from the target rack +// while not balanced: +// find racks with fewest and most leaders (i.e., the overrepresented and underrepresented) +// improve balance by doing a single leader replacement: +// use the picker to rank the partitions that have an overrepresented leader +// for each leader: +// for each partition with the leader: +// swap the leader with one of its followers if possible, then stop +// otherwise, use the picker to replace the leader in the top-ranked partition with +// a new broker from the target rack type BalancedLeaderAssigner struct { brokers []admin.BrokerInfo racks []string diff --git a/pkg/apply/assigners/cross_rack.go b/pkg/apply/assigners/cross_rack.go index 73066350..5dbe8b9a 100644 --- a/pkg/apply/assigners/cross_rack.go +++ b/pkg/apply/assigners/cross_rack.go @@ -2,9 +2,10 @@ package assigners import ( "fmt" + "sort" + "github.com/segmentio/topicctl/pkg/admin" "github.com/segmentio/topicctl/pkg/apply/pickers" - "sort" ) // CrossRackAssigner is an assigner that ensures that the replicas of each @@ -13,16 +14,18 @@ import ( // https://segment.atlassian.net/browse/DRES-922?focusedCommentId=237288 // // for each partition: -// for each non-leader replica: -// if replica is in same rack as leader: -// change replica to a placeholder (-1) +// +// for each non-leader replica: +// if replica is in same rack as leader: +// change replica to a placeholder (-1) // // then: // // for each partition: -// for each non-leader replica: -// if replica is set to placeholder: -// use picker to replace it with a broker in a different rack than the leader and any other replicas +// +// for each non-leader replica: +// if replica is set to placeholder: +// use picker to replace it with a broker in a different rack than the leader and any other replicas // // Note that this assigner doesn't make any leader changes. Thus, the assignments // need to already be leader balanced before we make the changes with this assigner. diff --git a/pkg/apply/assigners/cross_rack_test.go b/pkg/apply/assigners/cross_rack_test.go index f4c65865..f3eb6227 100644 --- a/pkg/apply/assigners/cross_rack_test.go +++ b/pkg/apply/assigners/cross_rack_test.go @@ -245,4 +245,4 @@ func TestCrossRackAssignerTwoReplicas(t *testing.T) { for _, testCase := range testCases { testCase.evaluate(t, assigner) } -} \ No newline at end of file +} diff --git a/pkg/apply/assigners/single_rack.go b/pkg/apply/assigners/single_rack.go index 917db950..1da6a775 100644 --- a/pkg/apply/assigners/single_rack.go +++ b/pkg/apply/assigners/single_rack.go @@ -11,16 +11,18 @@ import ( // partition are in the same rack as the leader. The algorithm is: // // for each partition: -// for each non-leader replica: -// if replica not in same rack as leader: -// change replica to a placeholder (-1) +// +// for each non-leader replica: +// if replica not in same rack as leader: +// change replica to a placeholder (-1) // // then: // // for each partition: -// for each non-leader replica: -// if replica is set to placeholder: -// use picker to replace it with a broker in the target rack +// +// for each non-leader replica: +// if replica is set to placeholder: +// use picker to replace it with a broker in the target rack // // Note that this assigner doesn't make any leader changes. Thus, the assignments // need to already be leader balanced before we make the changes with this assigner. diff --git a/pkg/apply/assigners/static_single_rack.go b/pkg/apply/assigners/static_single_rack.go index 1b1ddc99..d506dc22 100644 --- a/pkg/apply/assigners/static_single_rack.go +++ b/pkg/apply/assigners/static_single_rack.go @@ -14,16 +14,18 @@ import ( // The following algorithm is used: // // for each partition: -// for each replica: -// if replica not in the desired (static) rack: -// change the replica to a placeholder (-1) +// +// for each replica: +// if replica not in the desired (static) rack: +// change the replica to a placeholder (-1) // // then: // // for each partition: -// for each replica: -// if replica set to the placeholder: -// use picker to pick a broker from the set of all brokers in the target rack +// +// for each replica: +// if replica set to the placeholder: +// use picker to pick a broker from the set of all brokers in the target rack // // In the case of ties, the lowest indexed broker is picked (if randomize is false) or // a repeatably random choice (if randomize is true). diff --git a/pkg/apply/extenders/balanced.go b/pkg/apply/extenders/balanced.go index e94641d6..6b4c5963 100644 --- a/pkg/apply/extenders/balanced.go +++ b/pkg/apply/extenders/balanced.go @@ -12,12 +12,13 @@ import ( // algorithm is: // // for each new partition: -// set the leader rack to the next rack in the cycle -// choose the leader using the picker -// for each follower: -// set the rack to either the same one as the leader (if inRack true) or the next one in the -// cycle (if inRack false) -// pick the follower using the picker +// +// set the leader rack to the next rack in the cycle +// choose the leader using the picker +// for each follower: +// set the rack to either the same one as the leader (if inRack true) or the next one in the +// cycle (if inRack false) +// pick the follower using the picker type BalancedExtender struct { brokers []admin.BrokerInfo inRack bool diff --git a/pkg/apply/rebalancers/frequency.go b/pkg/apply/rebalancers/frequency.go index 142e6501..3f9ff089 100644 --- a/pkg/apply/rebalancers/frequency.go +++ b/pkg/apply/rebalancers/frequency.go @@ -13,24 +13,24 @@ import ( // FrequencyRebalancer is a Rebalancer that rebalances to achieve in-topic balance among // all available brokers. The algorithm used is: // -// for each replica position index: -// while true: -// get counts for each broker in that position -// partition the set of brokers into two sets, a "lower" one and an "upper one", based on -// on the sorted frequency counts (with brokers to be removed treated as the highest -// frequencies) -// for each lower broker, upper broker combination: -// try to replace the upper broker with the lower one -// if replacement made, continue to next while loop iteration -// if no replacement made, break out of while loop, continue to next partition index +// for each replica position index: +// while true: +// get counts for each broker in that position +// partition the set of brokers into two sets, a "lower" one and an "upper one", based on +// on the sorted frequency counts (with brokers to be removed treated as the highest +// frequencies) +// for each lower broker, upper broker combination: +// try to replace the upper broker with the lower one +// if replacement made, continue to next while loop iteration +// if no replacement made, break out of while loop, continue to next partition index // // Replacements are made if: // -// 1. The replacement improves the broker balance for the index OR -// the replacement improves the broker balance for the topic as a whole +// 1. The replacement improves the broker balance for the index OR +// the replacement improves the broker balance for the topic as a whole // AND -// 2. The replacement is consistent with the placement strategy for the topic (e.g., balanced -// leaders, in-rack, etc.) +// 2. The replacement is consistent with the placement strategy for the topic (e.g., balanced +// leaders, in-rack, etc.) // // The picker passed in to the rebalancer is used to sort the partitions for each broker (if it // appears more than once for the current index) and also to break ties when sorting and diff --git a/pkg/groups/groups_test.go b/pkg/groups/groups_test.go index 71288f7b..1ce3a381 100644 --- a/pkg/groups/groups_test.go +++ b/pkg/groups/groups_test.go @@ -17,7 +17,7 @@ import ( func TestGetGroups(t *testing.T) { ctx := context.Background() connector, err := admin.NewConnector(admin.ConnectorConfig{ - BrokerAddr: util.TestKafkaAddr(), + BrokerAddr: util.TestKafkaAddr(), ConnTimeout: 10 * time.Second, }) require.NoError(t, err) @@ -83,7 +83,7 @@ func TestGetGroups(t *testing.T) { func TestGetGroupsMultiMember(t *testing.T) { ctx := context.Background() connector, err := admin.NewConnector(admin.ConnectorConfig{ - BrokerAddr: util.TestKafkaAddr(), + BrokerAddr: util.TestKafkaAddr(), ConnTimeout: 10 * time.Second, }) require.NoError(t, err) @@ -165,7 +165,7 @@ func TestGetGroupsMultiMember(t *testing.T) { func TestGetGroupsMultiMemberMultiTopic(t *testing.T) { ctx := context.Background() connector, err := admin.NewConnector(admin.ConnectorConfig{ - BrokerAddr: util.TestKafkaAddr(), + BrokerAddr: util.TestKafkaAddr(), ConnTimeout: 10 * time.Second, }) require.NoError(t, err) @@ -262,7 +262,7 @@ func TestGetGroupsMultiMemberMultiTopic(t *testing.T) { func TestGetLags(t *testing.T) { ctx := context.Background() connector, err := admin.NewConnector(admin.ConnectorConfig{ - BrokerAddr: util.TestKafkaAddr(), + BrokerAddr: util.TestKafkaAddr(), ConnTimeout: 10 * time.Second, }) require.NoError(t, err) @@ -306,7 +306,7 @@ func TestGetLags(t *testing.T) { func TestGetEarliestOrLatestOffset(t *testing.T) { ctx := context.Background() connector, err := admin.NewConnector(admin.ConnectorConfig{ - BrokerAddr: util.TestKafkaAddr(), + BrokerAddr: util.TestKafkaAddr(), ConnTimeout: 10 * time.Second, }) require.NoError(t, err) @@ -354,7 +354,7 @@ func TestGetEarliestOrLatestOffset(t *testing.T) { func TestResetOffsets(t *testing.T) { ctx := context.Background() connector, err := admin.NewConnector(admin.ConnectorConfig{ - BrokerAddr: util.TestKafkaAddr(), + BrokerAddr: util.TestKafkaAddr(), ConnTimeout: 10 * time.Second, }) require.NoError(t, err) diff --git a/pkg/messages/bounds_test.go b/pkg/messages/bounds_test.go index 991a3a81..bbc189b3 100644 --- a/pkg/messages/bounds_test.go +++ b/pkg/messages/bounds_test.go @@ -16,7 +16,7 @@ import ( func TestGetAllPartitionBounds(t *testing.T) { ctx := context.Background() connector, err := admin.NewConnector(admin.ConnectorConfig{ - BrokerAddr: util.TestKafkaAddr(), + BrokerAddr: util.TestKafkaAddr(), ConnTimeout: 10 * time.Second, }) require.NoError(t, err) diff --git a/pkg/messages/tail_test.go b/pkg/messages/tail_test.go index 92867c88..8463dbae 100644 --- a/pkg/messages/tail_test.go +++ b/pkg/messages/tail_test.go @@ -19,7 +19,7 @@ func TestTailerGetMessages(t *testing.T) { defer cancel() connector, err := admin.NewConnector(admin.ConnectorConfig{ - BrokerAddr: util.TestKafkaAddr(), + BrokerAddr: util.TestKafkaAddr(), ConnTimeout: 10 * time.Second, }) require.NoError(t, err) diff --git a/pkg/util/progress.go b/pkg/util/progress.go index ee559fc2..0c888a9b 100644 --- a/pkg/util/progress.go +++ b/pkg/util/progress.go @@ -2,8 +2,9 @@ package util import ( "context" - log "github.com/sirupsen/logrus" "time" + + log "github.com/sirupsen/logrus" ) // Rebalance topic progress Config