Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions dht/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dht
import (
"bytes"
"context"
"log"
"sync"

"github.com/decanus/bureka/dht/state"
Expand Down Expand Up @@ -129,15 +130,15 @@ func (d *DHT) AddPeer(id state.Peer) {
}

// RemovePeer removes a peer from the dht.
func (d *DHT) RemovePeer(id state.Peer) {
func (d *DHT) RemovePeer(id state.Peer) bool {
d.Lock()
defer d.Unlock()

ns, _ := d.NeighborhoodSet.Remove(id)
d.NeighborhoodSet = ns

d.RoutingTable = d.RoutingTable.Remove(d.ID, id)
d.LeafSet.Remove(id)
return d.LeafSet.Remove(id)
}

func (d *DHT) Heartbeat(id state.Peer) {
Expand All @@ -149,6 +150,20 @@ func (d *DHT) Heartbeat(id state.Peer) {
}
}

// Close sends a message to all neighbors that a peer is exiting the DHT.
func (d *DHT) Close() {
d.MapNeighbors(func(peer state.Peer) {
err := d.Send(
context.Background(),
&pb.Message{Key: peer, Type: pb.Message_NODE_EXIT, Sender: d.ID},
)

if err != nil {
log.Println(err)
}
})
}

// MapNeighbors iterates over the NeighborhoodSet and calls the process for every peer.
func (d *DHT) MapNeighbors(process MapFunc) {
d.RLock()
Expand Down
9 changes: 8 additions & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package node

import (
"context"
"errors"

logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/event"
Expand Down Expand Up @@ -75,7 +76,7 @@ func (n *Node) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error)
b := []byte(id)
p := n.dht.Find(b)
if p == nil {
return peer.AddrInfo{}, nil // @todo error
return peer.AddrInfo{}, errors.New("failed to find peer")
}

id, err := peer.IDFromBytes(p)
Expand All @@ -90,6 +91,12 @@ func (n *Node) Send(ctx context.Context, msg *pb.Message) error {
return n.dht.Send(ctx, msg)
}

// Close closes the node and stops the DHT.
func (n *Node) Close() {
n.dht.Close()
n.host.Close()
}

func (n *Node) handleOutgoingMessages() {
c := make(chan dht.Packet)
n.dht.Feed().Subscribe(c)
Expand Down
2 changes: 1 addition & 1 deletion node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestFindPeer(t *testing.T) {
dhts := setupNodes(t, ctx, 4)
defer func() {
for i := 0; i < 4; i++ {
dhts[i].host.Close()
dhts[i].Close()
}
}()

Expand Down