Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/memcached/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ var startCmd = &cobra.Command{
memcachedPath := os.Getenv("ARCUS_PATH")
command := fmt.Sprintf(memcachedStartCommandTemplate,
memcachedPath, memcachedPath, memcachedPath, memcachedPath, memcachedPath,
port, string(globalConfig), os.Getenv("ZK_ADDR"))
port, string(globalConfig), os.Getenv("ZK_LIST"))

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
Expand Down
69 changes: 63 additions & 6 deletions cmd/zookeeper/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,78 @@ package zookeeper
import (
"fmt"
"os"
"path"
"strings"

"github.com/go-zookeeper/zk"
"github.com/jam2in/arcus-cli/internal"
"github.com/spf13/cobra"
)

var initCmd = &cobra.Command{
Use: "init",
Short: "Initialize the basic Arcus znode structure in Zookeeper.",
Short: "Initialize ZooKeeper config files (myid, zoo.cfg) on ensemble servers",
Run: func(cmd *cobra.Command, args []string) {
zkConn := cmd.Context().Value(internal.CtxZkConnKey{}).(*zk.Conn)
err := internal.InitializeZK(zkConn)
if err != nil {
fmt.Fprintln(os.Stderr, err)
zkList := os.Getenv("ZK_LIST")
zkPath := os.Getenv("ZK_PATH")
if zkList == "" || zkPath == "" {
fmt.Fprintln(os.Stderr, "Environment variables are not provided.\nPlease set the ZK_LIST, ZK_PATH environment variables")
os.Exit(1)
}
zkServers := strings.Split(zkList, ",")

fmt.Printf("Initializing ZooKeeper config for %d server(s)...\n", len(zkServers))
for i, server := range zkServers {
ip, port, _ := strings.Cut(server, ":")
myid := i + 1

err := configZK(ip, port, zkPath, myid, zkServers)
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
}
fmt.Println("ZooKeeper configuration complete. You can now start the ensemble using 'zookeeper start'.")
},
}

func configZK(ip, port, zkPath string, myid int, allServers []string) error {
dataDir := path.Join(zkPath, "data")
confDir := path.Join(zkPath, "conf")
confPath := path.Join(confDir, "zoo.cfg")
zooCfgContent := buildZooCfg(allServers, zkPath, port)
configZKCmd := fmt.Sprintf(zookeeperConfigTemplate, dataDir, myid, confDir, confPath, zooCfgContent)

client, err := internal.NewSSHClient(ip)
if err != nil {
return err
}
defer client.Close()

session, _ := client.NewSession()
defer session.Close()
if err = session.Run(configZKCmd); err != nil {
return fmt.Errorf("failed to configure zookeeper: %w", err)
}

return nil
}

func buildZooCfg(servers []string, zkPath, port string) string {
var zooCfg strings.Builder
zooCfg.WriteString("tickTime=2000\n")
zooCfg.WriteString("initLimit=10\n")
zooCfg.WriteString("syncLimit=5\n")
zooCfg.WriteString(fmt.Sprintf("dataDir=%s/data\n", zkPath))
zooCfg.WriteString(fmt.Sprintf("clientPort=%s\n", port))
zooCfg.WriteString("standaloneEnabled=false\n")
zooCfg.WriteString("reconfigEnabled=true\n")
zooCfg.WriteString("4lw.commands.whitelist=*\n\n")
zooCfg.WriteString("# Server Lists\n")

for i, server := range servers {
ip := strings.Split(server, ":")[0]
zooCfg.WriteString(fmt.Sprintf("server.%d=%s:2888:3888\n", i+1, ip))
}

return zooCfg.String()
}
30 changes: 12 additions & 18 deletions cmd/zookeeper/zookeeper.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,24 @@
package zookeeper

import (
"github.com/go-zookeeper/zk"
"github.com/jam2in/arcus-cli/internal"
"github.com/spf13/cobra"
)

const (
zookeeperConfigTemplate = `mkdir -p %[1]s && echo %[2]d > %[1]s/myid \
mkdir -p %[3]s && \
mv %[4]s %[4]s.bak$(date +%%s) 2>/dev/null || true && \
cat << 'EOF' > %[4]s
%[5]s
EOF`
)

var ZookeeperCmd = &cobra.Command{
Use: "zookeeper",
Short: "A CLI tool for zookeeper commands",
Long: "A command-line interface to manage the ZooKeeper structure for an Arcus cluster.\n" +
"This includes initializing the required znode directory layout. that Arcus\n" +
"and controlling the lifecycle of the Zookeeper cluster.",
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
ctx, err := internal.ContextWithZkConn(cmd.Context(), "", "")
if err != nil {
return err
}
cmd.SetContext(ctx)

return nil
},
PersistentPostRun: func(cmd *cobra.Command, args []string) {
zkConn := cmd.Context().Value(internal.CtxZkConnKey{}).(*zk.Conn)
zkConn.Close()
},
Long: "A command-line interface to manage the ZooKeeper ensemble for an Arcus cluster.\n" +
"This includes remotely initializing server configuration files (myid, zoo.cfg)\n" +
"and controlling the lifecycle(start, stop, stat) of the Zookeeper processes.",
}

func init() {
Expand Down
2 changes: 1 addition & 1 deletion internal/zookeeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ var arcusBasicPaths = []string{
}

func ContextWithZkConn(ctx context.Context, user, password string) (context.Context, error) {
addr := os.Getenv("ZK_ADDR")
addr := os.Getenv("ZK_LIST")
if addr == "" {
return nil, fmt.Errorf("ZooKeeper address is not set")
}
Expand Down