From ede56d4967afb77bf0f62b1eab1d9e05a717caf7 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Sat, 8 Aug 2020 18:03:07 +0530 Subject: [PATCH 01/10] reworked implementation following huffleraft --- .gitignore | 3 +- go.mod | 2 + go.sum | 43 ++++++++ internal/lockservice/fsm.go | 111 ++++++++++++++++++++ internal/lockservice/raft.go | 189 +++++++++++++++++++++++++++++++++++ 5 files changed, 347 insertions(+), 1 deletion(-) create mode 100644 internal/lockservice/fsm.go create mode 100644 internal/lockservice/raft.go diff --git a/.gitignore b/.gitignore index 88d050b..693daf8 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ -main \ No newline at end of file +main +.vscode/ \ No newline at end of file diff --git a/go.mod b/go.mod index ad32b6c..a6e2e31 100644 --- a/go.mod +++ b/go.mod @@ -4,5 +4,7 @@ go 1.14 require ( github.com/gorilla/mux v1.7.4 + github.com/hashicorp/raft v1.1.2 + github.com/hashicorp/raft-boltdb v0.0.0-20191021154308-4207f1bf0617 github.com/rs/zerolog v1.19.0 ) diff --git a/go.sum b/go.sum index 4284e25..2f8c8da 100644 --- a/go.sum +++ b/go.sum @@ -1,14 +1,57 @@ +github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= +github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 h1:EFSB7Zo9Eg91v7MJPVsifUysc/wPdN+NOnVe6bWbdBM= +github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg= +github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4= +github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= +github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag= +github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/gorilla/mux v1.7.4 h1:VuZ8uybHlWmqV03+zRzdwKL4tUnIp1MAQtp1mIFE1bc= github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= +github.com/hashicorp/go-hclog v0.9.1 h1:9PZfAcVEvez4yhLH2TBU64/h/z4xlFI80cWXRrxuKuM= +github.com/hashicorp/go-hclog v0.9.1/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ= +github.com/hashicorp/go-immutable-radix v1.0.0 h1:AKDB1HM5PWEA7i4nhcpwOrO2byshxBjXVn/J/3+z5/0= +github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= +github.com/hashicorp/go-msgpack v0.5.5 h1:i9R9JSrqIz0QVLz3sz+i3YJdT7TTSLcfLLzJi9aZTuI= +github.com/hashicorp/go-msgpack v0.5.5/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= +github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs= +github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo= +github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/raft v1.1.0/go.mod h1:4Ak7FSPnuvmb0GV6vgIAJ4vYT4bek9bb6Q+7HVbyzqM= +github.com/hashicorp/raft v1.1.2 h1:oxEL5DDeurYxLd3UbcY/hccgSPhLLpiBZ1YxtWEq59c= +github.com/hashicorp/raft v1.1.2/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8= +github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk= +github.com/hashicorp/raft-boltdb v0.0.0-20191021154308-4207f1bf0617 h1:CJDRE/2tBNFOrcoexD2nvTRbQEox3FDxl4NxIezp1b8= +github.com/hashicorp/raft-boltdb v0.0.0-20191021154308-4207f1bf0617/go.mod h1:aUF6HQr8+t3FC/ZHAC+pZreUBhTaxumuu3L+d37uRxk= +github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM= +github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= +github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rs/zerolog v1.19.0 h1:hYz4ZVdUgjXTBUmrkrw55j1nHx68LfOKIQk5IYtyScg= github.com/rs/zerolog v1.19.0/go.mod h1:IzD0RJ65iWH0w97OQQebJEvTZYvsCUm9WVLWBQrJRjo= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190602015325-4c4f7f33c9ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/tools v0.0.0-20190828213141-aed303cbaa74/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/internal/lockservice/fsm.go b/internal/lockservice/fsm.go new file mode 100644 index 0000000..25f4418 --- /dev/null +++ b/internal/lockservice/fsm.go @@ -0,0 +1,111 @@ +package lockservice + +import ( + "encoding/json" + "fmt" + "io" + + "github.com/hashicorp/raft" +) + +type fsm RaftStore + +type fsmSnapshot struct { + lockMap map[string]string +} + +func (f *fsm) Apply(l *raft.Log) interface{} { + var c command + if err := json.Unmarshal(l.Data, &c); err != nil { + panic(fmt.Sprintf("failed to unmarshal command: %s", err.Error())) + } + + switch c.Op { + case "acquire": + return f.applyAcquire(c.Lock, c.Owner) + + case "release": + return f.applyRelease(c.Lock, c.Owner) + + default: + panic(fmt.Sprintf("unrecognized command op: %s", c.Op)) + + } +} + +func (f *fsm) applyAcquire(lock, owner string) interface{} { + desc := NewSimpleDescriptor(lock, owner) + + err := f.ls.Acquire(desc) + if err != nil { + return err + } + return nil +} + +func (f *fsm) applyRelease(lock, owner string) interface{} { + desc := NewSimpleDescriptor(lock, owner) + + err := f.ls.Release(desc) + if err != nil { + return err + } + return nil +} + +// Snapshot returns a snapshot of the key-value store. We wrap +// the things we need in fsmSnapshot and then send that over to Persist. +// Persist encodes the needed data from fsmsnapshot and transport it to +// Restore where the necessary data is replicated into the finite state machine. +// This allows the consensus algorithm to truncate the replicated log. +func (f *fsm) Snapshot() (raft.FSMSnapshot, error) { + f.ls.lockMap.Mutex.Lock() + defer f.ls.lockMap.Mutex.Unlock() + + return &fsmSnapshot{lockMap: f.ls.lockMap.LockMap}, nil +} + +// Restores the lockMap to a previous state +func (f *fsm) Restore(lockMap io.ReadCloser) error { + lockMapSnapshot := make(map[string]string) + if err := json.NewDecoder(lockMap).Decode(&lockMapSnapshot); err != nil { + return err + } + + // Set the state from snapshot. No need to use mutex lock according + // to Hasicorp doc + f.ls.lockMap.LockMap = lockMapSnapshot + + return nil +} + +func (f *fsmSnapshot) Persist(sink raft.SnapshotSink) error { + err := func() error { + // Encode data + b, err := json.Marshal(f.lockMap) + if err != nil { + return err + } + + // Write data to sink + if _, err := sink.Write(b); err != nil { + return err + } + + // Close the sink + if err := sink.Close(); err != nil { + return err + } + + return nil + }() + + if err != nil { + sink.Cancel() + return err + } + + return nil +} + +func (f *fsmSnapshot) Release() {} diff --git a/internal/lockservice/raft.go b/internal/lockservice/raft.go new file mode 100644 index 0000000..f6089eb --- /dev/null +++ b/internal/lockservice/raft.go @@ -0,0 +1,189 @@ +package lockservice + +import ( + "bytes" + "encoding/json" + "fmt" + "log" + "net" + "net/http" + "os" + "path/filepath" + "strconv" + "strings" + "time" + + "github.com/hashicorp/raft" + raftboltdb "github.com/hashicorp/raft-boltdb" + "github.com/rs/zerolog" +) + +const ( + retainSnapshotCount = 2 + raftTimeout = 10 * time.Second +) + +type command struct { + Op string `json:"op,omitempty"` + Lock string `json:"fileID,omitempty"` + Owner string `json:"userID,omitempty"` + Addr string `json:"addr,omitempty"` +} + +// A RaftStore encapsulates the http server (listener), +// a raft node (raftDir, raftAddr, RaftServer) and a +// lock service (SimpleLockService) +type RaftStore struct { + httpAddr string + ls SimpleLockService + raftDir string + raftAddr string + RaftServer *raft.Raft + // we need a listener here. Node? + logger *log.Logger +} + +// NewRaftServer returns a RaftStore. +func NewRaftServer(raftDir, raftAddr string, enableSingle bool) (*RaftStore, error) { + httpAddr, err := getHTTPAddr(raftAddr) + if err != nil { + return nil, err + } + rs := &RaftStore{ + httpAddr: httpAddr, + ls: *NewSimpleLockService(zerolog.New(os.Stdout).With().Logger().Level(zerolog.GlobalLevel())), + raftDir: raftDir, + raftAddr: raftAddr, + logger: log.New(os.Stderr, fmt.Sprintf("[raftStore | %s]", raftAddr), log.LstdFlags), + } + + // what file access controll is this? check. + if err := os.MkdirAll(raftDir, 0700); err != nil { + return nil, err + } + + config := raft.DefaultConfig() + transport, err := setupRaftCommunication(rs.raftAddr) + if err != nil { + return nil, err + } + + snapshots, err := raft.NewFileSnapshotStore(rs.raftDir, retainSnapshotCount, os.Stderr) + if err != nil { + return nil, fmt.Errorf("file snapshot store: %s", err) + } + + boltDB, err := raftboltdb.NewBoltStore(filepath.Join(rs.raftDir, "raft.db")) + if err != nil { + return nil, fmt.Errorf("new bolt store: %s", err) + } + logStore := boltDB + stableStore := boltDB + + rft, err := raft.NewRaft(config, (*fsm)(rs), logStore, stableStore, snapshots, transport) + + rs.RaftServer = rft + return rs, nil +} + +func getHTTPAddr(raftAddr string) (string, error) { + addrParts := strings.Split(raftAddr, ":") + httpHost := addrParts[0] + port, err := strconv.Atoi(addrParts[1]) + if err != nil { + return "", err + } + return fmt.Sprintf("%s:%d", httpHost, port+1), nil + +} + +func setupRaftCommunication(raftAddr string) (*raft.NetworkTransport, error) { + addr, err := net.ResolveTCPAddr("tcp", raftAddr) + if err != nil { + return nil, err + } + + // What does the maxPool argument signify? + transport, err := raft.NewTCPTransport(raftAddr, addr, 3, 10*time.Second, os.Stderr) + + if err != nil { + return nil, err + } + + return transport, nil +} + +// Acquire locks a File with ID fileID and sets its owner to userID. +// No other user is allowed access to a file once it is locked apart +// from its owner +func (rs *RaftStore) Acquire(fileID, userID string) error { + b, err := json.Marshal(map[string]string{"fileID": fileID, "userID": userID}) + if err != nil { + return err + } + + httpAddr, err := getHTTPAddr(string(rs.RaftServer.Leader())) + + if err != nil { + return err + } + + resp, err := http.Post( + fmt.Sprintf("http://%s/%s/fileID", httpAddr, rs.raftDir), + "application-type/json", + bytes.NewReader(b), + ) + if err != nil { + return err + } + + defer resp.Body.Close() + + return nil + + // desc := NewSimpleDescriptor(fileID, userID) + + // err := rs.ls.Acquire(desc) + // if err != nil { + // return err + // } + // return nil +} + +// Release calls the lockservice function Release(). +// This in turn checks if userID is the owner of fileID +// and if it is, fileID is no longer locked. +// However, if userID does not own fileID, then the lock +// is not released. +func (rs *RaftStore) Release(fileID, userID string) error { + b, err := json.Marshal(map[string]string{"fileID": fileID, "userID": userID}) + if err != nil { + return err + } + + httpAddr, err := getHTTPAddr(string(rs.RaftServer.Leader())) + + if err != nil { + return err + } + + resp, err := http.Post( + fmt.Sprintf("http://%s/%s/fileID/%s", httpAddr, rs.raftDir, fileID), + "application-type/json", + bytes.NewReader(b), + ) + if err != nil { + return err + } + defer resp.Body.Close() + + return nil + + // desc := NewSimpleDescriptor(fileID, userID) + + // err := rs.ls.Release(desc) + // if err != nil { + // return err + // } + // return nil +} From be06c3bc1954108ae410664669ebee8fbb4ca816 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Mon, 10 Aug 2020 15:52:06 +0530 Subject: [PATCH 02/10] finished the implementation, null point exception bug --- internal/lockclient/simple_client_test.go | 280 ++++++++++++---------- internal/lockclient/test/raft.db | Bin 0 -> 32768 bytes internal/lockservice/fsm.go | 5 +- internal/lockservice/listener.go | 37 +++ internal/lockservice/raft.go | 87 ++++--- internal/lockservice/raft_test.go | 20 ++ internal/lockservice/routing.go | 90 +++++++ internal/lockservice/test/raft.db | Bin 0 -> 32768 bytes internal/node/simple_node.go | 12 + 9 files changed, 357 insertions(+), 174 deletions(-) create mode 100644 internal/lockclient/test/raft.db create mode 100644 internal/lockservice/listener.go create mode 100644 internal/lockservice/raft_test.go create mode 100644 internal/lockservice/routing.go create mode 100644 internal/lockservice/test/raft.db diff --git a/internal/lockclient/simple_client_test.go b/internal/lockclient/simple_client_test.go index 4650ae9..238061d 100644 --- a/internal/lockclient/simple_client_test.go +++ b/internal/lockclient/simple_client_test.go @@ -1,138 +1,162 @@ package lockclient import ( - "os" + "fmt" "testing" - "time" "github.com/SystemBuilders/LocKey/internal/cache" "github.com/SystemBuilders/LocKey/internal/lockservice" - "github.com/SystemBuilders/LocKey/internal/node" - - "github.com/rs/zerolog" ) -func TestAcquireandRelease(t *testing.T) { - zerolog.New(os.Stdout).With() - - log := zerolog.New(os.Stdout).With().Logger().Level(zerolog.GlobalLevel()) - scfg := lockservice.NewSimpleConfig("http://127.0.0.1", "1234") - ls := lockservice.NewSimpleLockService(log) - - quit := make(chan bool, 1) - go func() { - node.Start(ls, *scfg) - for { - select { - case <-quit: - return - default: - } - } - }() - - // Server takes some time to start - time.Sleep(100 * time.Millisecond) - t.Run("acquire test release test", func(t *testing.T) { - size := 5 - cache := cache.NewLRUCache(size) - sc := NewSimpleClient(*scfg, *cache) - - d := lockservice.NewSimpleDescriptor("test", "owner") - - got := sc.Acquire(d) - var want error - if got != want { - t.Errorf("acquire: got %q want %q", got, want) - } - - d = lockservice.NewSimpleDescriptor("test1", "owner") - - got = sc.Acquire(d) - if got != want { - t.Errorf("acquire: got %q want %q", got, want) - } - - d = lockservice.NewSimpleDescriptor("test", "owner") - - got = sc.Release(d) - - if got != want { - t.Errorf("release: got %q want %q", got, want) - } - - d = lockservice.NewSimpleDescriptor("test1", "owner") - - got = sc.Release(d) - - if got != want { - t.Errorf("release: got %q want %q", got, want) - } - }) - - t.Run("acquire test, acquire test, release test", func(t *testing.T) { - size := 5 - cache := cache.NewLRUCache(size) - sc := NewSimpleClient(*scfg, *cache) - - d := lockservice.NewSimpleDescriptor("test", "owner") - - got := sc.Acquire(d) - var want error - if got != want { - t.Errorf("acquire: got %q want %q", got, want) - } - - got = sc.Acquire(d) - want = lockservice.ErrFileAcquired - if got.Error() != want.Error() { - t.Errorf("acquire: got %q want %q", got, want) - } - - d = lockservice.NewSimpleDescriptor("test", "owner") - - got = sc.Release(d) - want = nil - if got != want { - t.Errorf("release: got %q want %q", got, want) - } - }) - - t.Run("acquire test, trying to release test as another entity should fail", func(t *testing.T) { - size := 1 - cache := cache.NewLRUCache(size) - sc := NewSimpleClient(*scfg, *cache) - - d := lockservice.NewSimpleDescriptor("test", "owner1") - got := sc.Acquire(d) - var want error - if got != want { - t.Errorf("acquire: got %q want %q", got, want) - } - - d = lockservice.NewSimpleDescriptor("test", "owner2") - got = sc.Release(d) - want = lockservice.ErrUnauthorizedAccess - if got != want { - t.Errorf("acquire: got %v want %v", got, want) - } - - d = lockservice.NewSimpleDescriptor("test2", "owner1") - got = sc.Acquire(d) - want = nil - if got != want { - t.Errorf("acquire: got %q want %q", got, want) - } - - d = lockservice.NewSimpleDescriptor("test", "owner1") - - got = sc.Release(d) - want = nil - - if got != want { - t.Errorf("release: got %q want %q", got, want) - } - }) - quit <- true - return +func TestRaft(t *testing.T) { + raftLS, err := lockservice.NewRaftServer( + "test", + "127.0.0.1:5000", + ) + if err != nil { + t.Errorf("%s", err) + } + fmt.Printf("%v", raftLS) + + raftLS.Start() + + scfg := lockservice.NewSimpleConfig("http://127.0.0.1", "5001") + + size := 5 + cache := cache.NewLRUCache(size) + sc := NewSimpleClient(*scfg, *cache) + + d := lockservice.NewSimpleDescriptor("test", "owner") + + got := sc.Acquire(d) + var want error + if got != want { + t.Errorf("acquire: got %q want %q", got, want) + } + } + +// func TestAcquireandRelease(t *testing.T) { +// zerolog.New(os.Stdout).With() + +// log := zerolog.New(os.Stdout).With().Logger().Level(zerolog.GlobalLevel()) +// scfg := lockservice.NewSimpleConfig("http://127.0.0.1", "1234") +// ls := lockservice.NewSimpleLockService(log) + +// quit := make(chan bool, 1) +// go func() { +// node.Start(ls, *scfg) +// for { +// select { +// case <-quit: +// return +// default: +// } +// } +// }() + +// // Server takes some time to start +// time.Sleep(100 * time.Millisecond) +// t.Run("acquire test release test", func(t *testing.T) { +// size := 5 +// cache := cache.NewLRUCache(size) +// sc := NewSimpleClient(*scfg, *cache) + +// d := lockservice.NewSimpleDescriptor("test", "owner") + +// got := sc.Acquire(d) +// var want error +// if got != want { +// t.Errorf("acquire: got %q want %q", got, want) +// } + +// d = lockservice.NewSimpleDescriptor("test1", "owner") + +// got = sc.Acquire(d) +// if got != want { +// t.Errorf("acquire: got %q want %q", got, want) +// } + +// d = lockservice.NewSimpleDescriptor("test", "owner") + +// got = sc.Release(d) + +// if got != want { +// t.Errorf("release: got %q want %q", got, want) +// } + +// d = lockservice.NewSimpleDescriptor("test1", "owner") + +// got = sc.Release(d) + +// if got != want { +// t.Errorf("release: got %q want %q", got, want) +// } +// }) + +// t.Run("acquire test, acquire test, release test", func(t *testing.T) { +// size := 5 +// cache := cache.NewLRUCache(size) +// sc := NewSimpleClient(*scfg, *cache) + +// d := lockservice.NewSimpleDescriptor("test", "owner") + +// got := sc.Acquire(d) +// var want error +// if got != want { +// t.Errorf("acquire: got %q want %q", got, want) +// } + +// got = sc.Acquire(d) +// want = lockservice.ErrFileAcquired +// if got.Error() != want.Error() { +// t.Errorf("acquire: got %q want %q", got, want) +// } + +// d = lockservice.NewSimpleDescriptor("test", "owner") + +// got = sc.Release(d) +// want = nil +// if got != want { +// t.Errorf("release: got %q want %q", got, want) +// } +// }) + +// t.Run("acquire test, trying to release test as another entity should fail", func(t *testing.T) { +// size := 1 +// cache := cache.NewLRUCache(size) +// sc := NewSimpleClient(*scfg, *cache) + +// d := lockservice.NewSimpleDescriptor("test", "owner1") +// got := sc.Acquire(d) +// var want error +// if got != want { +// t.Errorf("acquire: got %q want %q", got, want) +// } + +// d = lockservice.NewSimpleDescriptor("test", "owner2") +// got = sc.Release(d) +// want = lockservice.ErrUnauthorizedAccess +// if got != want { +// t.Errorf("acquire: got %v want %v", got, want) +// } + +// d = lockservice.NewSimpleDescriptor("test2", "owner1") +// got = sc.Acquire(d) +// want = nil +// if got != want { +// t.Errorf("acquire: got %q want %q", got, want) +// } + +// d = lockservice.NewSimpleDescriptor("test", "owner1") + +// got = sc.Release(d) +// want = nil + +// if got != want { +// t.Errorf("release: got %q want %q", got, want) +// } +// }) +// quit <- true +// return +// } diff --git a/internal/lockclient/test/raft.db b/internal/lockclient/test/raft.db new file mode 100644 index 0000000000000000000000000000000000000000..4533b502baaa01218f0cda9c4d000abc4121910b GIT binary patch literal 32768 zcmeI(J!%3$6ae7WXsQ%~_Q?h8atv<~&?f#s>;%u?4RVvSt_Jj$i{|KHd^Iw$h~ZaIIu9;{}cUyIv$GfzT*009C72oNAZfB*pk z1PFvc;9W%{y+Z@@RIi2o@1`RrakI!)5$PZPi`@G@ID=Voa~SqqtChH z+6fRKK!5-N0t5&UAV7csf$IyzJpb|hf2<3*{vBPB009C72oNAZfB*pk1PBlyK!5-N m0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBoLufQ*tKNAE1 literal 0 HcmV?d00001 diff --git a/internal/lockservice/fsm.go b/internal/lockservice/fsm.go index 25f4418..4a231ce 100644 --- a/internal/lockservice/fsm.go +++ b/internal/lockservice/fsm.go @@ -16,16 +16,17 @@ type fsmSnapshot struct { func (f *fsm) Apply(l *raft.Log) interface{} { var c command + fmt.Println("reached FSM Apply") if err := json.Unmarshal(l.Data, &c); err != nil { panic(fmt.Sprintf("failed to unmarshal command: %s", err.Error())) } switch c.Op { case "acquire": - return f.applyAcquire(c.Lock, c.Owner) + return f.applyAcquire(c.Key, c.Value) case "release": - return f.applyRelease(c.Lock, c.Owner) + return f.applyRelease(c.Key, c.Value) default: panic(fmt.Sprintf("unrecognized command op: %s", c.Op)) diff --git a/internal/lockservice/listener.go b/internal/lockservice/listener.go new file mode 100644 index 0000000..4a0aa6d --- /dev/null +++ b/internal/lockservice/listener.go @@ -0,0 +1,37 @@ +package lockservice + +import ( + "fmt" + "log" + "net" + "net/http" +) + +func (rs *RaftStore) Start() error { + server := http.Server{ + Handler: rs, + } + + ln, err := net.Listen("tcp", rs.httpAddr) + if err != nil { + return err + } + rs.ln = ln + + http.Handle(fmt.Sprintf("/%s", rs.raftDir), rs) + + fmt.Printf("set up listener at %s\n", rs.httpAddr) + go func() { + err := server.Serve(rs.ln) + if err != nil { + log.Fatalf("HTTP serve: %s", err) + } + }() + + return nil +} + +func (rs *RaftStore) Close() { + rs.ln.Close() + return +} diff --git a/internal/lockservice/raft.go b/internal/lockservice/raft.go index f6089eb..e7e4db7 100644 --- a/internal/lockservice/raft.go +++ b/internal/lockservice/raft.go @@ -25,9 +25,8 @@ const ( type command struct { Op string `json:"op,omitempty"` - Lock string `json:"fileID,omitempty"` - Owner string `json:"userID,omitempty"` - Addr string `json:"addr,omitempty"` + Key string `json:"fileID,omitempty"` + Value string `json:"userID,omitempty"` } // A RaftStore encapsulates the http server (listener), @@ -39,12 +38,12 @@ type RaftStore struct { raftDir string raftAddr string RaftServer *raft.Raft - // we need a listener here. Node? - logger *log.Logger + ln net.Listener + logger *log.Logger } // NewRaftServer returns a RaftStore. -func NewRaftServer(raftDir, raftAddr string, enableSingle bool) (*RaftStore, error) { +func NewRaftServer(raftDir, raftAddr string) (*RaftStore, error) { httpAddr, err := getHTTPAddr(raftAddr) if err != nil { return nil, err @@ -129,7 +128,7 @@ func (rs *RaftStore) Acquire(fileID, userID string) error { } resp, err := http.Post( - fmt.Sprintf("http://%s/%s/fileID", httpAddr, rs.raftDir), + fmt.Sprintf("http://%s/acquire", httpAddr), "application-type/json", bytes.NewReader(b), ) @@ -150,40 +149,40 @@ func (rs *RaftStore) Acquire(fileID, userID string) error { // return nil } -// Release calls the lockservice function Release(). -// This in turn checks if userID is the owner of fileID -// and if it is, fileID is no longer locked. -// However, if userID does not own fileID, then the lock -// is not released. -func (rs *RaftStore) Release(fileID, userID string) error { - b, err := json.Marshal(map[string]string{"fileID": fileID, "userID": userID}) - if err != nil { - return err - } - - httpAddr, err := getHTTPAddr(string(rs.RaftServer.Leader())) - - if err != nil { - return err - } - - resp, err := http.Post( - fmt.Sprintf("http://%s/%s/fileID/%s", httpAddr, rs.raftDir, fileID), - "application-type/json", - bytes.NewReader(b), - ) - if err != nil { - return err - } - defer resp.Body.Close() - - return nil - - // desc := NewSimpleDescriptor(fileID, userID) - - // err := rs.ls.Release(desc) - // if err != nil { - // return err - // } - // return nil -} +// // Release calls the lockservice function Release(). +// // This in turn checks if userID is the owner of fileID +// // and if it is, fileID is no longer locked. +// // However, if userID does not own fileID, then the lock +// // is not released. +// func (rs *RaftStore) Release(fileID, userID string) error { +// b, err := json.Marshal(map[string]string{"fileID": fileID, "userID": userID}) +// if err != nil { +// return err +// } + +// httpAddr, err := getHTTPAddr(string(rs.RaftServer.Leader())) + +// if err != nil { +// return err +// } + +// resp, err := http.Post( +// fmt.Sprintf("http://%s/%s/fileID/%s", httpAddr, rs.raftDir, fileID), +// "application-type/json", +// bytes.NewReader(b), +// ) +// if err != nil { +// return err +// } +// defer resp.Body.Close() + +// return nil + +// // desc := NewSimpleDescriptor(fileID, userID) + +// // err := rs.ls.Release(desc) +// // if err != nil { +// // return err +// // } +// // return nil +// } diff --git a/internal/lockservice/raft_test.go b/internal/lockservice/raft_test.go new file mode 100644 index 0000000..022863c --- /dev/null +++ b/internal/lockservice/raft_test.go @@ -0,0 +1,20 @@ +package lockservice + +import ( + "fmt" + "testing" +) + +func TestRaft(t *testing.T) { + raftLS, err := NewRaftServer( + "test", + "127.0.0.1:5000", + ) + if err != nil { + t.Errorf("%s", err) + } else { + fmt.Println("no error!") + } + fmt.Printf("%v", raftLS) + +} diff --git a/internal/lockservice/routing.go b/internal/lockservice/routing.go new file mode 100644 index 0000000..907a25a --- /dev/null +++ b/internal/lockservice/routing.go @@ -0,0 +1,90 @@ +package lockservice + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "strings" +) + +func (rs *RaftStore) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if strings.Contains(r.URL.Path, "/acquire") { + rs.handleAcquire(w, r) + } else if strings.Contains(r.URL.Path, "/release") { + rs.handleRelease(w, r) + } else { + w.WriteHeader(http.StatusNotFound) + } +} + +func (rs *RaftStore) handleAcquire(w http.ResponseWriter, r *http.Request) { + body, err := ioutil.ReadAll(r.Body) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + var req LockRequest + err = json.Unmarshal(body, &req) + + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + c := &command{ + Op: "acquire", + Key: req.FileID, + Value: req.UserID, + } + fmt.Printf("%v\n", c) + b, err := json.Marshal(c) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + fmt.Printf("REACHED handleAcquire %v\n", req) + + f := rs.RaftServer.Apply(b, raftTimeout) + if f.Error() != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + w.Write([]byte("lock acquired")) +} + +func (rs *RaftStore) handleRelease(w http.ResponseWriter, r *http.Request) { + body, err := ioutil.ReadAll(r.Body) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + var req LockRequest + err = json.Unmarshal(body, &req) + + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + c := &command{ + Op: "release", + Key: req.FileID, + Value: req.UserID, + } + b, err := json.Marshal(c) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + f := rs.RaftServer.Apply(b, raftTimeout) + if f.Error() != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + w.Write([]byte("lock released")) +} diff --git a/internal/lockservice/test/raft.db b/internal/lockservice/test/raft.db new file mode 100644 index 0000000000000000000000000000000000000000..280e025bb8d029da6f21e4784130859ab41caf40 GIT binary patch literal 32768 zcmeI)yKcfT6adgbDHBqeVd>O~4fqEJmVQGeSa=Mb;8XdBbnk`@z5@t`N|Bo3M2h^# zjbokd+E%LSdi6?Kp9Xa|*N3ic5 zoa66S*A;+#p8q}Fxj&b=Jxh1AzxKOTYCgPT4Pmoie?JUJoe}fwxqbo!2oNAZfB*pk z1PBlyK%jYn_@CdfWag2d$GL#!zi31P1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7 d2oNAZfB*pk1PBlyK!5-N0t5&UAVA=+0%vm(519Y} literal 0 HcmV?d00001 diff --git a/internal/node/simple_node.go b/internal/node/simple_node.go index 78dfe79..ef7a4ce 100644 --- a/internal/node/simple_node.go +++ b/internal/node/simple_node.go @@ -17,6 +17,18 @@ import ( "github.com/gorilla/mux" ) +// func (rs *lockservice.RaftStore) Start() error { +// ln, err := net.Listen("tcp", rs.httpAddr) +// if err != nil { +// return err +// } +// rs.ln = ln +// server := http.Server{ +// Handler: rs, +// } + +// } + // Start begins the node's operation as a http server. func Start(ls *lockservice.SimpleLockService, scfg lockservice.SimpleConfig) error { From 135d4818f1d5c8665df1f29693077e8d584f535d Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Tue, 11 Aug 2020 15:42:35 +0530 Subject: [PATCH 03/10] got a working workflow --- internal/lockclient/simple_client_test.go | 298 +++++++++++----------- internal/lockclient/test/raft.db | Bin 32768 -> 32768 bytes internal/lockservice/fsm.go | 2 +- internal/lockservice/listener.go | 2 +- internal/lockservice/raft.go | 228 +++++++++++------ internal/lockservice/routing.go | 3 - 6 files changed, 306 insertions(+), 227 deletions(-) diff --git a/internal/lockclient/simple_client_test.go b/internal/lockclient/simple_client_test.go index 238061d..5da7711 100644 --- a/internal/lockclient/simple_client_test.go +++ b/internal/lockclient/simple_client_test.go @@ -1,162 +1,170 @@ package lockclient import ( - "fmt" "testing" + "time" "github.com/SystemBuilders/LocKey/internal/cache" "github.com/SystemBuilders/LocKey/internal/lockservice" ) -func TestRaft(t *testing.T) { - raftLS, err := lockservice.NewRaftServer( - "test", - "127.0.0.1:5000", - ) - if err != nil { - t.Errorf("%s", err) - } - fmt.Printf("%v", raftLS) +// func TestRaft(t *testing.T) { +// raftLS := lockservice.New(true) +// // raftLS, err := lockservice.NewRaftServer( +// // "test", +// // "127.0.0.1:5000", +// // ) +// raftLS.RaftAddr = "127.0.0.1:5000" +// raftLS.Open(true, "node0") +// raftLS.Start() - raftLS.Start() +// scfg := lockservice.NewSimpleConfig("http://127.0.0.1", "5001") - scfg := lockservice.NewSimpleConfig("http://127.0.0.1", "5001") +// size := 5 +// cache := cache.NewLRUCache(size) +// sc := NewSimpleClient(*scfg, *cache) - size := 5 - cache := cache.NewLRUCache(size) - sc := NewSimpleClient(*scfg, *cache) +// d := lockservice.NewSimpleDescriptor("test", "owner") - d := lockservice.NewSimpleDescriptor("test", "owner") +// time.Sleep(3 * time.Second) - got := sc.Acquire(d) - var want error - if got != want { - t.Errorf("acquire: got %q want %q", got, want) - } +// got := sc.Acquire(d) +// var want error +// if got != want { +// t.Errorf("acquire: got %q want %q", got, want) +// } -} +// got = sc.Release(d) -// func TestAcquireandRelease(t *testing.T) { -// zerolog.New(os.Stdout).With() - -// log := zerolog.New(os.Stdout).With().Logger().Level(zerolog.GlobalLevel()) -// scfg := lockservice.NewSimpleConfig("http://127.0.0.1", "1234") -// ls := lockservice.NewSimpleLockService(log) - -// quit := make(chan bool, 1) -// go func() { -// node.Start(ls, *scfg) -// for { -// select { -// case <-quit: -// return -// default: -// } -// } -// }() - -// // Server takes some time to start -// time.Sleep(100 * time.Millisecond) -// t.Run("acquire test release test", func(t *testing.T) { -// size := 5 -// cache := cache.NewLRUCache(size) -// sc := NewSimpleClient(*scfg, *cache) - -// d := lockservice.NewSimpleDescriptor("test", "owner") - -// got := sc.Acquire(d) -// var want error -// if got != want { -// t.Errorf("acquire: got %q want %q", got, want) -// } - -// d = lockservice.NewSimpleDescriptor("test1", "owner") - -// got = sc.Acquire(d) -// if got != want { -// t.Errorf("acquire: got %q want %q", got, want) -// } - -// d = lockservice.NewSimpleDescriptor("test", "owner") - -// got = sc.Release(d) - -// if got != want { -// t.Errorf("release: got %q want %q", got, want) -// } - -// d = lockservice.NewSimpleDescriptor("test1", "owner") - -// got = sc.Release(d) - -// if got != want { -// t.Errorf("release: got %q want %q", got, want) -// } -// }) - -// t.Run("acquire test, acquire test, release test", func(t *testing.T) { -// size := 5 -// cache := cache.NewLRUCache(size) -// sc := NewSimpleClient(*scfg, *cache) - -// d := lockservice.NewSimpleDescriptor("test", "owner") - -// got := sc.Acquire(d) -// var want error -// if got != want { -// t.Errorf("acquire: got %q want %q", got, want) -// } - -// got = sc.Acquire(d) -// want = lockservice.ErrFileAcquired -// if got.Error() != want.Error() { -// t.Errorf("acquire: got %q want %q", got, want) -// } - -// d = lockservice.NewSimpleDescriptor("test", "owner") - -// got = sc.Release(d) -// want = nil -// if got != want { -// t.Errorf("release: got %q want %q", got, want) -// } -// }) - -// t.Run("acquire test, trying to release test as another entity should fail", func(t *testing.T) { -// size := 1 -// cache := cache.NewLRUCache(size) -// sc := NewSimpleClient(*scfg, *cache) - -// d := lockservice.NewSimpleDescriptor("test", "owner1") -// got := sc.Acquire(d) -// var want error -// if got != want { -// t.Errorf("acquire: got %q want %q", got, want) -// } - -// d = lockservice.NewSimpleDescriptor("test", "owner2") -// got = sc.Release(d) -// want = lockservice.ErrUnauthorizedAccess -// if got != want { -// t.Errorf("acquire: got %v want %v", got, want) -// } - -// d = lockservice.NewSimpleDescriptor("test2", "owner1") -// got = sc.Acquire(d) -// want = nil -// if got != want { -// t.Errorf("acquire: got %q want %q", got, want) -// } - -// d = lockservice.NewSimpleDescriptor("test", "owner1") - -// got = sc.Release(d) -// want = nil - -// if got != want { -// t.Errorf("release: got %q want %q", got, want) -// } -// }) -// quit <- true -// return // } + +func TestAcquireandRelease(t *testing.T) { + // zerolog.New(os.Stdout).With() + + // log := zerolog.New(os.Stdout).With().Logger().Level(zerolog.GlobalLevel()) + // scfg := lockservice.NewSimpleConfig("http://127.0.0.1", "1234") + // ls := lockservice.NewSimpleLockService(log) + scfg := lockservice.NewSimpleConfig("http://127.0.0.1", "5001") + + quit := make(chan bool, 1) + go func() { + raftLS := lockservice.New(true) + + raftLS.RaftAddr = "127.0.0.1:5000" + raftLS.Open(true, "node0") + raftLS.Start() + + for { + select { + case <-quit: + return + default: + } + } + }() + + // Server takes some time to start + time.Sleep(3 * time.Second) + t.Run("acquire test release test", func(t *testing.T) { + size := 5 + cache := cache.NewLRUCache(size) + sc := NewSimpleClient(*scfg, *cache) + + d := lockservice.NewSimpleDescriptor("test", "owner") + + got := sc.Acquire(d) + var want error + if got != want { + t.Errorf("acquire: got %q want %q", got, want) + } + + d = lockservice.NewSimpleDescriptor("test1", "owner") + + got = sc.Acquire(d) + if got != want { + t.Errorf("acquire: got %q want %q", got, want) + } + + d = lockservice.NewSimpleDescriptor("test", "owner") + + got = sc.Release(d) + + if got != want { + t.Errorf("release: got %q want %q", got, want) + } + + d = lockservice.NewSimpleDescriptor("test1", "owner") + + got = sc.Release(d) + + if got != want { + t.Errorf("release: got %q want %q", got, want) + } + }) + + t.Run("acquire test, acquire test, release test", func(t *testing.T) { + size := 5 + cache := cache.NewLRUCache(size) + sc := NewSimpleClient(*scfg, *cache) + + d := lockservice.NewSimpleDescriptor("test", "owner") + + got := sc.Acquire(d) + var want error + if got != want { + t.Errorf("acquire: got %q want %q", got, want) + } + + got = sc.Acquire(d) + want = lockservice.ErrFileAcquired + if got.Error() != want.Error() { + t.Errorf("acquire: got %q want %q", got, want) + } + + d = lockservice.NewSimpleDescriptor("test", "owner") + + got = sc.Release(d) + want = nil + if got != want { + t.Errorf("release: got %q want %q", got, want) + } + }) + + // t.Run("acquire test, trying to release test as another entity should fail", func(t *testing.T) { + // size := 1 + // cache := cache.NewLRUCache(size) + // sc := NewSimpleClient(*scfg, *cache) + + // d := lockservice.NewSimpleDescriptor("test", "owner1") + // got := sc.Acquire(d) + // var want error + // if got != want { + // t.Errorf("acquire: got %q want %q", got, want) + // } + + // d = lockservice.NewSimpleDescriptor("test", "owner2") + // got = sc.Release(d) + // want = lockservice.ErrUnauthorizedAccess + // if got != want { + // t.Errorf("acquire: got %v want %v", got, want) + // } + + // d = lockservice.NewSimpleDescriptor("test2", "owner1") + // got = sc.Acquire(d) + // want = nil + // if got != want { + // t.Errorf("acquire: got %q want %q", got, want) + // } + + // d = lockservice.NewSimpleDescriptor("test", "owner1") + + // got = sc.Release(d) + // want = nil + + // if got != want { + // t.Errorf("release: got %q want %q", got, want) + // } + // }) + quit <- true + return +} diff --git a/internal/lockclient/test/raft.db b/internal/lockclient/test/raft.db index 4533b502baaa01218f0cda9c4d000abc4121910b..dae39e487021c6f28f240af2be9bbf603066aec7 100644 GIT binary patch delta 58 zcmZo@U}|V!n&2Ry#sC34$s7NyZ~v~aSy5pF|0Dqi0d=T^53|ntN101Bz!HlT5&+Qh B5LN&H delta 58 zcmZo@U}|V!n&2QH!T Date: Wed, 12 Aug 2020 14:18:49 +0530 Subject: [PATCH 04/10] fixed all errors, basic workflow done. Join node is pending --- internal/lockclient/simple_client_test.go | 70 +++++++++++------------ internal/lockservice/fsm.go | 1 - internal/lockservice/raft_test.go | 20 ------- internal/lockservice/routing.go | 16 ++++++ internal/lockservice/simpleLockService.go | 55 ++++++++++++++++++ 5 files changed, 106 insertions(+), 56 deletions(-) delete mode 100644 internal/lockservice/raft_test.go diff --git a/internal/lockclient/simple_client_test.go b/internal/lockclient/simple_client_test.go index 5da7711..d9f406b 100644 --- a/internal/lockclient/simple_client_test.go +++ b/internal/lockclient/simple_client_test.go @@ -130,41 +130,41 @@ func TestAcquireandRelease(t *testing.T) { } }) - // t.Run("acquire test, trying to release test as another entity should fail", func(t *testing.T) { - // size := 1 - // cache := cache.NewLRUCache(size) - // sc := NewSimpleClient(*scfg, *cache) - - // d := lockservice.NewSimpleDescriptor("test", "owner1") - // got := sc.Acquire(d) - // var want error - // if got != want { - // t.Errorf("acquire: got %q want %q", got, want) - // } - - // d = lockservice.NewSimpleDescriptor("test", "owner2") - // got = sc.Release(d) - // want = lockservice.ErrUnauthorizedAccess - // if got != want { - // t.Errorf("acquire: got %v want %v", got, want) - // } - - // d = lockservice.NewSimpleDescriptor("test2", "owner1") - // got = sc.Acquire(d) - // want = nil - // if got != want { - // t.Errorf("acquire: got %q want %q", got, want) - // } - - // d = lockservice.NewSimpleDescriptor("test", "owner1") - - // got = sc.Release(d) - // want = nil - - // if got != want { - // t.Errorf("release: got %q want %q", got, want) - // } - // }) + t.Run("acquire test, trying to release test as another entity should fail", func(t *testing.T) { + size := 1 + cache := cache.NewLRUCache(size) + sc := NewSimpleClient(*scfg, *cache) + + d := lockservice.NewSimpleDescriptor("test", "owner1") + got := sc.Acquire(d) + var want error + if got != want { + t.Errorf("acquire: got %q want %q", got, want) + } + + d = lockservice.NewSimpleDescriptor("test", "owner2") + got = sc.Release(d) + want = lockservice.ErrUnauthorizedAccess + if got != want { + t.Errorf("release: got %v want %v", got, want) + } + + d = lockservice.NewSimpleDescriptor("test2", "owner1") + got = sc.Acquire(d) + want = nil + if got != want { + t.Errorf("acquire: got %q want %q", got, want) + } + + d = lockservice.NewSimpleDescriptor("test", "owner1") + + got = sc.Release(d) + want = nil + + if got != want { + t.Errorf("release: got %q want %q", got, want) + } + }) quit <- true return } diff --git a/internal/lockservice/fsm.go b/internal/lockservice/fsm.go index f7a5822..cfd30ae 100644 --- a/internal/lockservice/fsm.go +++ b/internal/lockservice/fsm.go @@ -48,7 +48,6 @@ func (f *fsm) applyRelease(lock, owner string) interface{} { err := f.ls.Release(desc) if err != nil { - fmt.Printf("%s\n", err) return err } return nil diff --git a/internal/lockservice/raft_test.go b/internal/lockservice/raft_test.go deleted file mode 100644 index 022863c..0000000 --- a/internal/lockservice/raft_test.go +++ /dev/null @@ -1,20 +0,0 @@ -package lockservice - -import ( - "fmt" - "testing" -) - -func TestRaft(t *testing.T) { - raftLS, err := NewRaftServer( - "test", - "127.0.0.1:5000", - ) - if err != nil { - t.Errorf("%s", err) - } else { - fmt.Println("no error!") - } - fmt.Printf("%v", raftLS) - -} diff --git a/internal/lockservice/routing.go b/internal/lockservice/routing.go index 2bbfe56..0ff4e30 100644 --- a/internal/lockservice/routing.go +++ b/internal/lockservice/routing.go @@ -43,6 +43,14 @@ func (rs *RaftStore) handleAcquire(w http.ResponseWriter, r *http.Request) { return } + // Check if acquire is possible + desc := NewSimpleDescriptor(req.FileID, req.UserID) + err = rs.ls.TryAcquire(desc) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + // If possible, commit the change f := rs.RaftServer.Apply(b, raftTimeout) if f.Error() != nil { w.WriteHeader(http.StatusBadRequest) @@ -77,6 +85,14 @@ func (rs *RaftStore) handleRelease(w http.ResponseWriter, r *http.Request) { return } + // Check if release is possible + desc := NewSimpleDescriptor(req.FileID, req.UserID) + err = rs.ls.TryRelease(desc) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + // If possible, commit the change f := rs.RaftServer.Apply(b, raftTimeout) if f.Error() != nil { w.WriteHeader(http.StatusBadRequest) diff --git a/internal/lockservice/simpleLockService.go b/internal/lockservice/simpleLockService.go index 42e4fad..4dff97d 100644 --- a/internal/lockservice/simpleLockService.go +++ b/internal/lockservice/simpleLockService.go @@ -18,11 +18,14 @@ type SimpleConfig struct { PortAddr string } +// LockRequest is a struct used by the client to +// communicate to the listener HTTP server type LockRequest struct { FileID string `json:"FileID"` UserID string `json:"UserID"` } +// IP returns the IP address from SimpleConfig func (scfg *SimpleConfig) IP() string { return scfg.IPAddr } @@ -63,6 +66,7 @@ func (sd *SimpleDescriptor) Owner() string { return sd.UserID } +// NewSimpleConfig returns a new simple configuration func NewSimpleConfig(IPAddr, PortAddr string) *SimpleConfig { return &SimpleConfig{ IPAddr: IPAddr, @@ -70,6 +74,7 @@ func NewSimpleConfig(IPAddr, PortAddr string) *SimpleConfig { } } +// NewSimpleDescriptor returns a new simple descriptor func NewSimpleDescriptor(FileID, UserID string) *SimpleDescriptor { return &SimpleDescriptor{ FileID: FileID, @@ -110,6 +115,25 @@ func (ls *SimpleLockService) Acquire(sd Descriptors) error { return nil } +// TryAcquire checks if it is possible to lock a file. +// It is used as a check before a acquire() action is +// actually committedin the log of the distributed +// consensus algorithm. +func (ls *SimpleLockService) TryAcquire(sd Descriptors) error { + ls.lockMap.Mutex.Lock() + if _, ok := ls.lockMap.LockMap[sd.ID()]; ok { + ls.lockMap.Mutex.Unlock() + ls. + log. + Debug(). + Str("descriptor", sd.ID()). + Msg("can't acquire, already been acquired") + return ErrFileAcquired + } + ls.lockMap.Mutex.Unlock() + return nil +} + // Release lets a client to release a lock on an object. func (ls *SimpleLockService) Release(sd Descriptors) error { ls.lockMap.Mutex.Lock() @@ -146,6 +170,37 @@ func (ls *SimpleLockService) Release(sd Descriptors) error { } +// TryRelease checks if it is possible to release a file. +// It is used as a check before a release() action is +// actually committedin the log of the distributed +// consensus algorithm. +func (ls *SimpleLockService) TryRelease(sd Descriptors) error { + ls.lockMap.Mutex.Lock() + if ls.lockMap.LockMap[sd.ID()] == sd.Owner() { + ls.lockMap.Mutex.Unlock() + return nil + } else if _, ok := ls.lockMap.LockMap[sd.ID()]; !ok { + ls. + log. + Debug(). + Str("descriptor", sd.ID()). + Msg("can't release, hasn't been acquired") + ls.lockMap.Mutex.Unlock() + return ErrCantReleaseFile + + } else { + ls. + log. + Debug(). + Str("descriptor", sd.ID()). + Msg("can't release, unauthorized access") + ls.lockMap.Mutex.Unlock() + return ErrUnauthorizedAccess + + } + +} + // CheckAcquired returns true if the file is acquired func (ls *SimpleLockService) CheckAcquired(sd Descriptors) bool { ls.lockMap.Mutex.Lock() From 1fb89a973b25acefc7379dc6b88c8ed9f73ac755 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Thu, 13 Aug 2020 23:47:51 +0530 Subject: [PATCH 05/10] added join node funtionality, redirecting requests to leader --- internal/lockclient/simple_client.go | 7 +- internal/lockclient/simple_client_test.go | 168 +++++++++++++--------- internal/lockservice/fsm.go | 2 + internal/lockservice/listener.go | 9 +- internal/lockservice/raft.go | 34 +++++ internal/lockservice/routing.go | 77 ++++++++++ internal/lockservice/simpleLockService.go | 2 + 7 files changed, 224 insertions(+), 75 deletions(-) diff --git a/internal/lockclient/simple_client.go b/internal/lockclient/simple_client.go index 6013bfb..b824c9e 100644 --- a/internal/lockclient/simple_client.go +++ b/internal/lockclient/simple_client.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/json" "errors" + "fmt" "io/ioutil" "net/http" "strings" @@ -42,9 +43,10 @@ func (sc *SimpleClient) Acquire(d lockservice.Descriptors) error { } testData := lockservice.LockRequest{FileID: d.ID(), UserID: d.Owner()} - requestJson, err := json.Marshal(testData) + fmt.Println(testData) + requestJSON, err := json.Marshal(testData) - req, err := http.NewRequest("POST", endPoint, bytes.NewBuffer(requestJson)) + req, err := http.NewRequest("POST", endPoint, bytes.NewBuffer(requestJSON)) if err != nil { return err } @@ -59,6 +61,7 @@ func (sc *SimpleClient) Acquire(d lockservice.Descriptors) error { defer resp.Body.Close() body, _ := ioutil.ReadAll(resp.Body) + fmt.Println(body) if resp.StatusCode != 200 { return errors.New(strings.TrimSpace(string(body))) diff --git a/internal/lockclient/simple_client_test.go b/internal/lockclient/simple_client_test.go index d9f406b..7e6eb1f 100644 --- a/internal/lockclient/simple_client_test.go +++ b/internal/lockclient/simple_client_test.go @@ -1,6 +1,7 @@ package lockclient import ( + "fmt" "testing" "time" @@ -45,6 +46,7 @@ func TestAcquireandRelease(t *testing.T) { // scfg := lockservice.NewSimpleConfig("http://127.0.0.1", "1234") // ls := lockservice.NewSimpleLockService(log) scfg := lockservice.NewSimpleConfig("http://127.0.0.1", "5001") + scfg1 := lockservice.NewSimpleConfig("http://127.0.0.1", "7001") quit := make(chan bool, 1) go func() { @@ -54,6 +56,31 @@ func TestAcquireandRelease(t *testing.T) { raftLS.Open(true, "node0") raftLS.Start() + time.Sleep(3 * time.Second) + raftLS2 := lockservice.New(true) + + raftLS2.RaftAddr = "127.0.0.1:6000" + raftLS2.Open(true, "node1") + raftLS2.Start() + + fmt.Printf("joining") + raftLS.Join("127.0.0.1:6000") + time.Sleep(5 * time.Second) + + time.Sleep(3 * time.Second) + + raftLS3 := lockservice.New(true) + + raftLS3.RaftAddr = "127.0.0.1:7000" + raftLS3.Open(true, "node1") + raftLS3.Start() + + time.Sleep(3 * time.Second) + + fmt.Printf("joining") + raftLS.Join("127.0.0.1:7000") + time.Sleep(5 * time.Second) + for { select { case <-quit: @@ -64,15 +91,17 @@ func TestAcquireandRelease(t *testing.T) { }() // Server takes some time to start - time.Sleep(3 * time.Second) + time.Sleep(30 * time.Second) t.Run("acquire test release test", func(t *testing.T) { size := 5 cache := cache.NewLRUCache(size) sc := NewSimpleClient(*scfg, *cache) + sc1 := NewSimpleClient(*scfg1, *cache) d := lockservice.NewSimpleDescriptor("test", "owner") got := sc.Acquire(d) + time.Sleep(5 * time.Second) var want error if got != want { t.Errorf("acquire: got %q want %q", got, want) @@ -81,90 +110,91 @@ func TestAcquireandRelease(t *testing.T) { d = lockservice.NewSimpleDescriptor("test1", "owner") got = sc.Acquire(d) + time.Sleep(5 * time.Second) if got != want { t.Errorf("acquire: got %q want %q", got, want) } d = lockservice.NewSimpleDescriptor("test", "owner") - got = sc.Release(d) - + got = sc1.Release(d) + time.Sleep(5 * time.Second) if got != want { t.Errorf("release: got %q want %q", got, want) } d = lockservice.NewSimpleDescriptor("test1", "owner") - got = sc.Release(d) - - if got != want { - t.Errorf("release: got %q want %q", got, want) - } - }) - - t.Run("acquire test, acquire test, release test", func(t *testing.T) { - size := 5 - cache := cache.NewLRUCache(size) - sc := NewSimpleClient(*scfg, *cache) - - d := lockservice.NewSimpleDescriptor("test", "owner") - - got := sc.Acquire(d) - var want error - if got != want { - t.Errorf("acquire: got %q want %q", got, want) - } - - got = sc.Acquire(d) - want = lockservice.ErrFileAcquired - if got.Error() != want.Error() { - t.Errorf("acquire: got %q want %q", got, want) - } - - d = lockservice.NewSimpleDescriptor("test", "owner") - - got = sc.Release(d) - want = nil + got = sc1.Release(d) + time.Sleep(5 * time.Second) if got != want { t.Errorf("release: got %q want %q", got, want) } }) - t.Run("acquire test, trying to release test as another entity should fail", func(t *testing.T) { - size := 1 - cache := cache.NewLRUCache(size) - sc := NewSimpleClient(*scfg, *cache) - - d := lockservice.NewSimpleDescriptor("test", "owner1") - got := sc.Acquire(d) - var want error - if got != want { - t.Errorf("acquire: got %q want %q", got, want) - } - - d = lockservice.NewSimpleDescriptor("test", "owner2") - got = sc.Release(d) - want = lockservice.ErrUnauthorizedAccess - if got != want { - t.Errorf("release: got %v want %v", got, want) - } - - d = lockservice.NewSimpleDescriptor("test2", "owner1") - got = sc.Acquire(d) - want = nil - if got != want { - t.Errorf("acquire: got %q want %q", got, want) - } - - d = lockservice.NewSimpleDescriptor("test", "owner1") - - got = sc.Release(d) - want = nil - - if got != want { - t.Errorf("release: got %q want %q", got, want) - } - }) - quit <- true + // t.Run("acquire test, acquire test, release test", func(t *testing.T) { + // size := 5 + // cache := cache.NewLRUCache(size) + // sc := NewSimpleClient(*scfg, *cache) + + // d := lockservice.NewSimpleDescriptor("test", "owner") + + // got := sc.Acquire(d) + // var want error + // if got != want { + // t.Errorf("acquire: got %q want %q", got, want) + // } + + // got = sc.Acquire(d) + // want = lockservice.ErrFileAcquired + // if got.Error() != want.Error() { + // t.Errorf("acquire: got %q want %q", got, want) + // } + + // d = lockservice.NewSimpleDescriptor("test", "owner") + + // got = sc.Release(d) + // want = nil + // if got != want { + // t.Errorf("release: got %q want %q", got, want) + // } + // }) + + // t.Run("acquire test, trying to release test as another entity should fail", func(t *testing.T) { + // size := 1 + // cache := cache.NewLRUCache(size) + // sc := NewSimpleClient(*scfg, *cache) + + // d := lockservice.NewSimpleDescriptor("test", "owner1") + // got := sc.Acquire(d) + // var want error + // if got != want { + // t.Errorf("acquire: got %q want %q", got, want) + // } + + // d = lockservice.NewSimpleDescriptor("test", "owner2") + // got = sc.Release(d) + // want = lockservice.ErrUnauthorizedAccess + // if got != want { + // t.Errorf("release: got %v want %v", got, want) + // } + + // d = lockservice.NewSimpleDescriptor("test2", "owner1") + // got = sc.Acquire(d) + // want = nil + // if got != want { + // t.Errorf("acquire: got %q want %q", got, want) + // } + + // d = lockservice.NewSimpleDescriptor("test", "owner1") + + // got = sc.Release(d) + // want = nil + + // if got != want { + // t.Errorf("release: got %q want %q", got, want) + // } + // }) + // quit <- true return } diff --git a/internal/lockservice/fsm.go b/internal/lockservice/fsm.go index cfd30ae..92f7c83 100644 --- a/internal/lockservice/fsm.go +++ b/internal/lockservice/fsm.go @@ -20,6 +20,7 @@ func (f *fsm) Apply(l *raft.Log) interface{} { panic(fmt.Sprintf("failed to unmarshal command: %s", err.Error())) } + fmt.Println("reached apply") switch c.Op { case "acquire": return f.applyAcquire(c.Key, c.Value) @@ -34,6 +35,7 @@ func (f *fsm) Apply(l *raft.Log) interface{} { } func (f *fsm) applyAcquire(lock, owner string) interface{} { + fmt.Println("reached applyAcquire") desc := NewSimpleDescriptor(lock, owner) err := f.ls.Acquire(desc) diff --git a/internal/lockservice/listener.go b/internal/lockservice/listener.go index b198d5a..8743af0 100644 --- a/internal/lockservice/listener.go +++ b/internal/lockservice/listener.go @@ -1,12 +1,16 @@ package lockservice import ( - "fmt" "log" "net" "net/http" ) +// Start starts the http service using the listener within a RaftStore. +// The HTTP server is used to redirect commands like Set, Delete and Join +// to the leader RaftStore in a cluster. The HTTP address is always +// one away from the Raft address which the raft node uses for communication +// with other raft nodes. func (rs *RaftStore) Start() error { server := http.Server{ Handler: rs, @@ -18,9 +22,6 @@ func (rs *RaftStore) Start() error { } rs.ln = ln - http.Handle(fmt.Sprintf("/%s", rs.RaftDir), rs) - - fmt.Printf("set up listener at %s\n", rs.httpAddr) go func() { err := server.Serve(rs.ln) if err != nil { diff --git a/internal/lockservice/raft.go b/internal/lockservice/raft.go index b4be1cf..8ef8647 100644 --- a/internal/lockservice/raft.go +++ b/internal/lockservice/raft.go @@ -1,9 +1,12 @@ package lockservice import ( + "bytes" + "encoding/json" "fmt" "log" "net" + "net/http" "os" "path/filepath" "strconv" @@ -116,6 +119,37 @@ func (s *RaftStore) Open(enableSingle bool, localID string) error { return nil } +func (rs *RaftStore) Join(addr string) error { + b, err := json.Marshal(map[string]string{"addr": addr}) + if err != nil { + return err + } + + var postAddr string + if rs.RaftServer.Leader() == "" { + postAddr = rs.RaftAddr + } else { + postAddr = string(rs.RaftServer.Leader()) + } + + httpAddr, err := getHTTPAddr(postAddr) + if err != nil { + return err + } + + resp, err := http.Post( + fmt.Sprintf("http://%s/join", httpAddr), + "application-type/json", + bytes.NewReader(b), + ) + defer resp.Body.Close() + if err != nil { + return err + } + + return nil +} + // // NewRaftServer returns a RaftStore. // func NewRaftServer(raftDir, raftAddr string) (*RaftStore, error) { // httpAddr, err := getHTTPAddr(raftAddr) diff --git a/internal/lockservice/routing.go b/internal/lockservice/routing.go index 0ff4e30..500a85e 100644 --- a/internal/lockservice/routing.go +++ b/internal/lockservice/routing.go @@ -2,16 +2,56 @@ package lockservice import ( "encoding/json" + "fmt" "io/ioutil" "net/http" + "strconv" "strings" + + "github.com/hashicorp/raft" ) func (rs *RaftStore) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if getRaftAddr(rs.httpAddr) != string(rs.RaftServer.Leader()) { + // fmt.Printf("%s %s\n", rs.httpAddr, string(rs.RaftServer.Leader())) + // http.Redirect(w, r, string(rs.RaftServer.Leader()), http.StatusOK) + // return + url := r.URL + url.Host, _ = getHTTPAddr(string(rs.RaftServer.Leader())) + url.Scheme = "http" + + fmt.Println(url.String()) + + proxyReq, err := http.NewRequest(r.Method, url.String(), r.Body) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + proxyReq.Header.Set("Host", r.Host) + proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr) + + for header, values := range r.Header { + for _, value := range values { + proxyReq.Header.Add(header, value) + } + } + + client := &http.Client{} + resp, err := client.Do(proxyReq) + if err != nil { + http.Error(w, err.Error(), http.StatusBadGateway) + return + } + defer resp.Body.Close() + return + } if strings.Contains(r.URL.Path, "/acquire") { rs.handleAcquire(w, r) } else if strings.Contains(r.URL.Path, "/release") { rs.handleRelease(w, r) + } else if strings.Contains(r.URL.Path, "/join") { + rs.handleJoin(w, r) } else { w.WriteHeader(http.StatusNotFound) } @@ -19,6 +59,7 @@ func (rs *RaftStore) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (rs *RaftStore) handleAcquire(w http.ResponseWriter, r *http.Request) { body, err := ioutil.ReadAll(r.Body) + if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return @@ -31,6 +72,7 @@ func (rs *RaftStore) handleAcquire(w http.ResponseWriter, r *http.Request) { http.Error(w, err.Error(), http.StatusBadRequest) return } + fmt.Printf("Reahed handleAcquire : %v", req) c := &command{ Op: "acquire", @@ -101,3 +143,38 @@ func (rs *RaftStore) handleRelease(w http.ResponseWriter, r *http.Request) { w.Write([]byte("lock released")) } + +// handleJoin actually applies the join upon receiving the http request. +func (rs *RaftStore) handleJoin(w http.ResponseWriter, r *http.Request) { + fmt.Printf("reached handle join %s\n", rs.httpAddr) + m := map[string]string{} + err := json.NewDecoder(r.Body).Decode(&m) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + remoteAddr := m["addr"] + + f := rs.RaftServer.AddPeer(raft.ServerAddress(remoteAddr)) + if f.Error() != nil { + fmt.Println(f.Error()) + w.WriteHeader(http.StatusInternalServerError) + return + } + fmt.Printf("no error\n") + + fmt.Printf("node at %s joined successfully", remoteAddr) + rs.logger.Printf("node at %s joined successfully", remoteAddr) +} + +func getRaftAddr(raftAddr string) string { + addrParts := strings.Split(raftAddr, ":") + httpHost := addrParts[0] + port, err := strconv.Atoi(addrParts[1]) + if err != nil { + return "" + } + return fmt.Sprintf("%s:%d", httpHost, port-1) + +} diff --git a/internal/lockservice/simpleLockService.go b/internal/lockservice/simpleLockService.go index 4dff97d..5db73af 100644 --- a/internal/lockservice/simpleLockService.go +++ b/internal/lockservice/simpleLockService.go @@ -1,6 +1,7 @@ package lockservice import ( + "fmt" "sync" "github.com/rs/zerolog" @@ -130,6 +131,7 @@ func (ls *SimpleLockService) TryAcquire(sd Descriptors) error { Msg("can't acquire, already been acquired") return ErrFileAcquired } + fmt.Println("no problem with try acquire\n") ls.lockMap.Mutex.Unlock() return nil } From 70a1adf47101b941bc55757ea723427415ac1811 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Fri, 14 Aug 2020 13:23:59 +0530 Subject: [PATCH 06/10] fixed join() to work with protocol version 3 --- internal/lockclient/simple_client_test.go | 8 +- internal/lockservice/raft.go | 160 +++++----------------- internal/lockservice/routing.go | 45 +++++- 3 files changed, 78 insertions(+), 135 deletions(-) diff --git a/internal/lockclient/simple_client_test.go b/internal/lockclient/simple_client_test.go index 7e6eb1f..d7cd230 100644 --- a/internal/lockclient/simple_client_test.go +++ b/internal/lockclient/simple_client_test.go @@ -60,11 +60,11 @@ func TestAcquireandRelease(t *testing.T) { raftLS2 := lockservice.New(true) raftLS2.RaftAddr = "127.0.0.1:6000" - raftLS2.Open(true, "node1") + raftLS2.Open(true, "node2") raftLS2.Start() fmt.Printf("joining") - raftLS.Join("127.0.0.1:6000") + raftLS.Join("127.0.0.1:6000", "node2") time.Sleep(5 * time.Second) time.Sleep(3 * time.Second) @@ -72,13 +72,13 @@ func TestAcquireandRelease(t *testing.T) { raftLS3 := lockservice.New(true) raftLS3.RaftAddr = "127.0.0.1:7000" - raftLS3.Open(true, "node1") + raftLS3.Open(true, "node3") raftLS3.Start() time.Sleep(3 * time.Second) fmt.Printf("joining") - raftLS.Join("127.0.0.1:7000") + raftLS.Join("127.0.0.1:7000", "node3") time.Sleep(5 * time.Second) for { diff --git a/internal/lockservice/raft.go b/internal/lockservice/raft.go index 8ef8647..b8782e2 100644 --- a/internal/lockservice/raft.go +++ b/internal/lockservice/raft.go @@ -59,6 +59,7 @@ func (s *RaftStore) Open(enableSingle bool, localID string) error { // Setup Raft configuration. config := raft.DefaultConfig() config.LocalID = raft.ServerID(localID) + // config.ProtocolVersion = 2 httpAddr, err := getHTTPAddr(s.RaftAddr) if err != nil { @@ -119,8 +120,8 @@ func (s *RaftStore) Open(enableSingle bool, localID string) error { return nil } -func (rs *RaftStore) Join(addr string) error { - b, err := json.Marshal(map[string]string{"addr": addr}) +func (rs *RaftStore) Join(addr, ID string) error { + b, err := json.Marshal(map[string]string{"addr": addr, "id": ID}) if err != nil { return err } @@ -150,47 +151,41 @@ func (rs *RaftStore) Join(addr string) error { return nil } -// // NewRaftServer returns a RaftStore. -// func NewRaftServer(raftDir, raftAddr string) (*RaftStore, error) { -// httpAddr, err := getHTTPAddr(raftAddr) -// if err != nil { -// return nil, err -// } -// rs := &RaftStore{ -// httpAddr: httpAddr, -// ls: NewSimpleLockService(zerolog.New(os.Stdout).With().Logger().Level(zerolog.GlobalLevel())), -// raftDir: raftDir, -// raftAddr: raftAddr, -// logger: log.New(os.Stderr, fmt.Sprintf("[raftStore | %s]", raftAddr), log.LstdFlags), -// } - -// // what file access controll is this? check. -// if err := os.MkdirAll(raftDir, 0700); err != nil { -// return nil, err -// } +// // Join joins a node, identified by nodeID and located at addr, to this store. +// // The node must be ready to respond to Raft communications at that address. +// func (s *RaftStore) Join(nodeID, addr string) error { +// s.logger.Printf("received join request for remote node %s at %s", nodeID, addr) -// config := raft.DefaultConfig() -// transport, err := setupRaftCommunication(rs.raftAddr) -// if err != nil { -// return nil, err +// configFuture := s.RaftServer.GetConfiguration() +// if err := configFuture.Error(); err != nil { +// s.logger.Printf("failed to get raft configuration: %v", err) +// return err // } -// snapshots, err := raft.NewFileSnapshotStore(rs.raftDir, retainSnapshotCount, os.Stderr) -// if err != nil { -// return nil, fmt.Errorf("file snapshot store: %s", err) +// for _, srv := range configFuture.Configuration().Servers { +// // If a node already exists with either the joining node's ID or address, +// // that node may need to be removed from the config first. +// if srv.ID == raft.ServerID(nodeID) || srv.Address == raft.ServerAddress(addr) { +// // However if *both* the ID and the address are the same, then nothing -- not even +// // a join operation -- is needed. +// if srv.Address == raft.ServerAddress(addr) && srv.ID == raft.ServerID(nodeID) { +// s.logger.Printf("node %s at %s already member of cluster, ignoring join request", nodeID, addr) +// return nil +// } + +// future := s.RaftServer.RemoveServer(srv.ID, 0, 0) +// if err := future.Error(); err != nil { +// return fmt.Errorf("error removing existing node %s at %s: %s", nodeID, addr, err) +// } +// } // } -// boltDB, err := raftboltdb.NewBoltStore(filepath.Join(rs.raftDir, "raft.db")) -// if err != nil { -// return nil, fmt.Errorf("new bolt store: %s", err) +// f := s.RaftServer.AddVoter(raft.ServerID(nodeID), raft.ServerAddress(addr), 0, 0) +// if f.Error() != nil { +// return f.Error() // } -// logStore := boltDB -// stableStore := boltDB - -// rft, err := raft.NewRaft(config, (*fsm)(rs), logStore, stableStore, snapshots, transport) - -// rs.RaftServer = rft -// return rs, nil +// s.logger.Printf("node %s at %s joined successfully", nodeID, addr) +// return nil // } func getHTTPAddr(raftAddr string) (string, error) { @@ -203,94 +198,3 @@ func getHTTPAddr(raftAddr string) (string, error) { return fmt.Sprintf("%s:%d", httpHost, port+1), nil } - -// func setupRaftCommunication(raftAddr string) (*raft.NetworkTransport, error) { -// addr, err := net.ResolveTCPAddr("tcp", raftAddr) -// if err != nil { -// return nil, err -// } - -// // What does the maxPool argument signify? -// transport, err := raft.NewTCPTransport(raftAddr, addr, 3, 10*time.Second, os.Stderr) - -// if err != nil { -// return nil, err -// } - -// return transport, nil -// } - -// // Acquire locks a File with ID fileID and sets its owner to userID. -// // No other user is allowed access to a file once it is locked apart -// // from its owner -// func (rs *RaftStore) Acquire(fileID, userID string) error { -// b, err := json.Marshal(map[string]string{"fileID": fileID, "userID": userID}) -// if err != nil { -// return err -// } - -// httpAddr, err := getHTTPAddr(string(rs.RaftServer.Leader())) - -// if err != nil { -// return err -// } - -// resp, err := http.Post( -// fmt.Sprintf("http://%s/acquire", httpAddr), -// "application-type/json", -// bytes.NewReader(b), -// ) -// if err != nil { -// return err -// } - -// defer resp.Body.Close() - -// return nil - -// // desc := NewSimpleDescriptor(fileID, userID) - -// // err := rs.ls.Acquire(desc) -// // if err != nil { -// // return err -// // } -// // return nil -// } - -// // Release calls the lockservice function Release(). -// // This in turn checks if userID is the owner of fileID -// // and if it is, fileID is no longer locked. -// // However, if userID does not own fileID, then the lock -// // is not released. -// func (rs *RaftStore) Release(fileID, userID string) error { -// b, err := json.Marshal(map[string]string{"fileID": fileID, "userID": userID}) -// if err != nil { -// return err -// } - -// httpAddr, err := getHTTPAddr(string(rs.RaftServer.Leader())) - -// if err != nil { -// return err -// } - -// resp, err := http.Post( -// fmt.Sprintf("http://%s/%s/fileID/%s", httpAddr, rs.raftDir, fileID), -// "application-type/json", -// bytes.NewReader(b), -// ) -// if err != nil { -// return err -// } -// defer resp.Body.Close() - -// return nil - -// // desc := NewSimpleDescriptor(fileID, userID) - -// // err := rs.ls.Release(desc) -// // if err != nil { -// // return err -// // } -// // return nil -// } diff --git a/internal/lockservice/routing.go b/internal/lockservice/routing.go index 500a85e..179bbad 100644 --- a/internal/lockservice/routing.go +++ b/internal/lockservice/routing.go @@ -155,10 +155,12 @@ func (rs *RaftStore) handleJoin(w http.ResponseWriter, r *http.Request) { } remoteAddr := m["addr"] + nodeID := m["id"] - f := rs.RaftServer.AddPeer(raft.ServerAddress(remoteAddr)) - if f.Error() != nil { - fmt.Println(f.Error()) + // f := rs.RaftServer.AddPeer(raft.ServerAddress(remoteAddr)) + err = rs.joinHelper(nodeID, remoteAddr) + if err != nil { + fmt.Println(err.Error()) w.WriteHeader(http.StatusInternalServerError) return } @@ -168,6 +170,43 @@ func (rs *RaftStore) handleJoin(w http.ResponseWriter, r *http.Request) { rs.logger.Printf("node at %s joined successfully", remoteAddr) } +// Join joins a node, identified by nodeID and located at addr, to this store. +// The node must be ready to respond to Raft communications at that address. +func (rs *RaftStore) joinHelper(nodeID, addr string) error { + rs.logger.Printf("received join request for remote node %s at %s", nodeID, addr) + + configFuture := rs.RaftServer.GetConfiguration() + if err := configFuture.Error(); err != nil { + rs.logger.Printf("failed to get raft configuration: %v", err) + return err + } + + for _, srv := range configFuture.Configuration().Servers { + // If a node already exists with either the joining node's ID or address, + // that node may need to be removed from the config first. + if srv.ID == raft.ServerID(nodeID) || srv.Address == raft.ServerAddress(addr) { + // However if *both* the ID and the address are the same, then nothing -- not even + // a join operation -- is needed. + if srv.Address == raft.ServerAddress(addr) && srv.ID == raft.ServerID(nodeID) { + rs.logger.Printf("node %s at %s already member of cluster, ignoring join request", nodeID, addr) + return nil + } + + future := rs.RaftServer.RemoveServer(srv.ID, 0, 0) + if err := future.Error(); err != nil { + return fmt.Errorf("error removing existing node %s at %s: %s", nodeID, addr, err) + } + } + } + + f := rs.RaftServer.AddVoter(raft.ServerID(nodeID), raft.ServerAddress(addr), 0, 0) + if f.Error() != nil { + return f.Error() + } + rs.logger.Printf("node %s at %s joined successfully", nodeID, addr) + return nil +} + func getRaftAddr(raftAddr string) string { addrParts := strings.Split(raftAddr, ":") httpHost := addrParts[0] From c5215c4ba15ae1064fbdd33ed538ed72bfede009 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Fri, 14 Aug 2020 14:07:44 +0530 Subject: [PATCH 07/10] removed redundant prints used for debug --- internal/lockclient/simple_client.go | 3 - internal/lockservice/fsm.go | 2 - internal/lockservice/listener.go | 5 ++ internal/lockservice/raft.go | 73 ++++++++--------------- internal/lockservice/routing.go | 10 +--- internal/lockservice/simpleLockService.go | 9 ++- 6 files changed, 34 insertions(+), 68 deletions(-) diff --git a/internal/lockclient/simple_client.go b/internal/lockclient/simple_client.go index b824c9e..432b318 100644 --- a/internal/lockclient/simple_client.go +++ b/internal/lockclient/simple_client.go @@ -4,7 +4,6 @@ import ( "bytes" "encoding/json" "errors" - "fmt" "io/ioutil" "net/http" "strings" @@ -43,7 +42,6 @@ func (sc *SimpleClient) Acquire(d lockservice.Descriptors) error { } testData := lockservice.LockRequest{FileID: d.ID(), UserID: d.Owner()} - fmt.Println(testData) requestJSON, err := json.Marshal(testData) req, err := http.NewRequest("POST", endPoint, bytes.NewBuffer(requestJSON)) @@ -61,7 +59,6 @@ func (sc *SimpleClient) Acquire(d lockservice.Descriptors) error { defer resp.Body.Close() body, _ := ioutil.ReadAll(resp.Body) - fmt.Println(body) if resp.StatusCode != 200 { return errors.New(strings.TrimSpace(string(body))) diff --git a/internal/lockservice/fsm.go b/internal/lockservice/fsm.go index 92f7c83..cfd30ae 100644 --- a/internal/lockservice/fsm.go +++ b/internal/lockservice/fsm.go @@ -20,7 +20,6 @@ func (f *fsm) Apply(l *raft.Log) interface{} { panic(fmt.Sprintf("failed to unmarshal command: %s", err.Error())) } - fmt.Println("reached apply") switch c.Op { case "acquire": return f.applyAcquire(c.Key, c.Value) @@ -35,7 +34,6 @@ func (f *fsm) Apply(l *raft.Log) interface{} { } func (f *fsm) applyAcquire(lock, owner string) interface{} { - fmt.Println("reached applyAcquire") desc := NewSimpleDescriptor(lock, owner) err := f.ls.Acquire(desc) diff --git a/internal/lockservice/listener.go b/internal/lockservice/listener.go index 8743af0..30d393b 100644 --- a/internal/lockservice/listener.go +++ b/internal/lockservice/listener.go @@ -7,10 +7,14 @@ import ( ) // Start starts the http service using the listener within a RaftStore. +// // The HTTP server is used to redirect commands like Set, Delete and Join // to the leader RaftStore in a cluster. The HTTP address is always // one away from the Raft address which the raft node uses for communication // with other raft nodes. +// +// This policy is maybe a bit to trivial and in the future, a more dynamic +// mapping between a Raft node and its listener can be integrated func (rs *RaftStore) Start() error { server := http.Server{ Handler: rs, @@ -32,6 +36,7 @@ func (rs *RaftStore) Start() error { return nil } +// Close stops the listener corresponding to a Raft node. func (rs *RaftStore) Close() { rs.ln.Close() return diff --git a/internal/lockservice/raft.go b/internal/lockservice/raft.go index b8782e2..f8fd9f5 100644 --- a/internal/lockservice/raft.go +++ b/internal/lockservice/raft.go @@ -43,7 +43,7 @@ type RaftStore struct { logger *log.Logger } -// New returns a new Store. +// New returns a new instance of RaftStore. func New(inmem bool) *RaftStore { return &RaftStore{ ls: NewSimpleLockService(zerolog.New(os.Stdout).With().Logger().Level(zerolog.GlobalLevel())), @@ -55,30 +55,29 @@ func New(inmem bool) *RaftStore { // Open opens the store. If enableSingle is set, and there are no existing peers, // then this node becomes the first node, and therefore leader, of the cluster. // localID should be the server identifier for this node. -func (s *RaftStore) Open(enableSingle bool, localID string) error { +func (rs *RaftStore) Open(enableSingle bool, localID string) error { // Setup Raft configuration. config := raft.DefaultConfig() config.LocalID = raft.ServerID(localID) - // config.ProtocolVersion = 2 - httpAddr, err := getHTTPAddr(s.RaftAddr) + httpAddr, err := getHTTPAddr(rs.RaftAddr) if err != nil { return err } - s.httpAddr = httpAddr + rs.httpAddr = httpAddr // Setup Raft communication. - addr, err := net.ResolveTCPAddr("tcp", s.RaftAddr) + addr, err := net.ResolveTCPAddr("tcp", rs.RaftAddr) if err != nil { return err } - transport, err := raft.NewTCPTransport(s.RaftAddr, addr, 3, 10*time.Second, os.Stderr) + transport, err := raft.NewTCPTransport(rs.RaftAddr, addr, 3, 10*time.Second, os.Stderr) if err != nil { return err } // Create the snapshot store. This allows the Raft to truncate the log. - snapshots, err := raft.NewFileSnapshotStore(s.RaftDir, retainSnapshotCount, os.Stderr) + snapshots, err := raft.NewFileSnapshotStore(rs.RaftDir, retainSnapshotCount, os.Stderr) if err != nil { return fmt.Errorf("file snapshot store: %s", err) } @@ -86,11 +85,11 @@ func (s *RaftStore) Open(enableSingle bool, localID string) error { // Create the log store and stable store. var logStore raft.LogStore var stableStore raft.StableStore - if s.inmem { + if rs.inmem { logStore = raft.NewInmemStore() stableStore = raft.NewInmemStore() } else { - boltDB, err := raftboltdb.NewBoltStore(filepath.Join(s.RaftDir, "raft.db")) + boltDB, err := raftboltdb.NewBoltStore(filepath.Join(rs.RaftDir, "raft.db")) if err != nil { return fmt.Errorf("new bolt store: %s", err) } @@ -99,11 +98,11 @@ func (s *RaftStore) Open(enableSingle bool, localID string) error { } // Instantiate the Raft systems. - ra, err := raft.NewRaft(config, (*fsm)(s), logStore, stableStore, snapshots, transport) + ra, err := raft.NewRaft(config, (*fsm)(rs), logStore, stableStore, snapshots, transport) if err != nil { return fmt.Errorf("new raft: %s", err) } - s.RaftServer = ra + rs.RaftServer = ra if enableSingle { configuration := raft.Configuration{ @@ -120,6 +119,16 @@ func (s *RaftStore) Open(enableSingle bool, localID string) error { return nil } +// Join facilitates the addition of a new Raft node to an existing +// cluster. +// +// The addresss and nodeID are those of the node to be added to the +// cluster. +// +// This function will called on an instance of RaftStore of a node +// existing in the cluster. The function internally sends a HTTP +// request to the listener of the leader of the cluster with the +// new node's ID and address. func (rs *RaftStore) Join(addr, ID string) error { b, err := json.Marshal(map[string]string{"addr": addr, "id": ID}) if err != nil { @@ -143,6 +152,9 @@ func (rs *RaftStore) Join(addr, ID string) error { "application-type/json", bytes.NewReader(b), ) + if err != nil { + return err + } defer resp.Body.Close() if err != nil { return err @@ -151,43 +163,6 @@ func (rs *RaftStore) Join(addr, ID string) error { return nil } -// // Join joins a node, identified by nodeID and located at addr, to this store. -// // The node must be ready to respond to Raft communications at that address. -// func (s *RaftStore) Join(nodeID, addr string) error { -// s.logger.Printf("received join request for remote node %s at %s", nodeID, addr) - -// configFuture := s.RaftServer.GetConfiguration() -// if err := configFuture.Error(); err != nil { -// s.logger.Printf("failed to get raft configuration: %v", err) -// return err -// } - -// for _, srv := range configFuture.Configuration().Servers { -// // If a node already exists with either the joining node's ID or address, -// // that node may need to be removed from the config first. -// if srv.ID == raft.ServerID(nodeID) || srv.Address == raft.ServerAddress(addr) { -// // However if *both* the ID and the address are the same, then nothing -- not even -// // a join operation -- is needed. -// if srv.Address == raft.ServerAddress(addr) && srv.ID == raft.ServerID(nodeID) { -// s.logger.Printf("node %s at %s already member of cluster, ignoring join request", nodeID, addr) -// return nil -// } - -// future := s.RaftServer.RemoveServer(srv.ID, 0, 0) -// if err := future.Error(); err != nil { -// return fmt.Errorf("error removing existing node %s at %s: %s", nodeID, addr, err) -// } -// } -// } - -// f := s.RaftServer.AddVoter(raft.ServerID(nodeID), raft.ServerAddress(addr), 0, 0) -// if f.Error() != nil { -// return f.Error() -// } -// s.logger.Printf("node %s at %s joined successfully", nodeID, addr) -// return nil -// } - func getHTTPAddr(raftAddr string) (string, error) { addrParts := strings.Split(raftAddr, ":") httpHost := addrParts[0] diff --git a/internal/lockservice/routing.go b/internal/lockservice/routing.go index 179bbad..9da1754 100644 --- a/internal/lockservice/routing.go +++ b/internal/lockservice/routing.go @@ -13,15 +13,10 @@ import ( func (rs *RaftStore) ServeHTTP(w http.ResponseWriter, r *http.Request) { if getRaftAddr(rs.httpAddr) != string(rs.RaftServer.Leader()) { - // fmt.Printf("%s %s\n", rs.httpAddr, string(rs.RaftServer.Leader())) - // http.Redirect(w, r, string(rs.RaftServer.Leader()), http.StatusOK) - // return url := r.URL url.Host, _ = getHTTPAddr(string(rs.RaftServer.Leader())) url.Scheme = "http" - fmt.Println(url.String()) - proxyReq, err := http.NewRequest(r.Method, url.String(), r.Body) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) @@ -72,7 +67,6 @@ func (rs *RaftStore) handleAcquire(w http.ResponseWriter, r *http.Request) { http.Error(w, err.Error(), http.StatusBadRequest) return } - fmt.Printf("Reahed handleAcquire : %v", req) c := &command{ Op: "acquire", @@ -146,7 +140,6 @@ func (rs *RaftStore) handleRelease(w http.ResponseWriter, r *http.Request) { // handleJoin actually applies the join upon receiving the http request. func (rs *RaftStore) handleJoin(w http.ResponseWriter, r *http.Request) { - fmt.Printf("reached handle join %s\n", rs.httpAddr) m := map[string]string{} err := json.NewDecoder(r.Body).Decode(&m) if err != nil { @@ -164,10 +157,9 @@ func (rs *RaftStore) handleJoin(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusInternalServerError) return } - fmt.Printf("no error\n") - fmt.Printf("node at %s joined successfully", remoteAddr) rs.logger.Printf("node at %s joined successfully", remoteAddr) + w.Write([]byte("joined cluster")) } // Join joins a node, identified by nodeID and located at addr, to this store. diff --git a/internal/lockservice/simpleLockService.go b/internal/lockservice/simpleLockService.go index 5db73af..0e3f276 100644 --- a/internal/lockservice/simpleLockService.go +++ b/internal/lockservice/simpleLockService.go @@ -1,7 +1,6 @@ package lockservice import ( - "fmt" "sync" "github.com/rs/zerolog" @@ -20,7 +19,8 @@ type SimpleConfig struct { } // LockRequest is a struct used by the client to -// communicate to the listener HTTP server +// communicate to the HTTP server acting as a listener +// for each Raft node type LockRequest struct { FileID string `json:"FileID"` UserID string `json:"UserID"` @@ -118,7 +118,7 @@ func (ls *SimpleLockService) Acquire(sd Descriptors) error { // TryAcquire checks if it is possible to lock a file. // It is used as a check before a acquire() action is -// actually committedin the log of the distributed +// actually committed in the log of the distributed // consensus algorithm. func (ls *SimpleLockService) TryAcquire(sd Descriptors) error { ls.lockMap.Mutex.Lock() @@ -131,7 +131,6 @@ func (ls *SimpleLockService) TryAcquire(sd Descriptors) error { Msg("can't acquire, already been acquired") return ErrFileAcquired } - fmt.Println("no problem with try acquire\n") ls.lockMap.Mutex.Unlock() return nil } @@ -174,7 +173,7 @@ func (ls *SimpleLockService) Release(sd Descriptors) error { // TryRelease checks if it is possible to release a file. // It is used as a check before a release() action is -// actually committedin the log of the distributed +// actually committed in the log of the distributed // consensus algorithm. func (ls *SimpleLockService) TryRelease(sd Descriptors) error { ls.lockMap.Mutex.Lock() From c4c68dd670166a0b0ed555725e623ec010d2fb5d Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Fri, 14 Aug 2020 14:27:58 +0530 Subject: [PATCH 08/10] moved raft related files to consensus dir --- internal/{lockservice => consensus}/fsm.go | 14 +- .../{lockservice => consensus}/listener.go | 2 +- internal/{lockservice => consensus}/raft.go | 7 +- .../{lockservice => consensus}/routing.go | 11 +- internal/lockclient/simple_client_test.go | 170 +++++++----------- internal/lockservice/simpleLockService.go | 21 +++ 6 files changed, 106 insertions(+), 119 deletions(-) rename internal/{lockservice => consensus}/fsm.go (88%) rename internal/{lockservice => consensus}/listener.go (97%) rename internal/{lockservice => consensus}/raft.go (94%) rename internal/{lockservice => consensus}/routing.go (95%) diff --git a/internal/lockservice/fsm.go b/internal/consensus/fsm.go similarity index 88% rename from internal/lockservice/fsm.go rename to internal/consensus/fsm.go index cfd30ae..8b78ac5 100644 --- a/internal/lockservice/fsm.go +++ b/internal/consensus/fsm.go @@ -1,10 +1,11 @@ -package lockservice +package consensus import ( "encoding/json" "fmt" "io" + "github.com/SystemBuilders/LocKey/internal/lockservice" "github.com/hashicorp/raft" ) @@ -34,7 +35,7 @@ func (f *fsm) Apply(l *raft.Log) interface{} { } func (f *fsm) applyAcquire(lock, owner string) interface{} { - desc := NewSimpleDescriptor(lock, owner) + desc := lockservice.NewSimpleDescriptor(lock, owner) err := f.ls.Acquire(desc) if err != nil { @@ -44,7 +45,7 @@ func (f *fsm) applyAcquire(lock, owner string) interface{} { } func (f *fsm) applyRelease(lock, owner string) interface{} { - desc := NewSimpleDescriptor(lock, owner) + desc := lockservice.NewSimpleDescriptor(lock, owner) err := f.ls.Release(desc) if err != nil { @@ -59,10 +60,7 @@ func (f *fsm) applyRelease(lock, owner string) interface{} { // Restore where the necessary data is replicated into the finite state machine. // This allows the consensus algorithm to truncate the replicated log. func (f *fsm) Snapshot() (raft.FSMSnapshot, error) { - f.ls.lockMap.Mutex.Lock() - defer f.ls.lockMap.Mutex.Unlock() - - return &fsmSnapshot{lockMap: f.ls.lockMap.LockMap}, nil + return &fsmSnapshot{lockMap: f.ls.GetLockMap()}, nil } // Restores the lockMap to a previous state @@ -74,7 +72,7 @@ func (f *fsm) Restore(lockMap io.ReadCloser) error { // Set the state from snapshot. No need to use mutex lock according // to Hasicorp doc - f.ls.lockMap.LockMap = lockMapSnapshot + f.ls.SetLockMap(lockMapSnapshot) return nil } diff --git a/internal/lockservice/listener.go b/internal/consensus/listener.go similarity index 97% rename from internal/lockservice/listener.go rename to internal/consensus/listener.go index 30d393b..74e7bd1 100644 --- a/internal/lockservice/listener.go +++ b/internal/consensus/listener.go @@ -1,4 +1,4 @@ -package lockservice +package consensus import ( "log" diff --git a/internal/lockservice/raft.go b/internal/consensus/raft.go similarity index 94% rename from internal/lockservice/raft.go rename to internal/consensus/raft.go index f8fd9f5..a2f3e66 100644 --- a/internal/lockservice/raft.go +++ b/internal/consensus/raft.go @@ -1,4 +1,4 @@ -package lockservice +package consensus import ( "bytes" @@ -13,6 +13,7 @@ import ( "strings" "time" + "github.com/SystemBuilders/LocKey/internal/lockservice" "github.com/hashicorp/raft" raftboltdb "github.com/hashicorp/raft-boltdb" "github.com/rs/zerolog" @@ -34,7 +35,7 @@ type command struct { // lock service (SimpleLockService) type RaftStore struct { httpAddr string - ls *SimpleLockService + ls *lockservice.SimpleLockService inmem bool RaftDir string RaftAddr string @@ -46,7 +47,7 @@ type RaftStore struct { // New returns a new instance of RaftStore. func New(inmem bool) *RaftStore { return &RaftStore{ - ls: NewSimpleLockService(zerolog.New(os.Stdout).With().Logger().Level(zerolog.GlobalLevel())), + ls: lockservice.NewSimpleLockService(zerolog.New(os.Stdout).With().Logger().Level(zerolog.GlobalLevel())), inmem: inmem, logger: log.New(os.Stderr, "[store] ", log.LstdFlags), } diff --git a/internal/lockservice/routing.go b/internal/consensus/routing.go similarity index 95% rename from internal/lockservice/routing.go rename to internal/consensus/routing.go index 9da1754..57ad3dc 100644 --- a/internal/lockservice/routing.go +++ b/internal/consensus/routing.go @@ -1,4 +1,4 @@ -package lockservice +package consensus import ( "encoding/json" @@ -8,6 +8,7 @@ import ( "strconv" "strings" + "github.com/SystemBuilders/LocKey/internal/lockservice" "github.com/hashicorp/raft" ) @@ -60,7 +61,7 @@ func (rs *RaftStore) handleAcquire(w http.ResponseWriter, r *http.Request) { return } - var req LockRequest + var req lockservice.LockRequest err = json.Unmarshal(body, &req) if err != nil { @@ -80,7 +81,7 @@ func (rs *RaftStore) handleAcquire(w http.ResponseWriter, r *http.Request) { } // Check if acquire is possible - desc := NewSimpleDescriptor(req.FileID, req.UserID) + desc := lockservice.NewSimpleDescriptor(req.FileID, req.UserID) err = rs.ls.TryAcquire(desc) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) @@ -103,7 +104,7 @@ func (rs *RaftStore) handleRelease(w http.ResponseWriter, r *http.Request) { return } - var req LockRequest + var req lockservice.LockRequest err = json.Unmarshal(body, &req) if err != nil { @@ -122,7 +123,7 @@ func (rs *RaftStore) handleRelease(w http.ResponseWriter, r *http.Request) { } // Check if release is possible - desc := NewSimpleDescriptor(req.FileID, req.UserID) + desc := lockservice.NewSimpleDescriptor(req.FileID, req.UserID) err = rs.ls.TryRelease(desc) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) diff --git a/internal/lockclient/simple_client_test.go b/internal/lockclient/simple_client_test.go index d7cd230..de940d3 100644 --- a/internal/lockclient/simple_client_test.go +++ b/internal/lockclient/simple_client_test.go @@ -6,58 +6,24 @@ import ( "time" "github.com/SystemBuilders/LocKey/internal/cache" + "github.com/SystemBuilders/LocKey/internal/consensus" "github.com/SystemBuilders/LocKey/internal/lockservice" ) -// func TestRaft(t *testing.T) { -// raftLS := lockservice.New(true) -// // raftLS, err := lockservice.NewRaftServer( -// // "test", -// // "127.0.0.1:5000", -// // ) -// raftLS.RaftAddr = "127.0.0.1:5000" -// raftLS.Open(true, "node0") -// raftLS.Start() - -// scfg := lockservice.NewSimpleConfig("http://127.0.0.1", "5001") - -// size := 5 -// cache := cache.NewLRUCache(size) -// sc := NewSimpleClient(*scfg, *cache) - -// d := lockservice.NewSimpleDescriptor("test", "owner") - -// time.Sleep(3 * time.Second) - -// got := sc.Acquire(d) -// var want error -// if got != want { -// t.Errorf("acquire: got %q want %q", got, want) -// } - -// got = sc.Release(d) - -// } - func TestAcquireandRelease(t *testing.T) { - // zerolog.New(os.Stdout).With() - - // log := zerolog.New(os.Stdout).With().Logger().Level(zerolog.GlobalLevel()) - // scfg := lockservice.NewSimpleConfig("http://127.0.0.1", "1234") - // ls := lockservice.NewSimpleLockService(log) scfg := lockservice.NewSimpleConfig("http://127.0.0.1", "5001") scfg1 := lockservice.NewSimpleConfig("http://127.0.0.1", "7001") quit := make(chan bool, 1) go func() { - raftLS := lockservice.New(true) + raftLS := consensus.New(true) raftLS.RaftAddr = "127.0.0.1:5000" raftLS.Open(true, "node0") raftLS.Start() time.Sleep(3 * time.Second) - raftLS2 := lockservice.New(true) + raftLS2 := consensus.New(true) raftLS2.RaftAddr = "127.0.0.1:6000" raftLS2.Open(true, "node2") @@ -69,7 +35,7 @@ func TestAcquireandRelease(t *testing.T) { time.Sleep(3 * time.Second) - raftLS3 := lockservice.New(true) + raftLS3 := consensus.New(true) raftLS3.RaftAddr = "127.0.0.1:7000" raftLS3.Open(true, "node3") @@ -132,69 +98,69 @@ func TestAcquireandRelease(t *testing.T) { } }) - // t.Run("acquire test, acquire test, release test", func(t *testing.T) { - // size := 5 - // cache := cache.NewLRUCache(size) - // sc := NewSimpleClient(*scfg, *cache) - - // d := lockservice.NewSimpleDescriptor("test", "owner") - - // got := sc.Acquire(d) - // var want error - // if got != want { - // t.Errorf("acquire: got %q want %q", got, want) - // } - - // got = sc.Acquire(d) - // want = lockservice.ErrFileAcquired - // if got.Error() != want.Error() { - // t.Errorf("acquire: got %q want %q", got, want) - // } - - // d = lockservice.NewSimpleDescriptor("test", "owner") - - // got = sc.Release(d) - // want = nil - // if got != want { - // t.Errorf("release: got %q want %q", got, want) - // } - // }) - - // t.Run("acquire test, trying to release test as another entity should fail", func(t *testing.T) { - // size := 1 - // cache := cache.NewLRUCache(size) - // sc := NewSimpleClient(*scfg, *cache) - - // d := lockservice.NewSimpleDescriptor("test", "owner1") - // got := sc.Acquire(d) - // var want error - // if got != want { - // t.Errorf("acquire: got %q want %q", got, want) - // } - - // d = lockservice.NewSimpleDescriptor("test", "owner2") - // got = sc.Release(d) - // want = lockservice.ErrUnauthorizedAccess - // if got != want { - // t.Errorf("release: got %v want %v", got, want) - // } - - // d = lockservice.NewSimpleDescriptor("test2", "owner1") - // got = sc.Acquire(d) - // want = nil - // if got != want { - // t.Errorf("acquire: got %q want %q", got, want) - // } - - // d = lockservice.NewSimpleDescriptor("test", "owner1") - - // got = sc.Release(d) - // want = nil - - // if got != want { - // t.Errorf("release: got %q want %q", got, want) - // } - // }) - // quit <- true + t.Run("acquire test, acquire test, release test", func(t *testing.T) { + size := 5 + cache := cache.NewLRUCache(size) + sc := NewSimpleClient(*scfg, *cache) + + d := lockservice.NewSimpleDescriptor("test", "owner") + + got := sc.Acquire(d) + var want error + if got != want { + t.Errorf("acquire: got %q want %q", got, want) + } + + got = sc.Acquire(d) + want = lockservice.ErrFileAcquired + if got.Error() != want.Error() { + t.Errorf("acquire: got %q want %q", got, want) + } + + d = lockservice.NewSimpleDescriptor("test", "owner") + + got = sc.Release(d) + want = nil + if got != want { + t.Errorf("release: got %q want %q", got, want) + } + }) + + t.Run("acquire test, trying to release test as another entity should fail", func(t *testing.T) { + size := 1 + cache := cache.NewLRUCache(size) + sc := NewSimpleClient(*scfg, *cache) + + d := lockservice.NewSimpleDescriptor("test", "owner1") + got := sc.Acquire(d) + var want error + if got != want { + t.Errorf("acquire: got %q want %q", got, want) + } + + d = lockservice.NewSimpleDescriptor("test", "owner2") + got = sc.Release(d) + want = lockservice.ErrUnauthorizedAccess + if got != want { + t.Errorf("release: got %v want %v", got, want) + } + + d = lockservice.NewSimpleDescriptor("test2", "owner1") + got = sc.Acquire(d) + want = nil + if got != want { + t.Errorf("acquire: got %q want %q", got, want) + } + + d = lockservice.NewSimpleDescriptor("test", "owner1") + + got = sc.Release(d) + want = nil + + if got != want { + t.Errorf("release: got %q want %q", got, want) + } + }) + quit <- true return } diff --git a/internal/lockservice/simpleLockService.go b/internal/lockservice/simpleLockService.go index 0e3f276..1fedc6a 100644 --- a/internal/lockservice/simpleLockService.go +++ b/internal/lockservice/simpleLockService.go @@ -243,3 +243,24 @@ func (ls *SimpleLockService) CheckReleased(sd Descriptors) bool { ls.lockMap.Mutex.Unlock() return true } + +// GetLockMap returns a copy of the current state of the +// lockservice's lock map. +// This function is used by the Snapshot() function of the +// finite state machine of the distributed consensus algorithm +func (ls *SimpleLockService) GetLockMap() map[string]string { + ls.lockMap.Mutex.Lock() + defer ls.lockMap.Mutex.Unlock() + + return ls.lockMap.LockMap + +} + +// SetLockMap sets the LockMap used by SimpleLockService to +// a snapshot of it that is passed as an argument to the +// function +func (ls *SimpleLockService) SetLockMap(lockMapSnapshot map[string]string) { + // Set the state from snapshot. No need to use mutex lock according + // to Hasicorp doc + ls.lockMap.LockMap = lockMapSnapshot +} From ad3954b49773586cd52fd2a6ceaff5c26550bd5a Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Fri, 14 Aug 2020 14:42:18 +0530 Subject: [PATCH 09/10] added documentation --- internal/consensus/raft.go | 24 +++++++++++++++++++++++- internal/consensus/routing.go | 14 ++------------ internal/lockclient/simple_client.go | 6 +++--- 3 files changed, 28 insertions(+), 16 deletions(-) diff --git a/internal/consensus/raft.go b/internal/consensus/raft.go index a2f3e66..d549b23 100644 --- a/internal/consensus/raft.go +++ b/internal/consensus/raft.go @@ -31,7 +31,7 @@ type command struct { } // A RaftStore encapsulates the http server (listener), -// a raft node (raftDir, raftAddr, RaftServer) and a +// a raft node (RaftDir, RaftAddr, RaftServer) and a // lock service (SimpleLockService) type RaftStore struct { httpAddr string @@ -164,6 +164,17 @@ func (rs *RaftStore) Join(addr, ID string) error { return nil } +// getHTTPAddr and getRaftAddr implement the mapping between +// a Raft node and its HTTP listener. +// The mapping is such that the listener's IP address is 1 +// more than that of the Raft node's. +// +// Given the address of a Raft node, getHTTPAddr() returns +// the IP address of its listener +// +// Given the address of the HTTP listener of a Raft node, +// getRaftAddr() returns the IP address of the corresponding +// Raft node. func getHTTPAddr(raftAddr string) (string, error) { addrParts := strings.Split(raftAddr, ":") httpHost := addrParts[0] @@ -174,3 +185,14 @@ func getHTTPAddr(raftAddr string) (string, error) { return fmt.Sprintf("%s:%d", httpHost, port+1), nil } + +func getRaftAddr(raftAddr string) string { + addrParts := strings.Split(raftAddr, ":") + httpHost := addrParts[0] + port, err := strconv.Atoi(addrParts[1]) + if err != nil { + return "" + } + return fmt.Sprintf("%s:%d", httpHost, port-1) + +} diff --git a/internal/consensus/routing.go b/internal/consensus/routing.go index 57ad3dc..0ab0d84 100644 --- a/internal/consensus/routing.go +++ b/internal/consensus/routing.go @@ -5,13 +5,14 @@ import ( "fmt" "io/ioutil" "net/http" - "strconv" "strings" "github.com/SystemBuilders/LocKey/internal/lockservice" "github.com/hashicorp/raft" ) +// ServeHTTP is a function needed to allow RaftStore to act as a handler +// to a HTTP server. func (rs *RaftStore) ServeHTTP(w http.ResponseWriter, r *http.Request) { if getRaftAddr(rs.httpAddr) != string(rs.RaftServer.Leader()) { url := r.URL @@ -199,14 +200,3 @@ func (rs *RaftStore) joinHelper(nodeID, addr string) error { rs.logger.Printf("node %s at %s joined successfully", nodeID, addr) return nil } - -func getRaftAddr(raftAddr string) string { - addrParts := strings.Split(raftAddr, ":") - httpHost := addrParts[0] - port, err := strconv.Atoi(addrParts[1]) - if err != nil { - return "" - } - return fmt.Sprintf("%s:%d", httpHost, port-1) - -} diff --git a/internal/lockclient/simple_client.go b/internal/lockclient/simple_client.go index 432b318..59995b9 100644 --- a/internal/lockclient/simple_client.go +++ b/internal/lockclient/simple_client.go @@ -20,7 +20,7 @@ type SimpleClient struct { cache cache.LRUCache } -// NewSimpleKey returns a new SimpleKey of the given value. +// NewSimpleClient returns a new instance of SimpleClient func NewSimpleClient(config lockservice.SimpleConfig, cache cache.LRUCache) *SimpleClient { return &SimpleClient{ config: config, @@ -77,8 +77,8 @@ func (sc *SimpleClient) Release(d lockservice.Descriptors) error { endPoint := sc.config.IPAddr + ":" + sc.config.PortAddr + "/release" testData := lockservice.LockRequest{FileID: d.ID(), UserID: d.Owner()} - requestJson, err := json.Marshal(testData) - req, err := http.NewRequest("POST", endPoint, bytes.NewBuffer(requestJson)) + requestJSON, err := json.Marshal(testData) + req, err := http.NewRequest("POST", endPoint, bytes.NewBuffer(requestJSON)) req.Header.Set("Content-Type", "application/json") client := &http.Client{} From 233db5b04de87357928c8610ea8e1d9a5bf5f9d7 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Fri, 14 Aug 2020 17:04:02 +0530 Subject: [PATCH 10/10] removed sleep from client test --- internal/lockclient/simple_client_test.go | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/internal/lockclient/simple_client_test.go b/internal/lockclient/simple_client_test.go index de940d3..69e93c5 100644 --- a/internal/lockclient/simple_client_test.go +++ b/internal/lockclient/simple_client_test.go @@ -17,26 +17,27 @@ func TestAcquireandRelease(t *testing.T) { quit := make(chan bool, 1) go func() { raftLS := consensus.New(true) - raftLS.RaftAddr = "127.0.0.1:5000" raftLS.Open(true, "node0") raftLS.Start() + // Give the server time to start up time.Sleep(3 * time.Second) - raftLS2 := consensus.New(true) + raftLS2 := consensus.New(true) raftLS2.RaftAddr = "127.0.0.1:6000" raftLS2.Open(true, "node2") raftLS2.Start() + time.Sleep(3 * time.Second) + fmt.Printf("joining") raftLS.Join("127.0.0.1:6000", "node2") - time.Sleep(5 * time.Second) - time.Sleep(3 * time.Second) + // Extra time is needed for the join operation to complete + time.Sleep(5 * time.Second) raftLS3 := consensus.New(true) - raftLS3.RaftAddr = "127.0.0.1:7000" raftLS3.Open(true, "node3") raftLS3.Start() @@ -45,6 +46,7 @@ func TestAcquireandRelease(t *testing.T) { fmt.Printf("joining") raftLS.Join("127.0.0.1:7000", "node3") + time.Sleep(5 * time.Second) for { @@ -56,8 +58,8 @@ func TestAcquireandRelease(t *testing.T) { } }() - // Server takes some time to start - time.Sleep(30 * time.Second) + // Give time for the cluster to form + time.Sleep(25 * time.Second) t.Run("acquire test release test", func(t *testing.T) { size := 5 cache := cache.NewLRUCache(size) @@ -67,7 +69,6 @@ func TestAcquireandRelease(t *testing.T) { d := lockservice.NewSimpleDescriptor("test", "owner") got := sc.Acquire(d) - time.Sleep(5 * time.Second) var want error if got != want { t.Errorf("acquire: got %q want %q", got, want) @@ -76,7 +77,6 @@ func TestAcquireandRelease(t *testing.T) { d = lockservice.NewSimpleDescriptor("test1", "owner") got = sc.Acquire(d) - time.Sleep(5 * time.Second) if got != want { t.Errorf("acquire: got %q want %q", got, want) } @@ -84,7 +84,6 @@ func TestAcquireandRelease(t *testing.T) { d = lockservice.NewSimpleDescriptor("test", "owner") got = sc1.Release(d) - time.Sleep(5 * time.Second) if got != want { t.Errorf("release: got %q want %q", got, want) } @@ -92,7 +91,6 @@ func TestAcquireandRelease(t *testing.T) { d = lockservice.NewSimpleDescriptor("test1", "owner") got = sc1.Release(d) - time.Sleep(5 * time.Second) if got != want { t.Errorf("release: got %q want %q", got, want) }