diff --git a/.gitignore b/.gitignore
index 88d050b..693daf8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,2 @@
-main
\ No newline at end of file
+main
+.vscode/
\ No newline at end of file
diff --git a/go.mod b/go.mod
index ad32b6c..a6e2e31 100644
--- a/go.mod
+++ b/go.mod
@@ -4,5 +4,7 @@ go 1.14
 
 require (
 	github.com/gorilla/mux v1.7.4
+	github.com/hashicorp/raft v1.1.2
+	github.com/hashicorp/raft-boltdb v0.0.0-20191021154308-4207f1bf0617
 	github.com/rs/zerolog v1.19.0
 )
diff --git a/go.sum b/go.sum
index 4284e25..2f8c8da 100644
--- a/go.sum
+++ b/go.sum
@@ -1,14 +1,57 @@
+github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
+github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 h1:EFSB7Zo9Eg91v7MJPVsifUysc/wPdN+NOnVe6bWbdBM=
+github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg=
+github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
+github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4=
+github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
+github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag=
+github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I=
 github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
 github.com/gorilla/mux v1.7.4 h1:VuZ8uybHlWmqV03+zRzdwKL4tUnIp1MAQtp1mIFE1bc=
 github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
+github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
+github.com/hashicorp/go-hclog v0.9.1 h1:9PZfAcVEvez4yhLH2TBU64/h/z4xlFI80cWXRrxuKuM=
+github.com/hashicorp/go-hclog v0.9.1/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ=
+github.com/hashicorp/go-immutable-radix v1.0.0 h1:AKDB1HM5PWEA7i4nhcpwOrO2byshxBjXVn/J/3+z5/0=
+github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
+github.com/hashicorp/go-msgpack v0.5.5 h1:i9R9JSrqIz0QVLz3sz+i3YJdT7TTSLcfLLzJi9aZTuI=
+github.com/hashicorp/go-msgpack v0.5.5/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
+github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs=
+github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
+github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo=
+github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
+github.com/hashicorp/raft v1.1.0/go.mod h1:4Ak7FSPnuvmb0GV6vgIAJ4vYT4bek9bb6Q+7HVbyzqM=
+github.com/hashicorp/raft v1.1.2 h1:oxEL5DDeurYxLd3UbcY/hccgSPhLLpiBZ1YxtWEq59c=
+github.com/hashicorp/raft v1.1.2/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
+github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk=
+github.com/hashicorp/raft-boltdb v0.0.0-20191021154308-4207f1bf0617 h1:CJDRE/2tBNFOrcoexD2nvTRbQEox3FDxl4NxIezp1b8=
+github.com/hashicorp/raft-boltdb v0.0.0-20191021154308-4207f1bf0617/go.mod h1:aUF6HQr8+t3FC/ZHAC+pZreUBhTaxumuu3L+d37uRxk=
+github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
+github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
 github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM=
+github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
+github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
+github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
 github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
 github.com/rs/zerolog v1.19.0 h1:hYz4ZVdUgjXTBUmrkrw55j1nHx68LfOKIQk5IYtyScg=
 github.com/rs/zerolog v1.19.0/go.mod h1:IzD0RJ65iWH0w97OQQebJEvTZYvsCUm9WVLWBQrJRjo=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190602015325-4c4f7f33c9ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/tools v0.0.0-20190828213141-aed303cbaa74/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
