Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/admin/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,9 +357,9 @@ type zkClusterID struct {
}

type zkControllerInfo struct {
Version int `json:"version"`
BrokerID int `json:"brokerid"`
Timestamp string `json:"timestamp"`
Version int `json:"version"`
BrokerID int `json:"brokerid"`
Timestamp string `json:"timestamp"`
}

type zkBrokerInfo struct {
Expand Down
18 changes: 9 additions & 9 deletions pkg/apply/assigners/balancer_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ import (
//
// The algorithm currently used is as follows:
//
// while not balanced:
// find racks with fewest and most leaders (i.e., the overrepresented and underrepresented)
// improve balance by doing a single leader replacement:
// use the picker to rank the partitions that have an overrepresented leader
// for each leader:
// for each partition with the leader:
// swap the leader with one of its followers if possible, then stop
// otherwise, use the picker to replace the leader in the top-ranked partition with
// a new broker from the target rack
// while not balanced:
// find racks with fewest and most leaders (i.e., the overrepresented and underrepresented)
// improve balance by doing a single leader replacement:
// use the picker to rank the partitions that have an overrepresented leader
// for each leader:
// for each partition with the leader:
// swap the leader with one of its followers if possible, then stop
// otherwise, use the picker to replace the leader in the top-ranked partition with
// a new broker from the target rack
type BalancedLeaderAssigner struct {
brokers []admin.BrokerInfo
racks []string
Expand Down
17 changes: 10 additions & 7 deletions pkg/apply/assigners/cross_rack.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package assigners

import (
"fmt"
"sort"

"github.com/segmentio/topicctl/pkg/admin"
"github.com/segmentio/topicctl/pkg/apply/pickers"
"sort"
)

// CrossRackAssigner is an assigner that ensures that the replicas of each
Expand All @@ -13,16 +14,18 @@ import (
// https://segment.atlassian.net/browse/DRES-922?focusedCommentId=237288
//
// for each partition:
// for each non-leader replica:
// if replica is in same rack as leader:
// change replica to a placeholder (-1)
//
// for each non-leader replica:
// if replica is in same rack as leader:
// change replica to a placeholder (-1)
//
// then:
//
// for each partition:
// for each non-leader replica:
// if replica is set to placeholder:
// use picker to replace it with a broker in a different rack than the leader and any other replicas
//
// for each non-leader replica:
// if replica is set to placeholder:
// use picker to replace it with a broker in a different rack than the leader and any other replicas
//
// Note that this assigner doesn't make any leader changes. Thus, the assignments
// need to already be leader balanced before we make the changes with this assigner.
Expand Down
2 changes: 1 addition & 1 deletion pkg/apply/assigners/cross_rack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,4 +245,4 @@ func TestCrossRackAssignerTwoReplicas(t *testing.T) {
for _, testCase := range testCases {
testCase.evaluate(t, assigner)
}
}
}
14 changes: 8 additions & 6 deletions pkg/apply/assigners/single_rack.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,18 @@ import (
// partition are in the same rack as the leader. The algorithm is:
//
// for each partition:
// for each non-leader replica:
// if replica not in same rack as leader:
// change replica to a placeholder (-1)
//
// for each non-leader replica:
// if replica not in same rack as leader:
// change replica to a placeholder (-1)
//
// then:
//
// for each partition:
// for each non-leader replica:
// if replica is set to placeholder:
// use picker to replace it with a broker in the target rack
//
// for each non-leader replica:
// if replica is set to placeholder:
// use picker to replace it with a broker in the target rack
//
// Note that this assigner doesn't make any leader changes. Thus, the assignments
// need to already be leader balanced before we make the changes with this assigner.
Expand Down
14 changes: 8 additions & 6 deletions pkg/apply/assigners/static_single_rack.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,18 @@ import (
// The following algorithm is used:
//
// for each partition:
// for each replica:
// if replica not in the desired (static) rack:
// change the replica to a placeholder (-1)
//
// for each replica:
// if replica not in the desired (static) rack:
// change the replica to a placeholder (-1)
//
// then:
//
// for each partition:
// for each replica:
// if replica set to the placeholder:
// use picker to pick a broker from the set of all brokers in the target rack
//
// for each replica:
// if replica set to the placeholder:
// use picker to pick a broker from the set of all brokers in the target rack
//
// In the case of ties, the lowest indexed broker is picked (if randomize is false) or
// a repeatably random choice (if randomize is true).
Expand Down
13 changes: 7 additions & 6 deletions pkg/apply/extenders/balanced.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ import (
// algorithm is:
//
// for each new partition:
// set the leader rack to the next rack in the cycle
// choose the leader using the picker
// for each follower:
// set the rack to either the same one as the leader (if inRack true) or the next one in the
// cycle (if inRack false)
// pick the follower using the picker
//
// set the leader rack to the next rack in the cycle
// choose the leader using the picker
// for each follower:
// set the rack to either the same one as the leader (if inRack true) or the next one in the
// cycle (if inRack false)
// pick the follower using the picker
type BalancedExtender struct {
brokers []admin.BrokerInfo
inRack bool
Expand Down
28 changes: 14 additions & 14 deletions pkg/apply/rebalancers/frequency.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,24 @@ import (
// FrequencyRebalancer is a Rebalancer that rebalances to achieve in-topic balance among
// all available brokers. The algorithm used is:
//
// for each replica position index:
// while true:
// get counts for each broker in that position
// partition the set of brokers into two sets, a "lower" one and an "upper" one, based
// on the sorted frequency counts (with brokers to be removed treated as the highest
// frequencies)
// for each lower broker, upper broker combination:
// try to replace the upper broker with the lower one
// if replacement made, continue to next while loop iteration
// if no replacement made, break out of while loop, continue to next partition index
// for each replica position index:
// while true:
// get counts for each broker in that position
// partition the set of brokers into two sets, a "lower" one and an "upper" one, based
// on the sorted frequency counts (with brokers to be removed treated as the highest
// frequencies)
// for each lower broker, upper broker combination:
// try to replace the upper broker with the lower one
// if replacement made, continue to next while loop iteration
// if no replacement made, break out of while loop, continue to next partition index
//
// Replacements are made if:
//
// 1. The replacement improves the broker balance for the index OR
// the replacement improves the broker balance for the topic as a whole
// 1. The replacement improves the broker balance for the index OR
// the replacement improves the broker balance for the topic as a whole
// AND
// 2. The replacement is consistent with the placement strategy for the topic (e.g., balanced
// leaders, in-rack, etc.)
// 2. The replacement is consistent with the placement strategy for the topic (e.g., balanced
// leaders, in-rack, etc.)
//
// The picker passed in to the rebalancer is used to sort the partitions for each broker (if it
// appears more than once for the current index) and also to break ties when sorting and
Expand Down
12 changes: 6 additions & 6 deletions pkg/groups/groups_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
func TestGetGroups(t *testing.T) {
ctx := context.Background()
connector, err := admin.NewConnector(admin.ConnectorConfig{
BrokerAddr: util.TestKafkaAddr(),
BrokerAddr: util.TestKafkaAddr(),
ConnTimeout: 10 * time.Second,
})
require.NoError(t, err)
Expand Down Expand Up @@ -83,7 +83,7 @@ func TestGetGroups(t *testing.T) {
func TestGetGroupsMultiMember(t *testing.T) {
ctx := context.Background()
connector, err := admin.NewConnector(admin.ConnectorConfig{
BrokerAddr: util.TestKafkaAddr(),
BrokerAddr: util.TestKafkaAddr(),
ConnTimeout: 10 * time.Second,
})
require.NoError(t, err)
Expand Down Expand Up @@ -165,7 +165,7 @@ func TestGetGroupsMultiMember(t *testing.T) {
func TestGetGroupsMultiMemberMultiTopic(t *testing.T) {
ctx := context.Background()
connector, err := admin.NewConnector(admin.ConnectorConfig{
BrokerAddr: util.TestKafkaAddr(),
BrokerAddr: util.TestKafkaAddr(),
ConnTimeout: 10 * time.Second,
})
require.NoError(t, err)
Expand Down Expand Up @@ -262,7 +262,7 @@ func TestGetGroupsMultiMemberMultiTopic(t *testing.T) {
func TestGetLags(t *testing.T) {
ctx := context.Background()
connector, err := admin.NewConnector(admin.ConnectorConfig{
BrokerAddr: util.TestKafkaAddr(),
BrokerAddr: util.TestKafkaAddr(),
ConnTimeout: 10 * time.Second,
})
require.NoError(t, err)
Expand Down Expand Up @@ -306,7 +306,7 @@ func TestGetLags(t *testing.T) {
func TestGetEarliestOrLatestOffset(t *testing.T) {
ctx := context.Background()
connector, err := admin.NewConnector(admin.ConnectorConfig{
BrokerAddr: util.TestKafkaAddr(),
BrokerAddr: util.TestKafkaAddr(),
ConnTimeout: 10 * time.Second,
})
require.NoError(t, err)
Expand Down Expand Up @@ -354,7 +354,7 @@ func TestGetEarliestOrLatestOffset(t *testing.T) {
func TestResetOffsets(t *testing.T) {
ctx := context.Background()
connector, err := admin.NewConnector(admin.ConnectorConfig{
BrokerAddr: util.TestKafkaAddr(),
BrokerAddr: util.TestKafkaAddr(),
ConnTimeout: 10 * time.Second,
})
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/messages/bounds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
func TestGetAllPartitionBounds(t *testing.T) {
ctx := context.Background()
connector, err := admin.NewConnector(admin.ConnectorConfig{
BrokerAddr: util.TestKafkaAddr(),
BrokerAddr: util.TestKafkaAddr(),
ConnTimeout: 10 * time.Second,
})
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/messages/tail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestTailerGetMessages(t *testing.T) {
defer cancel()

connector, err := admin.NewConnector(admin.ConnectorConfig{
BrokerAddr: util.TestKafkaAddr(),
BrokerAddr: util.TestKafkaAddr(),
ConnTimeout: 10 * time.Second,
})
require.NoError(t, err)
Expand Down
3 changes: 2 additions & 1 deletion pkg/util/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package util

import (
"context"
log "github.com/sirupsen/logrus"
"time"

log "github.com/sirupsen/logrus"
)

// Rebalance topic progress Config
Expand Down