From 1cc82c11baa7e04a5698fb470725e248bc721beb Mon Sep 17 00:00:00 2001 From: decanus <7621705+decanus@users.noreply.github.com> Date: Tue, 9 Jun 2020 23:59:39 +0200 Subject: [PATCH 1/3] moves send to node --- README.md | 5 ++-- dht/dht.go | 61 +------------------------------------------ node/node.go | 73 +++++++++++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 73 insertions(+), 66 deletions(-) diff --git a/README.md b/README.md index 15990fd..4bbeabf 100644 --- a/README.md +++ b/README.md @@ -8,9 +8,8 @@ Usage with libp2p: ```go func main() { - writer := node.NewWriter() - d := dht.New(id, writer) + d := dht.New(id) - n := node.New(ctx.Background(), d, host.Host, writer) + n := node.New(ctx.Background(), d, host.Host) } ``` diff --git a/dht/dht.go b/dht/dht.go index 69b6be0..7b88ae9 100644 --- a/dht/dht.go +++ b/dht/dht.go @@ -1,7 +1,6 @@ package dht import ( - "bytes" "context" "sync" @@ -39,19 +38,16 @@ type DHT struct { RoutingTable state.RoutingTable applications map[ApplicationID]Application - - transport Transport } // New returns a new DHT. -func New(id state.Peer, transport Transport) *DHT { +func New(id state.Peer) *DHT { return &DHT{ ID: id, LeafSet: state.NewLeafSet(id), NeighborhoodSet: make(state.Set, 0), RoutingTable: make(state.RoutingTable, 0), applications: make(map[ApplicationID]Application), - transport: transport, } } @@ -71,34 +67,6 @@ func (d *DHT) RemoveApplication(aid ApplicationID) { delete(d.applications, aid) } -// Send a message to the target peer or closest available peer. -func (d *DHT) Send(ctx context.Context, msg *pb.Message) error { - key := msg.Key - - if bytes.Equal(key, d.ID) { - d.deliver(msg) // @todo we may need to do this for more than just message types, like when the routing table is updated. - return nil - } - - target := d.Find(key) - if target == nil { - d.deliver(msg) - return nil - } - - forward := d.forward(msg, target) - if !forward { - return nil - } - - err := d.transport.Send(ctx, target, msg) - if err != nil { - return err - } - - return nil -} - // Find returns the closest known peer to a given target or the target itself. func (d *DHT) Find(target state.Peer) state.Peer { d.RLock() @@ -198,30 +166,3 @@ func (d *DHT) ImportPeers(routingTable [][]byte, neighborhood [][]byte, leafset d.LeafSet.Insert(peer) } } - -// deliver sends the message to all connected applications. -func (d *DHT) deliver(msg *pb.Message) { - d.RLock() - defer d.RUnlock() - - for _, app := range d.applications { - app.Deliver(msg) - } -} - -// forward asks all applications whether a message should be forwarded to a peer or not. -func (d *DHT) forward(msg *pb.Message, target state.Peer) bool { - d.RLock() - defer d.RUnlock() - - // @todo need to run over this logic - forward := true - for _, app := range d.applications { - f := app.Forward(msg, target) - if forward { - forward = f - } - } - - return forward -} diff --git a/node/node.go b/node/node.go index 4929cc8..6802e34 100644 --- a/node/node.go +++ b/node/node.go @@ -1,7 +1,9 @@ package node import ( + "bytes" "context" + "sync" logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p-core/event" @@ -10,14 +12,27 @@ import ( "github.com/libp2p/go-libp2p-core/routing" "github.com/decanus/bureka/dht" + "github.com/decanus/bureka/dht/state" "github.com/decanus/bureka/node/internal" "github.com/decanus/bureka/pb" ) var logger = logging.Logger("dht") +// ApplicationID represents a unique identifier for the application. +type ApplicationID string + +// Application represents a pastry application +type Application interface { + Deliver(msg *pb.Message) + Forward(msg *pb.Message, target state.Peer) bool + Heartbeat(id state.Peer) +} + // Node is a simple implementation that bridges libp2p IO to the pastry DHT state machine. type Node struct { + sync.RWMutex + ctx context.Context dht *dht.DHT @@ -25,18 +40,20 @@ type Node struct { writer *internal.Writer sub event.Subscription + + applications map[ApplicationID]Application } // Guarantee that we implement interfaces. var _ routing.PeerRouting = (*Node)(nil) // New returns a new Node. -func New(ctx context.Context, d *dht.DHT, h host.Host, w *internal.Writer) (*Node, error) { +func New(ctx context.Context, d *dht.DHT, h host.Host) (*Node, error) { n := &Node{ ctx: ctx, dht: d, host: h, - writer: w, + writer: internal.NewWriter(), } s, err := n.subscribe() @@ -79,5 +96,55 @@ func (n *Node) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) } func (n *Node) Send(ctx context.Context, msg *pb.Message) error { - return n.dht.Send(ctx, msg) + key := msg.Key + + if bytes.Equal(key, []byte(n.host.ID())) { + n.deliver(msg) // @todo we may need to do this for more than just message types, like when the routing table is updated. + return nil + } + + target := n.dht.Find(key) + if target == nil { + n.deliver(msg) + return nil + } + + forward := n.forward(msg, target) + if !forward { + return nil + } + + err := n.writer.Send(ctx, target, msg) + if err != nil { + return err + } + + return nil +} + +// deliver sends the message to all connected applications. +func (n *Node) deliver(msg *pb.Message) { + n.RLock() + defer n.RUnlock() + + for _, app := range n.applications { + app.Deliver(msg) + } +} + +// forward asks all applications whether a message should be forwarded to a peer or not. +func (n *Node) forward(msg *pb.Message, target state.Peer) bool { + n.RLock() + defer n.RUnlock() + + // @todo need to run over this logic + forward := true + for _, app := range n.applications { + f := app.Forward(msg, target) + if forward { + forward = f + } + } + + return forward } From c314555b8e7dbf8d589046560728d8ce5719f809 Mon Sep 17 00:00:00 2001 From: decanus <7621705+decanus@users.noreply.github.com> Date: Wed, 10 Jun 2020 00:01:56 +0200 Subject: [PATCH 2/3] remove old tests --- dht/dht.go | 7 ----- dht/dht_test.go | 80 +------------------------------------------------ 2 files changed, 1 insertion(+), 86 deletions(-) diff --git a/dht/dht.go b/dht/dht.go index 7b88ae9..7150a52 100644 --- a/dht/dht.go +++ b/dht/dht.go @@ -1,19 +1,12 @@ package dht import ( - "context" "sync" "github.com/decanus/bureka/dht/state" "github.com/decanus/bureka/pb" ) -// Transport is responsible for sending messages. -// This represents a call back function that can be implemented on network IO. -type Transport interface { - Send(ctx context.Context, target state.Peer, msg *pb.Message) error -} - // ApplicationID represents a unique identifier for the application. type ApplicationID string diff --git a/dht/dht_test.go b/dht/dht_test.go index 7ec23a2..ddbbd69 100644 --- a/dht/dht_test.go +++ b/dht/dht_test.go @@ -2,21 +2,15 @@ package dht_test import ( "bytes" - "context" "testing" - "github.com/golang/mock/gomock" - "github.com/decanus/bureka/dht" - internal "github.com/decanus/bureka/dht/internal/mocks" - "github.com/decanus/bureka/dht/state" - "github.com/decanus/bureka/pb" ) func TestNode_AddPeer_And_RemovePeer(t *testing.T) { id := []byte{5, 5, 5, 5} insert := []byte{0, 1, 3, 3} - n := dht.New(id, nil) + n := dht.New(id) n.AddPeer(insert) @@ -47,75 +41,3 @@ func TestNode_AddPeer_And_RemovePeer(t *testing.T) { } } -func TestNode_Send_ToSelf(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - transport := internal.NewMockTransport(ctrl) - n := dht.New([]byte("bob"), transport) - - application := internal.NewMockApplication(ctrl) - n.AddApplication("app", application) - - msg := &pb.Message{Type: pb.Message_MESSAGE, Key: n.ID} - - application.EXPECT().Deliver(gomock.Eq(msg)).Times(1) - - err := n.Send(context.Background(), msg) - if err != nil { - t.Fatal(err) - } -} - -func TestNode_Send_WhenPeerInLeafSet(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - transport := internal.NewMockTransport(ctrl) - n := dht.New([]byte("bob"), transport) - - application := internal.NewMockApplication(ctrl) - n.AddApplication("app", application) - - target := make(state.Peer, 3) - target[0] = 3 - n.AddPeer(target) - - msg := &pb.Message{Type: pb.Message_MESSAGE, Key: target} - - application.EXPECT().Forward(gomock.Eq(msg), gomock.Eq(target)).Times(1).Return(true) - - ctx := context.Background() - transport.EXPECT().Send(gomock.Eq(ctx), gomock.Eq(target), gomock.Eq(msg)).Times(1) - - err := n.Send(ctx, msg) - if err != nil { - t.Fatal(err) - } -} - -func TestNode_Send_DoesNothingOnFalseForward(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - transport := internal.NewMockTransport(ctrl) - n := dht.New([]byte("bob"), transport) - - application := internal.NewMockApplication(ctrl) - n.AddApplication("app", application) - - target := make(state.Peer, 3) - target[0] = 3 - n.AddPeer(target) - - msg := &pb.Message{Type: pb.Message_MESSAGE, Key: target} - - application.EXPECT().Forward(gomock.Eq(msg), gomock.Eq(target)).Times(1).Return(false) - - ctx := context.Background() - - err := n.Send(ctx, msg) - if err != nil { - t.Fatal(err) - } -} From a300113a306990f2cff2ee14593dccc93818e51a Mon Sep 17 00:00:00 2001 From: decanus <7621705+decanus@users.noreply.github.com> Date: Wed, 10 Jun 2020 00:02:18 +0200 Subject: [PATCH 3/3] fmt --- dht/dht_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/dht/dht_test.go b/dht/dht_test.go index ddbbd69..0fb644a 100644 --- a/dht/dht_test.go +++ b/dht/dht_test.go @@ -40,4 +40,3 @@ func TestNode_AddPeer_And_RemovePeer(t *testing.T) { t.Error("failed to remove peer from LeafSet") } } -