From cb04c8b5314984a3c9c5fc4ce2c6d0249333e5fd Mon Sep 17 00:00:00 2001 From: singsangssong Date: Wed, 1 Oct 2025 17:54:47 +0900 Subject: [PATCH] REFACTOR: consolidate core logic into the 'internal' package and update callers --- cmd/acl/acl.go | 7 +- cmd/acl/group/add.go | 12 +- cmd/acl/group/list.go | 7 +- cmd/acl/group/remove.go | 8 +- cmd/acl/user/add.go | 41 ++---- cmd/acl/user/list.go | 31 +--- cmd/acl/user/remove.go | 18 +-- cmd/acl/user/user.go | 13 -- cmd/memcached/add.go | 58 +------- cmd/memcached/config.go | 18 +-- cmd/memcached/list.go | 95 ++---------- cmd/memcached/memcached.go | 54 +------ cmd/memcached/remove.go | 68 +-------- cmd/memcached/start.go | 32 +---- cmd/memcached/stop.go | 26 +--- cmd/zookeeper/init.go | 7 +- cmd/zookeeper/zookeeper.go | 7 +- config/config.go | 19 +++ internal/acl/acl.go | 91 ++++++++++++ internal/memcached/memcached.go | 246 ++++++++++++++++++++++++++++++++ internal/{ => ssh}/ssh.go | 18 ++- internal/types/types.go | 48 +++++++ internal/zookeeper.go | 91 ------------ internal/zookeeper/zookeeper.go | 140 ++++++++++++++++++ 24 files changed, 651 insertions(+), 504 deletions(-) create mode 100644 config/config.go create mode 100644 internal/acl/acl.go create mode 100644 internal/memcached/memcached.go rename internal/{ => ssh}/ssh.go (69%) create mode 100644 internal/types/types.go delete mode 100644 internal/zookeeper.go create mode 100644 internal/zookeeper/zookeeper.go diff --git a/cmd/acl/acl.go b/cmd/acl/acl.go index f2a875e..af42287 100644 --- a/cmd/acl/acl.go +++ b/cmd/acl/acl.go @@ -4,7 +4,8 @@ import ( "github.com/go-zookeeper/zk" "github.com/jam2in/arcus-cli/cmd/acl/group" "github.com/jam2in/arcus-cli/cmd/acl/user" - "github.com/jam2in/arcus-cli/internal" + "github.com/jam2in/arcus-cli/internal/types" + "github.com/jam2in/arcus-cli/internal/zookeeper" "github.com/spf13/cobra" ) @@ -21,7 +22,7 @@ var AclCmd = &cobra.Command{ "allowing you to manage user groups and individual user credentials for SASL authentication.\n" + "A typical workflow involves creating a group first and then adding users to it.\n", PersistentPreRunE: func(cmd *cobra.Command, args []string) error { - ctx, err := internal.ContextWithZkConn(cmd.Context(), digestUsername, digestPassword) + ctx, err := zookeeper.ContextWithZkConn(cmd.Context(), digestUsername, digestPassword) if err != nil { return err } @@ -30,7 +31,7 @@ var AclCmd = &cobra.Command{ return nil }, PersistentPostRun: func(cmd *cobra.Command, args []string) { - zkConn := cmd.Context().Value(internal.CtxZkConnKey{}).(*zk.Conn) + zkConn := cmd.Context().Value(types.CtxZkConnKey{}).(*zk.Conn) zkConn.Close() }, } diff --git a/cmd/acl/group/add.go b/cmd/acl/group/add.go index 81061dc..bef7033 100644 --- a/cmd/acl/group/add.go +++ b/cmd/acl/group/add.go @@ -5,7 +5,8 @@ import ( "os" "github.com/go-zookeeper/zk" - "github.com/jam2in/arcus-cli/internal" + "github.com/jam2in/arcus-cli/internal/acl" + "github.com/jam2in/arcus-cli/internal/types" "github.com/spf13/cobra" ) @@ -15,16 +16,13 @@ var addCmd = &cobra.Command{ Args: cobra.ExactArgs(1), Run: func(cmd *cobra.Command, args []string) { groupName := args[0] + zkConn := cmd.Context().Value(types.CtxZkConnKey{}).(*zk.Conn) + zkAcl := cmd.Context().Value(types.CtxZkAclKey{}).([]zk.ACL) - zkConn := cmd.Context().Value(internal.CtxZkConnKey{}).(*zk.Conn) - acl := cmd.Context().Value(internal.CtxZkAclKey{}).([]zk.ACL) - - _, err := zkConn.Create(internal.AclRootPath+"/"+groupName, nil, 0, acl) - if err != nil { + if err := acl.AddGroup(zkConn, zkAcl, groupName); err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) } - fmt.Printf("ACL group '%s' created successfully.\n", groupName) }, } diff --git a/cmd/acl/group/list.go b/cmd/acl/group/list.go index 28e1157..5c08055 100644 --- a/cmd/acl/group/list.go +++ b/cmd/acl/group/list.go @@ -5,7 +5,8 @@ import ( "os" "github.com/go-zookeeper/zk" - "github.com/jam2in/arcus-cli/internal" + "github.com/jam2in/arcus-cli/internal/acl" + "github.com/jam2in/arcus-cli/internal/types" "github.com/spf13/cobra" ) @@ -14,9 +15,9 @@ var listCmd = &cobra.Command{ Short: "List all ACL groups.", Args: cobra.NoArgs, Run: func(cmd *cobra.Command, args []string) { - zkConn := cmd.Context().Value(internal.CtxZkConnKey{}).(*zk.Conn) + zkConn := cmd.Context().Value(types.CtxZkConnKey{}).(*zk.Conn) - groups, _, err := zkConn.Children(internal.AclRootPath) + groups, err := acl.GetGroups(zkConn) if err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) diff --git a/cmd/acl/group/remove.go b/cmd/acl/group/remove.go index 4a4eba0..d60c087 100644 --- a/cmd/acl/group/remove.go +++ b/cmd/acl/group/remove.go @@ -5,7 +5,8 @@ import ( "os" "github.com/go-zookeeper/zk" - "github.com/jam2in/arcus-cli/internal" + "github.com/jam2in/arcus-cli/internal/acl" + "github.com/jam2in/arcus-cli/internal/types" "github.com/spf13/cobra" ) @@ -15,10 +16,9 @@ var removeCmd = &cobra.Command{ Args: cobra.ExactArgs(1), Run: func(cmd *cobra.Command, args []string) { groupName := args[0] + zkConn := cmd.Context().Value(types.CtxZkConnKey{}).(*zk.Conn) - zkConn := cmd.Context().Value(internal.CtxZkConnKey{}).(*zk.Conn) - - if err := zkConn.Delete(internal.AclRootPath+"/"+groupName, -1); err != nil { + if err := acl.RemoveGroup(zkConn, groupName); err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) } diff --git a/cmd/acl/user/add.go b/cmd/acl/user/add.go index 3f898c3..b9d552a 100644 --- a/cmd/acl/user/add.go +++ b/cmd/acl/user/add.go @@ -8,8 +8,8 @@ import ( "syscall" "github.com/go-zookeeper/zk" - "github.com/jam2in/arcus-cli/internal" - "github.com/jam2in/arcus-cli/internal/scram" + "github.com/jam2in/arcus-cli/internal/acl" + "github.com/jam2in/arcus-cli/internal/types" "github.com/spf13/cobra" "golang.org/x/term" ) @@ -33,24 +33,17 @@ var addCmd = &cobra.Command{ Run: func(cmd *cobra.Command, args []string) { groupName := args[0] userArgs := args[1:] + zkConn := cmd.Context().Value(types.CtxZkConnKey{}).(*zk.Conn) + zkAcl := cmd.Context().Value(types.CtxZkAclKey{}).([]zk.ACL) - zkConn := cmd.Context().Value(internal.CtxZkConnKey{}).(*zk.Conn) - acl := cmd.Context().Value(internal.CtxZkAclKey{}).([]zk.ACL) - - requests := make([]any, 0, 2*len(userArgs)) var err error for _, arg := range userArgs { - requests, err = appendRequests(requests, groupName, arg, acl) + err = addUserRequest(zkConn, zkAcl, groupName, arg) if err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) } } - - if _, err := zkConn.Multi(requests...); err != nil { - fmt.Fprintln(os.Stderr, err) - os.Exit(1) - } }, } @@ -93,7 +86,7 @@ func validateRoles(role string) error { return nil } -func appendRequests(requests []any, group, arg string, acl []zk.ACL) ([]any, error) { +func addUserRequest(zkConn *zk.Conn, zkAcl []zk.ACL, group, arg string) error { tokens := strings.Split(arg, ":") var user, password, role string switch len(tokens) { @@ -106,28 +99,14 @@ func appendRequests(requests []any, group, arg string, acl []zk.ACL) ([]any, err password = tokens[1] role = tokens[2] default: - return nil, fmt.Errorf("invalid argument format: %s", arg) + return fmt.Errorf("invalid argument format: %s", arg) } if user == "" || password == "" { - return nil, fmt.Errorf("user & password cannot be empty: %s", arg) + return fmt.Errorf("user & password cannot be empty: %s", arg) } else if err := validateRoles(role); err != nil { - return nil, err + return err } - secret := scram.GenerateScramSHA256Secret(password, nil, 0) - return append(requests, - &zk.CreateRequest{ - Path: internal.AclRootPath + "/" + group + "/" + user, - Data: []byte(role), - Acl: acl, - Flags: 0, - }, - &zk.CreateRequest{ - Path: internal.AclRootPath + "/" + group + "/" + user + "/" + propName, - Data: []byte(secret.EncodeToBase64()), - Acl: acl, - Flags: 0, - }, - ), nil + return acl.AddUser(zkConn, zkAcl, group, user, password, role) } diff --git a/cmd/acl/user/list.go b/cmd/acl/user/list.go index b2a8c32..6e6260e 100644 --- a/cmd/acl/user/list.go +++ b/cmd/acl/user/list.go @@ -3,11 +3,10 @@ package user import ( "fmt" "os" - "path" - "strings" "github.com/go-zookeeper/zk" - "github.com/jam2in/arcus-cli/internal" + "github.com/jam2in/arcus-cli/internal/acl" + "github.com/jam2in/arcus-cli/internal/types" "github.com/spf13/cobra" ) @@ -17,10 +16,9 @@ var listCmd = &cobra.Command{ Args: cobra.ExactArgs(1), Run: func(cmd *cobra.Command, args []string) { groupName := args[0] + zkConn := cmd.Context().Value(types.CtxZkConnKey{}).(*zk.Conn) - zkConn := cmd.Context().Value(internal.CtxZkConnKey{}).(*zk.Conn) - - users, err := listUsers(zkConn, groupName) + users, err := acl.GetUsers(zkConn, groupName) if err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) @@ -33,24 +31,3 @@ var listCmd = &cobra.Command{ fmt.Printf("Total %d users in group '%s'.\n", len(users), groupName) }, } - -func listUsers(zkConn *zk.Conn, groupName string) ([]UserInfo, error) { - groupPath := internal.AclRootPath + "/" + groupName - - userNames, _, err := zkConn.Children(groupPath) - if err != nil { - return nil, err - } - - users := make([]UserInfo, 0, len(userNames)) - for _, userName := range userNames { - userPath := path.Join(groupPath, userName) - roleBytes, _, err := zkConn.Get(userPath) - if err != nil { - return nil, err - } - users = append(users, UserInfo{Username: userName, Roles: strings.Split(string(roleBytes), ",")}) - } - - return users, nil -} diff --git a/cmd/acl/user/remove.go b/cmd/acl/user/remove.go index d796ddf..b732ffe 100644 --- a/cmd/acl/user/remove.go +++ b/cmd/acl/user/remove.go @@ -5,7 +5,8 @@ import ( "os" "github.com/go-zookeeper/zk" - "github.com/jam2in/arcus-cli/internal" + "github.com/jam2in/arcus-cli/internal/acl" + "github.com/jam2in/arcus-cli/internal/types" "github.com/spf13/cobra" ) @@ -16,23 +17,12 @@ var removeCmd = &cobra.Command{ Run: func(cmd *cobra.Command, args []string) { groupName := args[0] userName := args[1] + zkConn := cmd.Context().Value(types.CtxZkConnKey{}).(*zk.Conn) - zkConn := cmd.Context().Value(internal.CtxZkConnKey{}).(*zk.Conn) - - if _, err := zkConn.Multi( - &zk.DeleteRequest{ - Path: internal.AclRootPath + "/" + groupName + "/" + userName + "/" + propName, - Version: -1, - }, - &zk.DeleteRequest{ - Path: internal.AclRootPath + "/" + groupName + "/" + userName, - Version: -1, - }, - ); err != nil { + if err := acl.RemoveUser(zkConn, groupName, userName); err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) } - fmt.Printf("User '%s' removed from group '%s' successfully.\n", userName, groupName) }, } diff --git a/cmd/acl/user/user.go b/cmd/acl/user/user.go index 48e3547..e25b20c 100644 --- a/cmd/acl/user/user.go +++ b/cmd/acl/user/user.go @@ -1,22 +1,9 @@ package user import ( - "fmt" - "github.com/spf13/cobra" ) -const propName = "authPassword" - -type UserInfo struct { - Username string - Roles []string -} - -func (i UserInfo) String() string { - return fmt.Sprintf("Username: %s, Role: %s", i.Username, i.Roles) -} - var UserCmd = &cobra.Command{ Use: "user", Short: "Manage SASL users within an ACL group", diff --git a/cmd/memcached/add.go b/cmd/memcached/add.go index e068511..c6961d1 100644 --- a/cmd/memcached/add.go +++ b/cmd/memcached/add.go @@ -3,10 +3,10 @@ package memcached import ( "fmt" "os" - "path" "github.com/go-zookeeper/zk" - "github.com/jam2in/arcus-cli/internal" + "github.com/jam2in/arcus-cli/internal/memcached" + "github.com/jam2in/arcus-cli/internal/types" "github.com/spf13/cobra" ) @@ -17,62 +17,12 @@ var addCmd = &cobra.Command{ Run: func(cmd *cobra.Command, args []string) { serviceCode := args[0] address := args[1] + zkConn := cmd.Context().Value(types.CtxZkConnKey{}).(*zk.Conn) - zkConn := cmd.Context().Value(internal.CtxZkConnKey{}).(*zk.Conn) - - if err := addServiceCodePath(zkConn, serviceCode); err != nil { - fmt.Fprintln(os.Stderr, err) - os.Exit(1) - } - - if err := addServerPath(zkConn, serviceCode, address); err != nil { + if err := memcached.AddToServiceCode(zkConn, serviceCode, address); err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) } - fmt.Printf("Successfully added server %s to service code %s\n", address, serviceCode) }, } - -func addServiceCodePath(zkConn *zk.Conn, serviceCode string) error { - ops := []any{ - &zk.CreateRequest{ - Path: path.Join(internal.ArcusCacheListPath, serviceCode), - Data: nil, - Acl: zk.WorldACL(zk.PermAll), - Flags: 0, - }, - &zk.CreateRequest{ - Path: path.Join(internal.ArcusClientListPath, serviceCode), - Data: nil, - Acl: zk.WorldACL(zk.PermAll), - Flags: 0, - }, - } - if _, err := zkConn.Multi(ops...); err != nil && err != zk.ErrNodeExists { - return err - } - return nil -} - -func addServerPath(zkConn *zk.Conn, serviceCode, address string) error { - ops := []any{ - &zk.CreateRequest{ - Path: path.Join(internal.ArcusCacheServerMappingPath, address), - Data: nil, - Acl: zk.WorldACL(zk.PermAll), - Flags: 0, - }, - &zk.CreateRequest{ - Path: path.Join(internal.ArcusCacheServerMappingPath, address, serviceCode), - Data: nil, - Acl: zk.WorldACL(zk.PermAll), - Flags: 0, - }, - } - if _, err := zkConn.Multi(ops...); err != nil { - return err - } - - return nil -} diff --git a/cmd/memcached/config.go b/cmd/memcached/config.go index c34fad6..d739bd5 100644 --- a/cmd/memcached/config.go +++ b/cmd/memcached/config.go @@ -3,11 +3,11 @@ package memcached import ( "fmt" "os" - "path" "strings" "github.com/go-zookeeper/zk" - "github.com/jam2in/arcus-cli/internal" + "github.com/jam2in/arcus-cli/internal/memcached" + "github.com/jam2in/arcus-cli/internal/types" "github.com/spf13/cobra" ) @@ -17,21 +17,13 @@ var configCmd = &cobra.Command{ Args: cobra.MinimumNArgs(1), Run: func(cmd *cobra.Command, args []string) { serviceCode := args[0] - newData := []byte(strings.Join(args[1:], " ")) - cacheListPath := path.Join(internal.ArcusCacheListPath, serviceCode) + options := strings.Join(args[1:], " ") + zkConn := cmd.Context().Value(types.CtxZkConnKey{}).(*zk.Conn) - zkConn := cmd.Context().Value(internal.CtxZkConnKey{}).(*zk.Conn) - exists, _, err := zkConn.Exists(cacheListPath) - if err != nil { + if err := memcached.SetClusterConfig(zkConn, serviceCode, options); err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) } - if exists { - if _, err := zkConn.Set(cacheListPath, newData, -1); err != nil { - fmt.Fprintln(os.Stderr, err) - os.Exit(1) - } - } fmt.Printf("Global config for service '%s' has been updated\n", serviceCode) }, } diff --git a/cmd/memcached/list.go b/cmd/memcached/list.go index 8086662..2dc3062 100644 --- a/cmd/memcached/list.go +++ b/cmd/memcached/list.go @@ -3,107 +3,42 @@ package memcached import ( "fmt" "os" - "path" - "sort" "strings" "github.com/go-zookeeper/zk" - "github.com/jam2in/arcus-cli/internal" + "github.com/jam2in/arcus-cli/internal/memcached" + "github.com/jam2in/arcus-cli/internal/types" "github.com/spf13/cobra" ) -type serviceStatus struct { - serviceCode string - total int - online int - offline int -} - var listCmd = &cobra.Command{ Use: "list [serviceCode]", Short: "list all servers in arcus cache cloud", Run: func(cmd *cobra.Command, args []string) { + zkConn := cmd.Context().Value(types.CtxZkConnKey{}).(*zk.Conn) - zkConn := cmd.Context().Value(internal.CtxZkConnKey{}).(*zk.Conn) + if len(args) > 0 { + serviceCode := args[0] + status, err := memcached.GetServiceCodeStatus(zkConn, serviceCode) + if err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + fmt.Println(status.String()) + return + } - serviceCodeMap, err := buildServiceCodeMap(zkConn) + statuses, err := memcached.GetAllServiceCodeStatus(zkConn) if err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) } - var serviceCodes []string - if len(args) == 0 { - for sm := range serviceCodeMap { - serviceCodes = append(serviceCodes, sm) - } - sort.Strings(serviceCodes) - } else { - serviceCodes = []string{args[0]} - } - - liveServerMaps := make(map[string]map[string]struct{}) - var statuses []serviceStatus - for _, sc := range serviceCodes { - liveServers, _ := getLiveServers(zkConn, sc) - liveServerMaps[sc] = liveServers - - onlineCnt := 0 - for _, s := range serviceCodeMap[sc] { - if _, ok := liveServers[s]; ok { - onlineCnt++ - } - } - - statuses = append(statuses, serviceStatus{ - serviceCode: sc, - total: len(serviceCodeMap[sc]), - online: onlineCnt, - offline: len(serviceCodeMap[sc]) - onlineCnt, - }) - } - - if len(args) == 1 { - serviceCode := args[0] - servers := serviceCodeMap[serviceCode] - liveServers := liveServerMaps[serviceCode] - - fmt.Printf("Servers in service code '%s':\n", serviceCode) - for _, server := range servers { - status := "offline" - if _, isLive := liveServers[server]; isLive { - status = "online" - } - fmt.Printf(" - %-21s %s\n", server, status) - } - fmt.Println() - } - fmt.Printf("%-25s %-8s %-8s %-8s\n", "SERVICE CODE", "TOTAL", "ONLINE", "OFFLINE") fmt.Println(strings.Repeat("-", 60)) for _, s := range statuses { - fmt.Printf("%-25s %-8d %-8d %-8d\n", s.serviceCode, s.total, s.online, s.offline) + fmt.Printf("%-25s %-8d %-8d %-8d\n", s.ServiceCode, s.Total, len(s.OnlineServers), len(s.OfflineServers)) } }, } - -func buildServiceCodeMap(zkConn *zk.Conn) (map[string][]string, error) { - serviceCodeMap := make(map[string][]string) - allServers, _, err := zkConn.Children(path.Join(internal.ArcusCacheServerMappingPath)) - if err != nil { - return nil, err - } - - for _, s := range allServers { - serviceCodeTags, _, err := zkConn.Children(path.Join(internal.ArcusCacheServerMappingPath, s)) - if err != nil { - continue - } - for _, sc := range serviceCodeTags { - serviceCodeMap[sc] = append(serviceCodeMap[sc], s) - } - } - - return serviceCodeMap, nil -} diff --git a/cmd/memcached/memcached.go b/cmd/memcached/memcached.go index 29d0f9a..3b364fb 100644 --- a/cmd/memcached/memcached.go +++ b/cmd/memcached/memcached.go @@ -1,28 +1,22 @@ package memcached import ( - "path" - "strings" - "github.com/go-zookeeper/zk" - "github.com/jam2in/arcus-cli/internal" + "github.com/jam2in/arcus-cli/internal/types" + "github.com/jam2in/arcus-cli/internal/zookeeper" "github.com/spf13/cobra" ) -const ( - memcachedStartCommandTemplate = "%s/bin/memcached -E %s/lib/default_engine.so -X %s/lib/syslog_logger.so -X %s/lib/ascii_scrub.so -P %s/memcached-%s.pid -d -v -r -R5 -U 0 -D: -b 8192 %s -z %s" -) - var MemcachedCmd = &cobra.Command{ Use: "memcached", Short: "Memcached command", PersistentPreRunE: func(cmd *cobra.Command, args []string) error { - ctx, err := internal.ContextWithZkConn(cmd.Context(), "", "") + ctx, err := zookeeper.ContextWithZkConn(cmd.Context(), "", "") if err != nil { return err } - zkConn := ctx.Value(internal.CtxZkConnKey{}).(*zk.Conn) - if err = internal.InitializeZK(zkConn); err != nil { + zkConn := ctx.Value(types.CtxZkConnKey{}).(*zk.Conn) + if err = zookeeper.InitializeZK(zkConn); err != nil { return err } cmd.SetContext(ctx) @@ -30,7 +24,7 @@ var MemcachedCmd = &cobra.Command{ return nil }, PersistentPostRun: func(cmd *cobra.Command, args []string) { - zkConn := cmd.Context().Value(internal.CtxZkConnKey{}).(*zk.Conn) + zkConn := cmd.Context().Value(types.CtxZkConnKey{}).(*zk.Conn) zkConn.Close() }, } @@ -45,42 +39,6 @@ func init() { MemcachedCmd.AddCommand(stopCmd) } -func getServiceCodeServers(zkConn *zk.Conn, serviceCode string) ([]string, error) { - servers, _, err := zkConn.Children(internal.ArcusCacheServerMappingPath) - if err != nil { - return nil, err - } - - var result []string - for _, server := range servers { - children, _, err := zkConn.Children(path.Join(internal.ArcusCacheServerMappingPath, server)) - if err != nil { - continue - } - for _, child := range children { - if child == serviceCode { - result = append(result, server) - break - } - } - } - return result, nil -} - -func getLiveServers(zkConn *zk.Conn, serviceCode string) (map[string]struct{}, error) { - liveNodes, _, err := zkConn.Children(path.Join(internal.ArcusCacheListPath, serviceCode)) - if err != nil { - return nil, err - } - - liveServers := make(map[string]struct{}) - for _, liveNode := range liveNodes { - addr, _, _ := strings.Cut(liveNode, "-") - liveServers[addr] = struct{}{} - } - return liveServers, nil -} - func filterServers(allServers []string, targets []string) []string { targetSet := make(map[string]struct{}) for _, t := range targets { diff --git a/cmd/memcached/remove.go b/cmd/memcached/remove.go index f5fef1f..e87ad23 100644 --- a/cmd/memcached/remove.go +++ b/cmd/memcached/remove.go @@ -3,11 +3,10 @@ package memcached import ( "fmt" "os" - "path" - "strings" "github.com/go-zookeeper/zk" - "github.com/jam2in/arcus-cli/internal" + "github.com/jam2in/arcus-cli/internal/memcached" + "github.com/jam2in/arcus-cli/internal/types" "github.com/spf13/cobra" ) @@ -17,16 +16,16 @@ var removeCmd = &cobra.Command{ Args: cobra.RangeArgs(1, 2), Run: func(cmd *cobra.Command, args []string) { serviceCode := args[0] - zkConn := cmd.Context().Value(internal.CtxZkConnKey{}).(*zk.Conn) + zkConn := cmd.Context().Value(types.CtxZkConnKey{}).(*zk.Conn) switch len(args) { case 1: - if err := removeServiceCode(zkConn, serviceCode); err != nil { + if err := memcached.RemoveServiceCode(zkConn, serviceCode); err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) } case 2: address := args[1] - if err := removeServer(zkConn, serviceCode, address); err != nil { + if err := memcached.RemoveFromServiceCode(zkConn, serviceCode, address); err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) } @@ -38,60 +37,3 @@ var removeCmd = &cobra.Command{ fmt.Printf("\nSuccessfully remove to service code %s\n", serviceCode) }, } - -func removeServiceCode(zkConn *zk.Conn, serviceCode string) error { - serverAddress, _, err := zkConn.Children(internal.ArcusCacheServerMappingPath) - if err != nil { - return err - } - - for _, address := range serverAddress { - mappingPath := path.Join(internal.ArcusCacheServerMappingPath, address, serviceCode) - exists, _, err := zkConn.Exists(mappingPath) - if err != nil { - return err - } - if exists { - return fmt.Errorf("%s exists", mappingPath) - } - } - - ops := []any{ - &zk.DeleteRequest{ - Path: path.Join(internal.ArcusCacheListPath, serviceCode), - Version: -1, - }, - &zk.DeleteRequest{ - Path: path.Join(internal.ArcusClientListPath, serviceCode), - Version: -1, - }, - } - if _, err = zkConn.Multi(ops...); err != nil { - return err - } - - return nil -} - -func removeServer(zkConn *zk.Conn, serviceCode, address string) error { - parts := strings.Split(address, ":") - if len(parts) != 2 { - return fmt.Errorf("invalid address: %s", address) - } - - ops := []any{ - &zk.DeleteRequest{ - Path: path.Join(internal.ArcusCacheServerMappingPath, address, serviceCode), - Version: -1, - }, - &zk.DeleteRequest{ - Path: path.Join(internal.ArcusCacheServerMappingPath, address), - Version: -1, - }, - } - if _, err := zkConn.Multi(ops...); err != nil { - return err - } - - return nil -} diff --git a/cmd/memcached/start.go b/cmd/memcached/start.go index 939114c..e9c95c2 100644 --- a/cmd/memcached/start.go +++ b/cmd/memcached/start.go @@ -4,12 +4,12 @@ import ( "context" "fmt" "os" - "path" "strings" "time" "github.com/go-zookeeper/zk" - "github.com/jam2in/arcus-cli/internal" + "github.com/jam2in/arcus-cli/internal/memcached" + "github.com/jam2in/arcus-cli/internal/types" "github.com/spf13/cobra" ) @@ -20,15 +20,16 @@ var startCmd = &cobra.Command{ Run: func(cmd *cobra.Command, args []string) { serviceCode := args[0] targetServers := args[1:] - zkConn := cmd.Context().Value(internal.CtxZkConnKey{}).(*zk.Conn) + memcachedPath := os.Getenv("ARCUS_PATH") + zkConn := cmd.Context().Value(types.CtxZkConnKey{}).(*zk.Conn) - globalConfig, _, err := zkConn.Get(path.Join(internal.ArcusCacheListPath, serviceCode)) + globalConfig, err := memcached.GetClusterConfig(zkConn, serviceCode) if err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) } - serviceCodeServers, err := getServiceCodeServers(zkConn, serviceCode) + serviceCodeServers, err := memcached.GetServiceCodeServers(zkConn, serviceCode) if err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) @@ -52,30 +53,11 @@ var startCmd = &cobra.Command{ os.Exit(1) } - client, err := internal.NewSSHClient(ip) - if err != nil { - fmt.Fprintln(os.Stderr, err) - os.Exit(1) - } - defer client.Close() - - session, err := client.NewSession() - if err != nil { - fmt.Fprintln(os.Stderr, err) - os.Exit(1) - } - defer session.Close() - - memcachedPath := os.Getenv("ARCUS_PATH") - command := fmt.Sprintf(memcachedStartCommandTemplate, - memcachedPath, memcachedPath, memcachedPath, memcachedPath, memcachedPath, - port, string(globalConfig), os.Getenv("ZK_ADDR")) - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() errChan := make(chan error, 1) go func() { - errChan <- session.Run(command) + errChan <- memcached.StartMemcachedProcess(os.Getenv("ZK_ADDR"), ip, port, memcachedPath, string(globalConfig)) }() select { case err := <-errChan: diff --git a/cmd/memcached/stop.go b/cmd/memcached/stop.go index 316ea95..e6d63f6 100644 --- a/cmd/memcached/stop.go +++ b/cmd/memcached/stop.go @@ -6,7 +6,8 @@ import ( "strings" "github.com/go-zookeeper/zk" - "github.com/jam2in/arcus-cli/internal" + "github.com/jam2in/arcus-cli/internal/memcached" + "github.com/jam2in/arcus-cli/internal/types" "github.com/spf13/cobra" ) @@ -17,9 +18,9 @@ var stopCmd = &cobra.Command{ Run: func(cmd *cobra.Command, args []string) { serviceCode := args[0] targetServers := args[1:] - zkConn := cmd.Context().Value(internal.CtxZkConnKey{}).(*zk.Conn) + zkConn := cmd.Context().Value(types.CtxZkConnKey{}).(*zk.Conn) - serviceCodeServers, err := getServiceCodeServers(zkConn, serviceCode) + serviceCodeServers, err := memcached.GetServiceCodeServers(zkConn, serviceCode) if err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) @@ -41,24 +42,7 @@ var stopCmd = &cobra.Command{ fmt.Fprintln(os.Stderr, "Invalid server address:", serverAddress) os.Exit(1) } - - client, err := internal.NewSSHClient(ip) - if err != nil { - fmt.Fprintln(os.Stderr, err) - os.Exit(1) - } - defer client.Close() - - session, err := client.NewSession() - if err != nil { - fmt.Fprintln(os.Stderr, err) - os.Exit(1) - } - defer session.Close() - - pidFilePath := fmt.Sprintf("%s/memcached-%s.pid", os.Getenv("ARCUS_PATH"), port) - command := fmt.Sprintf("kill -INT $(cat %s)", pidFilePath) - if err := session.Run(command); err != nil { + if err := memcached.StopMemcachedProcess(ip, port, os.Getenv("ARCUS_PATH")); err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) } diff --git a/cmd/zookeeper/init.go b/cmd/zookeeper/init.go index bc9e666..f7de72a 100644 --- a/cmd/zookeeper/init.go +++ b/cmd/zookeeper/init.go @@ -5,7 +5,8 @@ import ( "os" "github.com/go-zookeeper/zk" - "github.com/jam2in/arcus-cli/internal" + "github.com/jam2in/arcus-cli/internal/types" + "github.com/jam2in/arcus-cli/internal/zookeeper" "github.com/spf13/cobra" ) @@ -13,8 +14,8 @@ var initCmd = &cobra.Command{ Use: "init", Short: "Initialize the basic Arcus znode structure in Zookeeper.", Run: func(cmd *cobra.Command, args []string) { - zkConn := cmd.Context().Value(internal.CtxZkConnKey{}).(*zk.Conn) - err := internal.InitializeZK(zkConn) + zkConn := cmd.Context().Value(types.CtxZkConnKey{}).(*zk.Conn) + err := zookeeper.InitializeZK(zkConn) if err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) diff --git a/cmd/zookeeper/zookeeper.go b/cmd/zookeeper/zookeeper.go index 3e8650c..a1eb04c 100644 --- a/cmd/zookeeper/zookeeper.go +++ b/cmd/zookeeper/zookeeper.go @@ -2,7 +2,8 @@ package zookeeper import ( "github.com/go-zookeeper/zk" - "github.com/jam2in/arcus-cli/internal" + "github.com/jam2in/arcus-cli/internal/types" + "github.com/jam2in/arcus-cli/internal/zookeeper" "github.com/spf13/cobra" ) @@ -13,7 +14,7 @@ var ZookeeperCmd = &cobra.Command{ "This includes initializing the required znode directory layout. that Arcus\n" + "and controlling the lifecycle of the Zookeeper cluster.", PersistentPreRunE: func(cmd *cobra.Command, args []string) error { - ctx, err := internal.ContextWithZkConn(cmd.Context(), "", "") + ctx, err := zookeeper.ContextWithZkConn(cmd.Context(), "", "") if err != nil { return err } @@ -22,7 +23,7 @@ var ZookeeperCmd = &cobra.Command{ return nil }, PersistentPostRun: func(cmd *cobra.Command, args []string) { - zkConn := cmd.Context().Value(internal.CtxZkConnKey{}).(*zk.Conn) + zkConn := cmd.Context().Value(types.CtxZkConnKey{}).(*zk.Conn) zkConn.Close() }, } diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..f84c676 --- /dev/null +++ b/config/config.go @@ -0,0 +1,19 @@ +package config + +const ( + ArcusRootPath = "/arcus" + AclRootPath = "/arcus_acl" + ArcusCacheListPath = "/arcus/cache_list" + ArcusClientListPath = "/arcus/client_list" + ArcusCacheServerMappingPath = "/arcus/cache_server_mapping" + + PropName = "authPassword" +) + +var ArcusBasicPaths = []string{ + ArcusRootPath, + AclRootPath, + ArcusCacheListPath, + ArcusClientListPath, + ArcusCacheServerMappingPath, +} diff --git a/internal/acl/acl.go b/internal/acl/acl.go new file mode 100644 index 0000000..2aa3f1e --- /dev/null +++ b/internal/acl/acl.go @@ -0,0 +1,91 @@ +package acl + +import ( + "path" + "strings" + + "github.com/go-zookeeper/zk" + "github.com/jam2in/arcus-cli/config" + "github.com/jam2in/arcus-cli/internal/scram" + "github.com/jam2in/arcus-cli/internal/types" +) + +func AddGroup(zkConn *zk.Conn, zkAcl []zk.ACL, groupName string) error { + _, err := zkConn.Create(config.AclRootPath+"/"+groupName, nil, 0, zkAcl) + if err != nil { + return err + } + return nil +} + +func RemoveGroup(zkConn *zk.Conn, groupName string) error { + if err := zkConn.Delete(config.AclRootPath+"/"+groupName, -1); err != nil { + return err + } + return nil +} + +func GetGroups(zkConn *zk.Conn) ([]string, error) { + groups, _, err := zkConn.Children(config.AclRootPath) + if err != nil { + return nil, err + } + return groups, nil +} + +func AddUser(zkConn *zk.Conn, zkAcl []zk.ACL, groupName, userName, password, role string) error { + secret := scram.GenerateScramSHA256Secret(password, nil, 0) + ops := []any{ + &zk.CreateRequest{ + Path: config.AclRootPath + "/" + groupName + "/" + userName, + Data: []byte(role), + Acl: zkAcl, + Flags: 0, + }, + &zk.CreateRequest{ + Path: config.AclRootPath + "/" + groupName + "/" + userName + "/" + config.PropName, + Data: []byte(secret.EncodeToBase64()), + Acl: zkAcl, + Flags: 0, + }, + } + _, err := zkConn.Multi(ops...) + + return err +} + +func RemoveUser(zkConn *zk.Conn, groupName, userName string) error { + ops := []any{ + &zk.DeleteRequest{ + Path: config.AclRootPath + "/" + groupName + "/" + userName + "/" + config.PropName, + Version: -1, + }, + &zk.DeleteRequest{ + Path: config.AclRootPath + "/" + groupName + "/" + userName, + Version: -1, + }, + } + _, err := zkConn.Multi(ops...) + + return err +} + +func GetUsers(zkConn *zk.Conn, groupName string) ([]types.UserInfo, error) { + groupPath := config.AclRootPath + "/" + groupName + + userNames, _, err := zkConn.Children(groupPath) + if err != nil { + return nil, err + } + + users := make([]types.UserInfo, 0, len(userNames)) + for _, userName := range userNames { + userPath := path.Join(groupPath, userName) + roleBytes, _, err := zkConn.Get(userPath) + if err != nil { + return nil, err + } + users = append(users, types.UserInfo{Username: userName, Roles: strings.Split(string(roleBytes), ",")}) + } + return users, nil +} diff --git a/internal/memcached/memcached.go b/internal/memcached/memcached.go new file mode 100644 index 0000000..e13c0f5 --- /dev/null +++ b/internal/memcached/memcached.go @@ -0,0 +1,246 @@ +package memcached + +import ( + "fmt" + "path" + "strings" + + "github.com/go-zookeeper/zk" + "github.com/jam2in/arcus-cli/config" + "github.com/jam2in/arcus-cli/internal/ssh" + "github.com/jam2in/arcus-cli/internal/types" +) + +const ( + memcachedStartCommandTemplate = "%s/bin/memcached -E %s/lib/default_engine.so -X %s/lib/syslog_logger.so -X %s/lib/ascii_scrub.so -P %s/memcached-%s.pid -d -v -r -R5 -U 0 -D: -b 8192 %s -z %s" + memcachedStopCommandTemplate = "kill -INT $(cat %s/memcached-%s.pid)" +) + +func GetClusterConfig(zkConn *zk.Conn, serviceCode string) ([]byte, error) { + globalConfig, _, err := zkConn.Get(path.Join(config.ArcusCacheListPath, serviceCode)) + if err != nil { + return nil, err + } + return globalConfig, nil +} + +func SetClusterConfig(zkConn *zk.Conn, serviceCode, globalConfig string) error { + cacheListPath := path.Join(config.ArcusCacheListPath, serviceCode) + exists, _, err := zkConn.Exists(cacheListPath) + if err != nil { + return err + } + if exists { + if _, err := zkConn.Set(cacheListPath, []byte(globalConfig), -1); err != nil { + return err + } + } + return nil +} + +func AddToServiceCode(zkConn *zk.Conn, serviceCode, serverAddr string) error { + ops1 := []any{ + &zk.CreateRequest{ + Path: path.Join(config.ArcusCacheListPath, serviceCode), + Data: nil, + Acl: zk.WorldACL(zk.PermAll), + Flags: 0, + }, + &zk.CreateRequest{ + Path: path.Join(config.ArcusClientListPath, serviceCode), + Data: nil, + Acl: zk.WorldACL(zk.PermAll), + Flags: 0, + }, + } + if _, err := zkConn.Multi(ops1...); err != nil && err != zk.ErrNodeExists { + return err + } + + ops2 := []any{ + &zk.CreateRequest{ + Path: path.Join(config.ArcusCacheServerMappingPath, serverAddr), + Data: nil, + Acl: zk.WorldACL(zk.PermAll), + Flags: 0, + }, + &zk.CreateRequest{ + Path: path.Join(config.ArcusCacheServerMappingPath, serverAddr, serviceCode), + Data: nil, + Acl: zk.WorldACL(zk.PermAll), + Flags: 0, + }, + } + _, err := zkConn.Multi(ops2...) + return err +} + +func RemoveFromServiceCode(zkConn *zk.Conn, serviceCode, serverAddr string) error { + parts := strings.Split(serverAddr, ":") + if len(parts) != 2 { + return fmt.Errorf("invalid address: %s", serverAddr) + } + + ops := []any{ + &zk.DeleteRequest{ + Path: path.Join(config.ArcusCacheServerMappingPath, serverAddr, serviceCode), + Version: -1, + }, + &zk.DeleteRequest{ + Path: path.Join(config.ArcusCacheServerMappingPath, serverAddr), + Version: -1, + }, + } + _, err := zkConn.Multi(ops...) + + return err +} + +func RemoveServiceCode(zkConn *zk.Conn, serviceCode string) error { + serverAddress, _, err := zkConn.Children(config.ArcusCacheServerMappingPath) + if err != nil { + return err + } + + for _, address := range serverAddress { + mappingPath := path.Join(config.ArcusCacheServerMappingPath, address, serviceCode) + exists, _, err := zkConn.Exists(mappingPath) + if err != nil { + return err + } + if exists { + return fmt.Errorf("%s exists", mappingPath) + } + } + + ops := []any{ + &zk.DeleteRequest{ + Path: path.Join(config.ArcusCacheListPath, serviceCode), + Version: -1, + }, + &zk.DeleteRequest{ + Path: path.Join(config.ArcusClientListPath, serviceCode), + Version: -1, + }, + } + _, err = zkConn.Multi(ops...) + + return err +} + +func GetServiceCodeStatus(zkConn *zk.Conn, serviceCode string) (*types.Status, error) { + allServers, err := GetServiceCodeServers(zkConn, serviceCode) + if err != nil { + return nil, err + } + return buildStatusForServiceCode(zkConn, serviceCode, allServers) +} + +func GetAllServiceCodeStatus(zkConn *zk.Conn) ([]*types.Status, error) { + serviceCodeMap, err := buildServiceCodeMap(zkConn) + if err != nil { + return nil, err + } + + var statuses []*types.Status + for sc, allServers := range serviceCodeMap { + status, err := buildStatusForServiceCode(zkConn, sc, allServers) + if err != nil { + return nil, err + } + statuses = append(statuses, status) + } + return statuses, nil +} + +func GetServiceCodeServers(zkConn *zk.Conn, serviceCode string) ([]string, error) { + servers, _, err := zkConn.Children(config.ArcusCacheServerMappingPath) + if err != nil { + return nil, err + } + + var result []string + for _, server := range servers { + children, _, err := zkConn.Children(path.Join(config.ArcusCacheServerMappingPath, server)) + if err != nil { + continue + } + for _, child := range children { + if child == serviceCode { + result = append(result, server) + break + } + } + } + return result, nil +} + +func buildStatusForServiceCode(zkConn *zk.Conn, serviceCode string, allServers []string) (*types.Status, error) { + liveServersMap, err := getLiveServers(zkConn, serviceCode) + if err != nil && err != zk.ErrNoNode { + return nil, err + } + + var onlineServers []string + var offlineServers []string + for _, server := range allServers { + if _, isLive := liveServersMap[server]; isLive { + onlineServers = append(onlineServers, server) + } else { + offlineServers = append(offlineServers, server) + } + } + + return &types.Status{ + ServiceCode: serviceCode, + Total: len(allServers), + OnlineServers: onlineServers, + OfflineServers: offlineServers, + }, nil +} + +func buildServiceCodeMap(zkConn *zk.Conn) (map[string][]string, error) { + serviceCodeMap := make(map[string][]string) + allServers, _, err := zkConn.Children(path.Join(config.ArcusCacheServerMappingPath)) + if err != nil { + return nil, err + } + + for _, s := range allServers { + serviceCodeTags, _, err := zkConn.Children(path.Join(config.ArcusCacheServerMappingPath, s)) + if err != nil { + continue + } + for _, sc := range serviceCodeTags { + serviceCodeMap[sc] = append(serviceCodeMap[sc], s) + } + } + + return serviceCodeMap, nil +} + +func getLiveServers(zkConn *zk.Conn, serviceCode string) (map[string]struct{}, error) { + liveNodes, _, err := zkConn.Children(path.Join(config.ArcusCacheListPath, serviceCode)) + if err != nil { + return nil, err + } + + liveServers := make(map[string]struct{}) + for _, liveNode := range liveNodes { + addr, _, _ := strings.Cut(liveNode, "-") + liveServers[addr] = struct{}{} + } + return liveServers, nil +} + +func StartMemcachedProcess(zkServers, ip, port, arcusPath, config string) error { + command := fmt.Sprintf(memcachedStartCommandTemplate, + arcusPath, arcusPath, arcusPath, arcusPath, arcusPath, + port, config, zkServers) + return ssh.RunSSHCommand(ip, command) +} + +func StopMemcachedProcess(ip, port, arcusPath string) error { + command := fmt.Sprintf(memcachedStopCommandTemplate, arcusPath, port) + return ssh.RunSSHCommand(ip, command) +} diff --git a/internal/ssh.go b/internal/ssh/ssh.go similarity index 69% rename from internal/ssh.go rename to internal/ssh/ssh.go index c2b1633..6fb6223 100644 --- a/internal/ssh.go +++ b/internal/ssh/ssh.go @@ -1,4 +1,4 @@ -package internal +package ssh import ( "os" @@ -34,3 +34,19 @@ func NewSSHClient(ip string) (*ssh.Client, error) { return ssh.Dial("tcp", ip+":22", sshConfig) } + +func RunSSHCommand(ip, command string) error { + client, err := NewSSHClient(ip) + if err != nil { + return err + } + defer client.Close() + + session, err := client.NewSession() + if err != nil { + return err + } + defer session.Close() + + return session.Run(command) +} diff --git a/internal/types/types.go b/internal/types/types.go new file mode 100644 index 0000000..94f2aa7 --- /dev/null +++ b/internal/types/types.go @@ -0,0 +1,48 @@ +package types + +import ( + "fmt" + "strings" +) + +type CtxZkConnKey struct{} +type CtxZkAclKey struct{} +type CtxZkCloseKey struct{} + +type Status struct { + ServiceCode string + Total int + OnlineServers []string + OfflineServers []string +} + +func (s *Status) String() string { + var b strings.Builder + + b.WriteString(fmt.Sprintf("%-25s %-8s %-8s %-8s\n", "SERVICE CODE", "TOTAL", "ONLINE", "OFFLINE")) + b.WriteString(strings.Repeat("-", 60) + "\n") + b.WriteString(fmt.Sprintf("%-25s %-8d %-8d %-8d\n", s.ServiceCode, s.Total, len(s.OnlineServers), len(s.OfflineServers))) + + b.WriteString(fmt.Sprintf("Servers in serviceCode: '%s':\n", s.ServiceCode)) + if s.Total > 0 { + b.WriteString("\n[Online Servers]\n") + for _, server := range s.OnlineServers { + b.WriteString(fmt.Sprintf(" - %s\n", server)) + } + + b.WriteString("\n[Offline Servers]\n") + for _, server := range s.OfflineServers { + b.WriteString(fmt.Sprintf(" - %s\n", server)) + } + } + return b.String() +} + +type UserInfo struct { + Username string + Roles []string +} + +func (i UserInfo) String() string { + return fmt.Sprintf("Username: %s, Role: %s", i.Username, i.Roles) +} diff --git a/internal/zookeeper.go b/internal/zookeeper.go deleted file mode 100644 index 57f06c7..0000000 --- a/internal/zookeeper.go +++ /dev/null @@ -1,91 +0,0 @@ -package internal - -import ( - "context" - "fmt" - "io" - "log" - "os" - "strings" - "time" - - "github.com/go-zookeeper/zk" -) - -type CtxZkConnKey struct{} -type CtxZkAclKey struct{} -type CtxZkCloseKey struct{} - -const ( - AclRootPath = "/arcus_acl" - - ArcusCacheListPath = "/arcus/cache_list" - ArcusClientListPath = "/arcus/client_list" - ArcusCacheServerMappingPath = "/arcus/cache_server_mapping" -) - -var arcusBasicPaths = []string{ - "/arcus", - "/arcus/cache_list", - "/arcus/cache_server_mapping", - "/arcus/client_list", - "/arcus_repl", - "/arcus_repl/cache_list", - "/arcus_repl/cache_server_mapping", - "/arcus_repl/client_list", - "/arcus_repl/group_list", - "/arcus_repl/cloud_stat", - "/arcus_acl", -} - -func ContextWithZkConn(ctx context.Context, user, password string) (context.Context, error) { - addr := os.Getenv("ZK_ADDR") - if addr == "" { - return nil, fmt.Errorf("ZooKeeper address is not set") - } - - conn, _, err := zk.Connect(strings.Split(addr, ","), time.Second*5, - zk.WithLogger(log.New(io.Discard, "", 0))) - if err != nil { - return nil, err - } - - var acl []zk.ACL - if user != "" && password != "" { - if err := conn.AddAuth("digest", []byte(user+":"+password)); err != nil { - return nil, err - } - acl = append(zk.DigestACL(zk.PermAll, user, password), zk.WorldACL(zk.PermRead)...) - } else { - acl = zk.WorldACL(zk.PermAll) - } - - ctx = context.WithValue(ctx, CtxZkConnKey{}, conn) - ctx = context.WithValue(ctx, CtxZkAclKey{}, acl) - - return ctx, nil -} - -func InitializeZK(zkConn *zk.Conn) error { - ops := make([]any, 0) - for _, p := range arcusBasicPaths { - exist, _, err := zkConn.Exists(p) - if err != nil { - return err - } - if !exist { - ops = append(ops, &zk.CreateRequest{ - Path: p, - Data: []byte{}, - Acl: zk.WorldACL(zk.PermAll), - Flags: 0, - }) - } - } - if len(ops) > 0 { - if _, err := zkConn.Multi(ops...); err != nil { - return err - } - } - return nil -} diff --git a/internal/zookeeper/zookeeper.go b/internal/zookeeper/zookeeper.go new file mode 100644 index 0000000..e45b811 --- /dev/null +++ b/internal/zookeeper/zookeeper.go @@ -0,0 +1,140 @@ +package zookeeper + +import ( + "context" + "fmt" + "io" + "log" + "net" + "os" + "path" + "strings" + "time" + + "github.com/go-zookeeper/zk" + "github.com/jam2in/arcus-cli/config" + "github.com/jam2in/arcus-cli/internal/ssh" + "github.com/jam2in/arcus-cli/internal/types" +) + +const ( + zookeeperStartCommandTemplate = "%s/bin/zkServer.sh start" + zookeeperStopCommandTemplate = "%s/bin/zkServer.sh stop" + zookeeperConfigTemplate = `mkdir -p %[1]s && echo %[2]d > %[1]s/myid \ + mkdir -p %[3]s && \ + mv %[4]s %[4]s.bak$(date +%%s) 2>/dev/null || true && \ + cat << 'EOF' > %[4]s + %[5]s + EOF` +) + +func ContextWithZkConn(ctx context.Context, user, password string) (context.Context, error) { + addr := os.Getenv("ZK_ADDR") + if addr == "" { + return nil, fmt.Errorf("ZooKeeper address is not set") + } + + conn, _, err := zk.Connect(strings.Split(addr, ","), time.Second*5, + zk.WithLogger(log.New(io.Discard, "", 0))) + if err != nil { + return nil, err + } + + var acl []zk.ACL + if user != "" && password != "" { + if err := conn.AddAuth("digest", []byte(user+":"+password)); err != nil { + return nil, err + } + acl = append(zk.DigestACL(zk.PermAll, user, password), zk.WorldACL(zk.PermRead)...) + } else { + acl = zk.WorldACL(zk.PermAll) + } + + ctx = context.WithValue(ctx, types.CtxZkConnKey{}, conn) + ctx = context.WithValue(ctx, types.CtxZkAclKey{}, acl) + + return ctx, nil +} + +func InitializeZK(zkConn *zk.Conn) error { + ops := make([]any, 0) + for _, p := range config.ArcusBasicPaths { + exist, _, err := zkConn.Exists(p) + if err != nil { + return err + } + if !exist { + ops = append(ops, &zk.CreateRequest{ + Path: p, + Data: []byte{}, + Acl: zk.WorldACL(zk.PermAll), + Flags: 0, + }) + } + } + if len(ops) > 0 { + if _, err := zkConn.Multi(ops...); err != nil { + return err + } + } + return nil +} + +func StartZKProcess(ip, zkPath string) error { + command := fmt.Sprintf(zookeeperStartCommandTemplate, zkPath) + return ssh.RunSSHCommand(ip, command) +} + +func StopZKProcess(ip, zkPath string) error { + command := fmt.Sprintf(zookeeperStopCommandTemplate, zkPath) + return ssh.RunSSHCommand(ip, command) +} + +func StatZKProcess(server string) (string, error) { + conn, err := net.Dial("tcp", server) + if err != nil { + return "", err + } + defer conn.Close() + + _, err = conn.Write([]byte("stat")) + if err != nil { + return "", err + } + + response, err := io.ReadAll(conn) + if err != nil { + return "", err + } + + return string(response), nil +} + +func ConfigureZKFiles(ip, port, zkPath string, myid int, zkServers []string) error { + dataDir := path.Join(zkPath, "data") + confDir := path.Join(zkPath, "conf") + confPath := path.Join(confDir, "zoo.cfg") + zooCfgContent := buildZooCfg(zkServers, zkPath, port) + configZKCmd := fmt.Sprintf(zookeeperConfigTemplate, dataDir, myid, confDir, confPath, zooCfgContent) + return ssh.RunSSHCommand(ip, configZKCmd) +} + +func buildZooCfg(servers []string, zkPath, port string) string { + var zooCfg strings.Builder + zooCfg.WriteString("tickTime=2000\n") + zooCfg.WriteString("initLimit=10\n") + zooCfg.WriteString("syncLimit=5\n") + zooCfg.WriteString(fmt.Sprintf("dataDir=%s/data\n", zkPath)) + zooCfg.WriteString(fmt.Sprintf("clientPort=%s\n", port)) + zooCfg.WriteString("standaloneEnabled=false\n") + zooCfg.WriteString("reconfigEnabled=true\n") + zooCfg.WriteString("4lw.commands.whitelist=*\n\n") + zooCfg.WriteString("# Server Lists\n") + + for i, server := range servers { + ip := strings.Split(server, ":")[0] + zooCfg.WriteString(fmt.Sprintf("server.%d=%s:2888:3888\n", i+1, ip)) + } + + return zooCfg.String() +}