Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ Usage with libp2p:

```go
func main() {
writer := node.NewWriter()
d := dht.New(id, writer)
d := dht.New(id)

n := node.New(ctx.Background(), d, host.Host, writer)
n := node.New(ctx.Background(), d, host.Host)
}
```
68 changes: 1 addition & 67 deletions dht/dht.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,12 @@
package dht

import (
"bytes"
"context"
"sync"

"github.com/decanus/bureka/dht/state"
"github.com/decanus/bureka/pb"
)

// Transport is responsible for sending messages.
// This represents a call back function that can be implemented on network IO.
type Transport interface {
Send(ctx context.Context, target state.Peer, msg *pb.Message) error
}

// ApplicationID represents a unique identifier for the application.
type ApplicationID string

Expand All @@ -39,19 +31,16 @@ type DHT struct {
RoutingTable state.RoutingTable

applications map[ApplicationID]Application

transport Transport
}

// New returns a new DHT.
func New(id state.Peer, transport Transport) *DHT {
func New(id state.Peer) *DHT {
return &DHT{
ID: id,
LeafSet: state.NewLeafSet(id),
NeighborhoodSet: make(state.Set, 0),
RoutingTable: make(state.RoutingTable, 0),
applications: make(map[ApplicationID]Application),
transport: transport,
}
}

Expand All @@ -71,34 +60,6 @@ func (d *DHT) RemoveApplication(aid ApplicationID) {
delete(d.applications, aid)
}

// Send a message to the target peer or closest available peer.
func (d *DHT) Send(ctx context.Context, msg *pb.Message) error {
key := msg.Key

if bytes.Equal(key, d.ID) {
d.deliver(msg) // @todo we may need to do this for more than just message types, like when the routing table is updated.
return nil
}

target := d.Find(key)
if target == nil {
d.deliver(msg)
return nil
}

forward := d.forward(msg, target)
if !forward {
return nil
}

err := d.transport.Send(ctx, target, msg)
if err != nil {
return err
}

return nil
}

// Find returns the closest known peer to a given target or the target itself.
func (d *DHT) Find(target state.Peer) state.Peer {
d.RLock()
Expand Down Expand Up @@ -198,30 +159,3 @@ func (d *DHT) ImportPeers(routingTable [][]byte, neighborhood [][]byte, leafset
d.LeafSet.Insert(peer)
}
}

// deliver sends the message to all connected applications.
func (d *DHT) deliver(msg *pb.Message) {
d.RLock()
defer d.RUnlock()

for _, app := range d.applications {
app.Deliver(msg)
}
}

// forward asks all applications whether a message should be forwarded to a peer or not.
func (d *DHT) forward(msg *pb.Message, target state.Peer) bool {
d.RLock()
defer d.RUnlock()

// @todo need to run over this logic
forward := true
for _, app := range d.applications {
f := app.Forward(msg, target)
if forward {
forward = f
}
}

return forward
}
81 changes: 1 addition & 80 deletions dht/dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,15 @@ package dht_test

import (
"bytes"
"context"
"testing"

"github.com/golang/mock/gomock"

"github.com/decanus/bureka/dht"
internal "github.com/decanus/bureka/dht/internal/mocks"
"github.com/decanus/bureka/dht/state"
"github.com/decanus/bureka/pb"
)

func TestNode_AddPeer_And_RemovePeer(t *testing.T) {
id := []byte{5, 5, 5, 5}
insert := []byte{0, 1, 3, 3}
n := dht.New(id, nil)
n := dht.New(id)

n.AddPeer(insert)

Expand Down Expand Up @@ -46,76 +40,3 @@ func TestNode_AddPeer_And_RemovePeer(t *testing.T) {
t.Error("failed to remove peer from LeafSet")
}
}

func TestNode_Send_ToSelf(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

transport := internal.NewMockTransport(ctrl)
n := dht.New([]byte("bob"), transport)

application := internal.NewMockApplication(ctrl)
n.AddApplication("app", application)

msg := &pb.Message{Type: pb.Message_MESSAGE, Key: n.ID}

application.EXPECT().Deliver(gomock.Eq(msg)).Times(1)

err := n.Send(context.Background(), msg)
if err != nil {
t.Fatal(err)
}
}

func TestNode_Send_WhenPeerInLeafSet(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

transport := internal.NewMockTransport(ctrl)
n := dht.New([]byte("bob"), transport)

application := internal.NewMockApplication(ctrl)
n.AddApplication("app", application)

target := make(state.Peer, 3)
target[0] = 3
n.AddPeer(target)

msg := &pb.Message{Type: pb.Message_MESSAGE, Key: target}

application.EXPECT().Forward(gomock.Eq(msg), gomock.Eq(target)).Times(1).Return(true)

ctx := context.Background()
transport.EXPECT().Send(gomock.Eq(ctx), gomock.Eq(target), gomock.Eq(msg)).Times(1)

err := n.Send(ctx, msg)
if err != nil {
t.Fatal(err)
}
}

func TestNode_Send_DoesNothingOnFalseForward(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

transport := internal.NewMockTransport(ctrl)
n := dht.New([]byte("bob"), transport)

application := internal.NewMockApplication(ctrl)
n.AddApplication("app", application)

target := make(state.Peer, 3)
target[0] = 3
n.AddPeer(target)

msg := &pb.Message{Type: pb.Message_MESSAGE, Key: target}

application.EXPECT().Forward(gomock.Eq(msg), gomock.Eq(target)).Times(1).Return(false)

ctx := context.Background()

err := n.Send(ctx, msg)
if err != nil {
t.Fatal(err)
}
}
73 changes: 70 additions & 3 deletions node/node.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package node

import (
"bytes"
"context"
"sync"

logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/event"
Expand All @@ -10,33 +12,48 @@ import (
"github.com/libp2p/go-libp2p-core/routing"

"github.com/decanus/bureka/dht"
"github.com/decanus/bureka/dht/state"
"github.com/decanus/bureka/node/internal"
"github.com/decanus/bureka/pb"
)

var logger = logging.Logger("dht")

// ApplicationID represents a unique identifier for the application.
type ApplicationID string

// Application represents a pastry application
type Application interface {
Deliver(msg *pb.Message)
Forward(msg *pb.Message, target state.Peer) bool
Heartbeat(id state.Peer)
}

// Node is a simple implementation that bridges libp2p IO to the pastry DHT state machine.
type Node struct {
sync.RWMutex

ctx context.Context

dht *dht.DHT
host host.Host
writer *internal.Writer

sub event.Subscription

applications map[ApplicationID]Application
}

// Guarantee that we implement interfaces.
var _ routing.PeerRouting = (*Node)(nil)

// New returns a new Node.
func New(ctx context.Context, d *dht.DHT, h host.Host, w *internal.Writer) (*Node, error) {
func New(ctx context.Context, d *dht.DHT, h host.Host) (*Node, error) {
n := &Node{
ctx: ctx,
dht: d,
host: h,
writer: w,
writer: internal.NewWriter(),
}

s, err := n.subscribe()
Expand Down Expand Up @@ -79,5 +96,55 @@ func (n *Node) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error)
}

func (n *Node) Send(ctx context.Context, msg *pb.Message) error {
return n.dht.Send(ctx, msg)
key := msg.Key

if bytes.Equal(key, []byte(n.host.ID())) {
n.deliver(msg) // @todo we may need to do this for more than just message types, like when the routing table is updated.
return nil
}

target := n.dht.Find(key)
if target == nil {
n.deliver(msg)
return nil
}

forward := n.forward(msg, target)
if !forward {
return nil
}

err := n.writer.Send(ctx, target, msg)
if err != nil {
return err
}

return nil
}

// deliver sends the message to all connected applications.
func (n *Node) deliver(msg *pb.Message) {
n.RLock()
defer n.RUnlock()

for _, app := range n.applications {
app.Deliver(msg)
}
}

// forward asks all applications whether a message should be forwarded to a peer or not.
func (n *Node) forward(msg *pb.Message, target state.Peer) bool {
n.RLock()
defer n.RUnlock()

// @todo need to run over this logic
forward := true
for _, app := range n.applications {
f := app.Forward(msg, target)
if forward {
forward = f
}
}

return forward
}