diff --git a/internal/consensus/fsm.go b/internal/consensus/fsm.go
new file mode 100644
index 0000000..8b78ac5
--- /dev/null
+++ b/internal/consensus/fsm.go
@@ -0,0 +1,109 @@
+package consensus
+
+import (
+	"encoding/json"
+	"fmt"
+	"io"
+
+	"github.com/SystemBuilders/LocKey/internal/lockservice"
+	"github.com/hashicorp/raft"
+)
+
+// fsm is the finite state machine that Raft applies committed log
+// entries to. It is defined on top of RaftStore so that it can reach
+// the underlying lock service.
+type fsm RaftStore
+
+// fsmSnapshot wraps the state needed to snapshot the lock map.
+type fsmSnapshot struct {
+	lockMap map[string]string
+}
+
+// Apply applies a committed Raft log entry to the lock service.
+func (f *fsm) Apply(l *raft.Log) interface{} {
+	var c command
+	if err := json.Unmarshal(l.Data, &c); err != nil {
+		panic(fmt.Sprintf("failed to unmarshal command: %s", err.Error()))
+	}
+
+	switch c.Op {
+	case "acquire":
+		return f.applyAcquire(c.Key, c.Value)
+
+	case "release":
+		return f.applyRelease(c.Key, c.Value)
+
+	default:
+		panic(fmt.Sprintf("unrecognized command op: %s", c.Op))
+	}
+}
+
+func (f *fsm) applyAcquire(lock, owner string) interface{} {
+	desc := lockservice.NewSimpleDescriptor(lock, owner)
+
+	err := f.ls.Acquire(desc)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (f *fsm) applyRelease(lock, owner string) interface{} {
+	desc := lockservice.NewSimpleDescriptor(lock, owner)
+
+	err := f.ls.Release(desc)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// Snapshot returns a snapshot of the lock map. We wrap the state we
+// need in an fsmSnapshot and hand it to Persist, which encodes the
+// data and writes it to the snapshot sink. Restore later reads that
+// data back and replicates it into the finite state machine.
+// This allows the consensus algorithm to truncate the replicated log.
+func (f *fsm) Snapshot() (raft.FSMSnapshot, error) {
+	return &fsmSnapshot{lockMap: f.ls.GetLockMap()}, nil
+}
+
+// Restore restores the lockMap to a previous state.
+func (f *fsm) Restore(lockMap io.ReadCloser) error {
+	lockMapSnapshot := make(map[string]string)
+	if err := json.NewDecoder(lockMap).Decode(&lockMapSnapshot); err != nil {
+		return err
+	}
+
+	// Set the state from the snapshot. According to the Hashicorp docs,
+	// no mutex lock is needed here.
+	f.ls.SetLockMap(lockMapSnapshot)
+
+	return nil
+}
+
+// Persist encodes the snapshot and writes it to the given sink,
+// cancelling the sink on any error.
+func (f *fsmSnapshot) Persist(sink raft.SnapshotSink) error {
+	err := func() error {
+		// Encode the lock map.
+		b, err := json.Marshal(f.lockMap)
+		if err != nil {
+			return err
+		}
+
+		// Write the data to the sink.
+		if _, err := sink.Write(b); err != nil {
+			return err
+		}
+
+		// Close the sink.
+		if err := sink.Close(); err != nil {
+			return err
+		}
+
+		return nil
+	}()
+
+	if err != nil {
+		sink.Cancel()
+		return err
+	}
+
+	return nil
+}
+
+// Release is a no-op: nothing needs to be freed once the snapshot has
+// been persisted.
+func (f *fsmSnapshot) Release() {}
diff --git a/internal/consensus/listener.go b/internal/consensus/listener.go
new file mode 100644
index 0000000..74e7bd1
--- /dev/null
+++ b/internal/consensus/listener.go
@@ -0,0 +1,43 @@
+package consensus
+
+import (
+	"log"
+	"net"
+	"net/http"
+)
+
+// Start starts the HTTP service using the listener within a RaftStore.
+//
+// The HTTP server is used to redirect commands such as Acquire,
+// Release and Join to the leader RaftStore in a cluster. The HTTP
+// address is always one port above the Raft address, which the Raft
+// node uses for communication with other Raft nodes.
+//
+// This policy may be a bit too simplistic; in the future, a more
+// dynamic mapping between a Raft node and its listener could be
+// integrated.
+func (rs *RaftStore) Start() error {
+	server := http.Server{
+		Handler: rs,
+	}
+
+	ln, err := net.Listen("tcp", rs.httpAddr)
+	if err != nil {
+		return err
+	}
+	rs.ln = ln
+
+	go func() {
+		err := server.Serve(rs.ln)
+		if err != nil {
+			log.Fatalf("HTTP serve: %s", err)
+		}
+	}()
+
+	return nil
+}
+
+// Close stops the listener corresponding to a Raft node.
+func (rs *RaftStore) Close() {
+	rs.ln.Close()
+}
diff --git a/internal/consensus/raft.go b/internal/consensus/raft.go
new file mode 100644
index 0000000..d549b23
--- /dev/null
+++ b/internal/consensus/raft.go
@@ -0,0 +1,198 @@
+package consensus
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"log"
+	"net"
+	"net/http"
+	"os"
+	"path/filepath"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/SystemBuilders/LocKey/internal/lockservice"
+	"github.com/hashicorp/raft"
+	raftboltdb "github.com/hashicorp/raft-boltdb"
+	"github.com/rs/zerolog"
+)
+
+const (
+	retainSnapshotCount = 2
+	raftTimeout         = 10 * time.Second
+)
+
+// command is the unit that gets committed to the replicated log.
+type command struct {
+	Op    string `json:"op,omitempty"`
+	Key   string `json:"key,omitempty"`
+	Value string `json:"value,omitempty"`
+}
+
+// A RaftStore encapsulates the HTTP server (listener), a Raft node
+// (RaftDir, RaftAddr, RaftServer) and a lock service
+// (SimpleLockService).
+type RaftStore struct {
+	httpAddr   string
+	ls         *lockservice.SimpleLockService
+	inmem      bool
+	RaftDir    string
+	RaftAddr   string
+	RaftServer *raft.Raft
+	ln         net.Listener
+	logger     *log.Logger
+}
+
+// New returns a new instance of RaftStore.
+func New(inmem bool) *RaftStore {
+	return &RaftStore{
+		ls:     lockservice.NewSimpleLockService(zerolog.New(os.Stdout).With().Logger().Level(zerolog.GlobalLevel())),
+		inmem:  inmem,
+		logger: log.New(os.Stderr, "[store] ", log.LstdFlags),
+	}
+}
+
+// Open opens the store. If enableSingle is set, and there are no
+// existing peers, then this node becomes the first node, and therefore
+// the leader, of the cluster. localID should be the server identifier
+// for this node.
+func (rs *RaftStore) Open(enableSingle bool, localID string) error {
+	// Set up the Raft configuration.
+	config := raft.DefaultConfig()
+	config.LocalID = raft.ServerID(localID)
+
+	httpAddr, err := getHTTPAddr(rs.RaftAddr)
+	if err != nil {
+		return err
+	}
+	rs.httpAddr = httpAddr
+
+	// Set up Raft communication.
+	addr, err := net.ResolveTCPAddr("tcp", rs.RaftAddr)
+	if err != nil {
+		return err
+	}
+	transport, err := raft.NewTCPTransport(rs.RaftAddr, addr, 3, 10*time.Second, os.Stderr)
+	if err != nil {
+		return err
+	}
+
+	// Create the snapshot store. This allows Raft to truncate the log.
+	snapshots, err := raft.NewFileSnapshotStore(rs.RaftDir, retainSnapshotCount, os.Stderr)
+	if err != nil {
+		return fmt.Errorf("file snapshot store: %s", err)
+	}
+
+	// Create the log store and stable store.
+	var logStore raft.LogStore
+	var stableStore raft.StableStore
+	if rs.inmem {
+		logStore = raft.NewInmemStore()
+		stableStore = raft.NewInmemStore()
+	} else {
+		boltDB, err := raftboltdb.NewBoltStore(filepath.Join(rs.RaftDir, "raft.db"))
+		if err != nil {
+			return fmt.Errorf("new bolt store: %s", err)
+		}
+		logStore = boltDB
+		stableStore = boltDB
+	}
+
+	// Instantiate the Raft system.
+	ra, err := raft.NewRaft(config, (*fsm)(rs), logStore, stableStore, snapshots, transport)
+	if err != nil {
+		return fmt.Errorf("new raft: %s", err)
+	}
+	rs.RaftServer = ra
+
+	if enableSingle {
+		configuration := raft.Configuration{
+			Servers: []raft.Server{
+				{
+					ID:      config.LocalID,
+					Address: transport.LocalAddr(),
+				},
+			},
+		}
+		ra.BootstrapCluster(configuration)
+	}
+
+	return nil
+}
+
+// Join facilitates the addition of a new Raft node to an existing
+// cluster.
+//
+// The address and nodeID are those of the node to be added to the
+// cluster.
+//
+// This function is called on an instance of RaftStore of a node
+// already existing in the cluster. The function internally sends an
+// HTTP request to the listener of the leader of the cluster with the
+// new node's ID and address.
+func (rs *RaftStore) Join(addr, ID string) error {
+	b, err := json.Marshal(map[string]string{"addr": addr, "id": ID})
+	if err != nil {
+		return err
+	}
+
+	var postAddr string
+	if rs.RaftServer.Leader() == "" {
+		postAddr = rs.RaftAddr
+	} else {
+		postAddr = string(rs.RaftServer.Leader())
+	}
+
+	httpAddr, err := getHTTPAddr(postAddr)
+	if err != nil {
+		return err
+	}
+
+	resp, err := http.Post(
+		fmt.Sprintf("http://%s/join", httpAddr),
+		"application/json",
+		bytes.NewReader(b),
+	)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+
+	return nil
+}
+
+// getHTTPAddr and getRaftAddr implement the mapping between a Raft
+// node and its HTTP listener. The mapping is such that the listener's
+// port is one more than the Raft node's.
+//
+// Given the address of a Raft node, getHTTPAddr returns the address
+// of its HTTP listener.
+//
+// Given the address of the HTTP listener of a Raft node, getRaftAddr
+// returns the address of the corresponding Raft node.
+func getHTTPAddr(raftAddr string) (string, error) {
+	addrParts := strings.Split(raftAddr, ":")
+	host := addrParts[0]
+	port, err := strconv.Atoi(addrParts[1])
+	if err != nil {
+		return "", err
+	}
+	return fmt.Sprintf("%s:%d", host, port+1), nil
+}
+
+func getRaftAddr(httpAddr string) string {
+	addrParts := strings.Split(httpAddr, ":")
+	host := addrParts[0]
+	port, err := strconv.Atoi(addrParts[1])
+	if err != nil {
+		return ""
+	}
+	return fmt.Sprintf("%s:%d", host, port-1)
+}
diff --git a/internal/consensus/routing.go b/internal/consensus/routing.go
new file mode 100644
index 0000000..0ab0d84
--- /dev/null
+++ b/internal/consensus/routing.go
@@ -0,0 +1,202 @@
+package consensus
+
+import (
+	"encoding/json"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"net/http"
+	"strings"
+
+	"github.com/SystemBuilders/LocKey/internal/lockservice"
+	"github.com/hashicorp/raft"
+)
+
+// ServeHTTP allows a RaftStore to act as a handler for an HTTP server.
+// If this node is not the leader, the request is proxied to the
+// leader's listener.
+func (rs *RaftStore) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	if getRaftAddr(rs.httpAddr) != string(rs.RaftServer.Leader()) {
+		// This node is not the leader: proxy the request to the
+		// leader's HTTP listener.
+		url := r.URL
+		leaderHTTPAddr, err := getHTTPAddr(string(rs.RaftServer.Leader()))
+		if err != nil {
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+		url.Host = leaderHTTPAddr
+		url.Scheme = "http"
+
+		proxyReq, err := http.NewRequest(r.Method, url.String(), r.Body)
+		if err != nil {
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+
+		proxyReq.Header.Set("Host", r.Host)
+		proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
+
+		for header, values := range r.Header {
+			for _, value := range values {
+				proxyReq.Header.Add(header, value)
+			}
+		}
+
+		client := &http.Client{}
+		resp, err := client.Do(proxyReq)
+		if err != nil {
+			http.Error(w, err.Error(), http.StatusBadGateway)
+			return
+		}
+		defer resp.Body.Close()
+
+		// Relay the leader's response back to the original caller.
+		w.WriteHeader(resp.StatusCode)
+		io.Copy(w, resp.Body)
+		return
+	}
+
+	if strings.Contains(r.URL.Path, "/acquire") {
+		rs.handleAcquire(w, r)
+	} else if strings.Contains(r.URL.Path, "/release") {
+		rs.handleRelease(w, r)
+	} else if strings.Contains(r.URL.Path, "/join") {
+		rs.handleJoin(w, r)
+	} else {
+		w.WriteHeader(http.StatusNotFound)
+	}
+}
+
+func (rs *RaftStore) handleAcquire(w http.ResponseWriter, r *http.Request) {
+	body, err := ioutil.ReadAll(r.Body)
+	if err != nil {
+		http.Error(w, err.Error(), http.StatusBadRequest)
+		return
+	}
+
+	var req lockservice.LockRequest
+	err = json.Unmarshal(body, &req)
+	if err != nil {
+		http.Error(w, err.Error(), http.StatusBadRequest)
+		return
+	}
+
+	c := &command{
+		Op:    "acquire",
+		Key:   req.FileID,
+		Value: req.UserID,
+	}
+	b, err := json.Marshal(c)
+	if err != nil {
+		w.WriteHeader(http.StatusBadRequest)
+		return
+	}
+
+	// Check whether the acquire is possible.
+	desc := lockservice.NewSimpleDescriptor(req.FileID, req.UserID)
+	err = rs.ls.TryAcquire(desc)
+	if err != nil {
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+	// If it is, commit the change to the replicated log.
+	f := rs.RaftServer.Apply(b, raftTimeout)
+	if f.Error() != nil {
+		w.WriteHeader(http.StatusBadRequest)
+		return
+	}
+
+	w.Write([]byte("lock acquired"))
+}
+
+func (rs *RaftStore) handleRelease(w http.ResponseWriter, r *http.Request) {
+	body, err := ioutil.ReadAll(r.Body)
+	if err != nil {
+		http.Error(w, err.Error(), http.StatusBadRequest)
+		return
+	}
+
+	var req lockservice.LockRequest
+	err = json.Unmarshal(body, &req)
+	if err != nil {
+		http.Error(w, err.Error(), http.StatusBadRequest)
+		return
+	}
+
+	c := &command{
+		Op:    "release",
+		Key:   req.FileID,
+		Value: req.UserID,
+	}
+	b, err := json.Marshal(c)
+	if err != nil {
+		w.WriteHeader(http.StatusBadRequest)
+		return
+	}
+
+	// Check whether the release is possible.
+	desc := lockservice.NewSimpleDescriptor(req.FileID, req.UserID)
+	err = rs.ls.TryRelease(desc)
+	if err != nil {
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+	// If it is, commit the change to the replicated log.
+	f := rs.RaftServer.Apply(b, raftTimeout)
+	if f.Error() != nil {
+		w.WriteHeader(http.StatusBadRequest)
+		return
+	}
+
+	w.Write([]byte("lock released"))
+}
+
+// handleJoin applies the join upon receiving the HTTP request.
+func (rs *RaftStore) handleJoin(w http.ResponseWriter, r *http.Request) {
+	m := map[string]string{}
+	err := json.NewDecoder(r.Body).Decode(&m)
+	if err != nil {
+		w.WriteHeader(http.StatusBadRequest)
+		return
+	}
+
+	remoteAddr := m["addr"]
+	nodeID := m["id"]
+
+	err = rs.joinHelper(nodeID, remoteAddr)
+	if err != nil {
+		rs.logger.Printf("join failed: %s", err)
+		w.WriteHeader(http.StatusInternalServerError)
+		return
+	}
+
+	rs.logger.Printf("node at %s joined successfully", remoteAddr)
+	w.Write([]byte("joined cluster"))
+}
+
+// joinHelper joins a node, identified by nodeID and located at addr,
+// to this store. The node must be ready to respond to Raft
+// communications at that address.
+func (rs *RaftStore) joinHelper(nodeID, addr string) error {
+	rs.logger.Printf("received join request for remote node %s at %s", nodeID, addr)
+
+	configFuture := rs.RaftServer.GetConfiguration()
+	if err := configFuture.Error(); err != nil {
+		rs.logger.Printf("failed to get raft configuration: %v", err)
+		return err
+	}
+
+	for _, srv := range configFuture.Configuration().Servers {
+		// If a node already exists with either the joining node's ID or
+		// address, that node may need to be removed from the config first.
+		if srv.ID == raft.ServerID(nodeID) || srv.Address == raft.ServerAddress(addr) {
+			// However, if *both* the ID and the address are the same, then
+			// nothing -- not even a join operation -- is needed.
+			if srv.Address == raft.ServerAddress(addr) && srv.ID == raft.ServerID(nodeID) {
+				rs.logger.Printf("node %s at %s already member of cluster, ignoring join request", nodeID, addr)
+				return nil
+			}
+
+			future := rs.RaftServer.RemoveServer(srv.ID, 0, 0)
+			if err := future.Error(); err != nil {
+				return fmt.Errorf("error removing existing node %s at %s: %s", nodeID, addr, err)
+			}
+		}
+	}
+
+	f := rs.RaftServer.AddVoter(raft.ServerID(nodeID), raft.ServerAddress(addr), 0, 0)
+	if f.Error() != nil {
+		return f.Error()
+	}
+	rs.logger.Printf("node %s at %s joined successfully", nodeID, addr)
+	return nil
+}
diff --git a/internal/lockclient/simple_client.go b/internal/lockclient/simple_client.go
index 6013bfb..59995b9 100644
--- a/internal/lockclient/simple_client.go
+++ b/internal/lockclient/simple_client.go
@@ -20,7 +20,7 @@ type SimpleClient struct {
 	cache cache.LRUCache
 }
 
-// NewSimpleKey returns a new SimpleKey of the given value.
+// NewSimpleClient returns a new instance of SimpleClient.
 func NewSimpleClient(config lockservice.SimpleConfig, cache cache.LRUCache) *SimpleClient {
 	return &SimpleClient{
 		config: config,
@@ -42,9 +42,9 @@ func (sc *SimpleClient) Acquire(d lockservice.Descriptors) error {
 	}
 
 	testData := lockservice.LockRequest{FileID: d.ID(), UserID: d.Owner()}
-	requestJson, err := json.Marshal(testData)
+	requestJSON, err := json.Marshal(testData)
 
-	req, err := http.NewRequest("POST", endPoint, bytes.NewBuffer(requestJson))
+	req, err := http.NewRequest("POST", endPoint, bytes.NewBuffer(requestJSON))
 	if err != nil {
 		return err
 	}
@@ -77,8 +77,8 @@ func (sc *SimpleClient) Release(d lockservice.Descriptors) error {
 	endPoint := sc.config.IPAddr + ":" + sc.config.PortAddr + "/release"
 
 	testData := lockservice.LockRequest{FileID: d.ID(), UserID: d.Owner()}
-	requestJson, err := json.Marshal(testData)
-	req, err := http.NewRequest("POST", endPoint, bytes.NewBuffer(requestJson))
+	requestJSON, err := json.Marshal(testData)
+	req, err := http.NewRequest("POST", endPoint, bytes.NewBuffer(requestJSON))
 	req.Header.Set("Content-Type", "application/json")
 
 	client := &http.Client{}
diff --git a/internal/lockclient/simple_client_test.go b/internal/lockclient/simple_client_test.go
index 4650ae9..69e93c5 100644
--- a/internal/lockclient/simple_client_test.go
+++ b/internal/lockclient/simple_client_test.go
@@ -1,27 +1,54 @@
 package lockclient
 
 import (
-	"os"
+	"fmt"
 	"testing"
 	"time"
 
 	"github.com/SystemBuilders/LocKey/internal/cache"
+	"github.com/SystemBuilders/LocKey/internal/consensus"
 	"github.com/SystemBuilders/LocKey/internal/lockservice"
-	"github.com/SystemBuilders/LocKey/internal/node"
-
-	"github.com/rs/zerolog"
 )
 
 func TestAcquireandRelease(t *testing.T) {
-	zerolog.New(os.Stdout).With()
-
-	log := zerolog.New(os.Stdout).With().Logger().Level(zerolog.GlobalLevel())
-	scfg := lockservice.NewSimpleConfig("http://127.0.0.1", "1234")
-	ls := lockservice.NewSimpleLockService(log)
+	scfg := lockservice.NewSimpleConfig("http://127.0.0.1", "5001")
+	scfg1 := lockservice.NewSimpleConfig("http://127.0.0.1", "7001")
 	quit := make(chan bool, 1)
 	go func() {
-		node.Start(ls, *scfg)
+		raftLS := consensus.New(true)
+		raftLS.RaftAddr = "127.0.0.1:5000"
+		raftLS.Open(true, "node0")
+		raftLS.Start()
+
+		// Give the server time to start up.
+		time.Sleep(3 * time.Second)
+
+		raftLS2 := consensus.New(true)
+		raftLS2.RaftAddr = "127.0.0.1:6000"
+		raftLS2.Open(true, "node2")
+		raftLS2.Start()
+
+		time.Sleep(3 * time.Second)
+
+		fmt.Println("joining")
+		raftLS.Join("127.0.0.1:6000", "node2")
+
+		// Extra time is needed for the join operation to complete.
+		time.Sleep(5 * time.Second)
+
+		raftLS3 := consensus.New(true)
+		raftLS3.RaftAddr = "127.0.0.1:7000"
+		raftLS3.Open(true, "node3")
+		raftLS3.Start()
+
+		time.Sleep(3 * time.Second)
+
+		fmt.Println("joining")
+		raftLS.Join("127.0.0.1:7000", "node3")
+
+		time.Sleep(5 * time.Second)
+
 		for {
 			select {
 			case <-quit:
@@ -31,12 +58,13 @@ func TestAcquireandRelease(t *testing.T) {
 		}
 	}()
 
-	// Server takes some time to start
-	time.Sleep(100 * time.Millisecond)
+	// Give time for the cluster to form.
+	time.Sleep(25 * time.Second)
 
 	t.Run("acquire test release test", func(t *testing.T) {
 		size := 5
 		cache := cache.NewLRUCache(size)
 		sc := NewSimpleClient(*scfg, *cache)
+		sc1 := NewSimpleClient(*scfg1, *cache)
 
 		d := lockservice.NewSimpleDescriptor("test", "owner")
 
@@ -55,16 +83,14 @@ func TestAcquireandRelease(t *testing.T) {
 
 		d = lockservice.NewSimpleDescriptor("test", "owner")
 
-		got = sc.Release(d)
-
+		got = sc1.Release(d)
 		if got != want {
 			t.Errorf("release: got %q want %q", got, want)
 		}
 
 		d = lockservice.NewSimpleDescriptor("test1", "owner")
 
-		got = sc.Release(d)
-
+		got = sc1.Release(d)
 		if got != want {
 			t.Errorf("release: got %q want %q", got, want)
 		}
@@ -114,7 +140,7 @@ func TestAcquireandRelease(t *testing.T) {
 		got = sc.Release(d)
 		want = lockservice.ErrUnauthorizedAccess
 		if got != want {
-			t.Errorf("acquire: got %v want %v", got, want)
+			t.Errorf("release: got %v want %v", got, want)
 		}
 
 		d = lockservice.NewSimpleDescriptor("test2", "owner1")
diff --git a/internal/lockclient/test/raft.db b/internal/lockclient/test/raft.db
new file mode 100644
index 0000000..dae39e4
Binary files /dev/null and b/internal/lockclient/test/raft.db differ
diff --git a/internal/lockservice/simpleLockService.go b/internal/lockservice/simpleLockService.go
index 42e4fad..1fedc6a 100644
--- a/internal/lockservice/simpleLockService.go
+++ b/internal/lockservice/simpleLockService.go
@@ -18,11 +18,15 @@ type SimpleConfig struct {
 	PortAddr string
 }
 
+// LockRequest is the request body the client sends to the HTTP server
+// that acts as the listener for each Raft node.
 type LockRequest struct {
 	FileID string `json:"FileID"`
 	UserID string `json:"UserID"`
 }
 
+// IP returns the IP address from a SimpleConfig.
 func (scfg *SimpleConfig) IP() string {
 	return scfg.IPAddr
 }
@@ -63,6 +67,7 @@ func (sd *SimpleDescriptor) Owner() string {
 	return sd.UserID
 }
 
+// NewSimpleConfig returns a new simple configuration.
 func NewSimpleConfig(IPAddr, PortAddr string) *SimpleConfig {
 	return &SimpleConfig{
 		IPAddr:   IPAddr,
@@ -70,6 +75,7 @@ func NewSimpleConfig(IPAddr, PortAddr string) *SimpleConfig {
 	}
 }
 
+// NewSimpleDescriptor returns a new simple descriptor.
 func NewSimpleDescriptor(FileID, UserID string) *SimpleDescriptor {
 	return &SimpleDescriptor{
 		FileID: FileID,
@@ -110,6 +116,25 @@ func (ls *SimpleLockService) Acquire(sd Descriptors) error {
 	return nil
 }
 
+// TryAcquire checks whether it is possible to lock a file. It is used
+// as a check before an acquire action is actually committed to the log
+// of the distributed consensus algorithm.
+func (ls *SimpleLockService) TryAcquire(sd Descriptors) error {
+	ls.lockMap.Mutex.Lock()
+	if _, ok := ls.lockMap.LockMap[sd.ID()]; ok {
+		ls.lockMap.Mutex.Unlock()
+		ls.
+			log.
+			Debug().
+			Str("descriptor", sd.ID()).
+			Msg("can't acquire, already been acquired")
+		return ErrFileAcquired
+	}
+	ls.lockMap.Mutex.Unlock()
+	return nil
+}
+
 // Release lets a client to release a lock on an object.
 func (ls *SimpleLockService) Release(sd Descriptors) error {
 	ls.lockMap.Mutex.Lock()
@@ -146,6 +171,37 @@ func (ls *SimpleLockService) Release(sd Descriptors) error {
 
 }
 
+// TryRelease checks whether it is possible to release a file. It is
+// used as a check before a release action is actually committed to the
+// log of the distributed consensus algorithm.
+func (ls *SimpleLockService) TryRelease(sd Descriptors) error {
+	ls.lockMap.Mutex.Lock()
+	if ls.lockMap.LockMap[sd.ID()] == sd.Owner() {
+		ls.lockMap.Mutex.Unlock()
+		return nil
+	} else if _, ok := ls.lockMap.LockMap[sd.ID()]; !ok {
+		ls.
+			log.
+			Debug().
+			Str("descriptor", sd.ID()).
+			Msg("can't release, hasn't been acquired")
+		ls.lockMap.Mutex.Unlock()
+		return ErrCantReleaseFile
+	} else {
+		ls.
+			log.
+			Debug().
+			Str("descriptor", sd.ID()).
+ Msg("can't release, unauthorized access") + ls.lockMap.Mutex.Unlock() + return ErrUnauthorizedAccess + + } + +} + // CheckAcquired returns true if the file is acquired func (ls *SimpleLockService) CheckAcquired(sd Descriptors) bool { ls.lockMap.Mutex.Lock() @@ -187,3 +243,24 @@ func (ls *SimpleLockService) CheckReleased(sd Descriptors) bool { ls.lockMap.Mutex.Unlock() return true } + +// GetLockMap returns a copy of the current state of the +// lockservice's lock map. +// This function is used by the Snapshot() function of the +// finite state machine of the distributed consensus algorithm +func (ls *SimpleLockService) GetLockMap() map[string]string { + ls.lockMap.Mutex.Lock() + defer ls.lockMap.Mutex.Unlock() + + return ls.lockMap.LockMap + +} + +// SetLockMap sets the LockMap used by SimpleLockService to +// a snapshot of it that is passed as an argument to the +// function +func (ls *SimpleLockService) SetLockMap(lockMapSnapshot map[string]string) { + // Set the state from snapshot. No need to use mutex lock according + // to Hasicorp doc + ls.lockMap.LockMap = lockMapSnapshot +} diff --git a/internal/lockservice/test/raft.db b/internal/lockservice/test/raft.db new file mode 100644 index 0000000..280e025 Binary files /dev/null and b/internal/lockservice/test/raft.db differ diff --git a/internal/node/simple_node.go b/internal/node/simple_node.go index 78dfe79..ef7a4ce 100644 --- a/internal/node/simple_node.go +++ b/internal/node/simple_node.go @@ -17,6 +17,18 @@ import ( "github.com/gorilla/mux" ) +// func (rs *lockservice.RaftStore) Start() error { +// ln, err := net.Listen("tcp", rs.httpAddr) +// if err != nil { +// return err +// } +// rs.ln = ln +// server := http.Server{ +// Handler: rs, +// } + +// } + // Start begins the node's operation as a http server. func Start(ls *lockservice.SimpleLockService, scfg lockservice.SimpleConfig) error {