Skip to content

Commit

Permalink
Feat/peer discovery (#62)
Browse files Browse the repository at this point in the history
* Peer Discovery using a heartbeat methodology
* Tests for peer discovery
* IP address addition to hash table in order to reply back to ping messages
* Table tests for IP address addition/deletion/query
* Additional tests for sync module and transport layer

Co-authored-by: Loong <bzlw@pm.me>
Co-authored-by: Loong <loongy@users.noreply.github.com>
  • Loading branch information
3 people authored Mar 8, 2021
1 parent 2c5d279 commit eff29a3
Show file tree
Hide file tree
Showing 16 changed files with 745 additions and 38 deletions.
1 change: 1 addition & 0 deletions channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/renproject/aw/codec"
"github.com/renproject/aw/wire"
"github.com/renproject/id"

"go.uber.org/zap"
"golang.org/x/time/rate"
)
Expand Down
1 change: 1 addition & 0 deletions channel/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/renproject/aw/codec"
"github.com/renproject/aw/wire"
"github.com/renproject/id"

"go.uber.org/zap"
)

Expand Down
54 changes: 46 additions & 8 deletions dht/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ type Table interface {
// PeerAddress returns the network address associated with the given peer.
PeerAddress(id.Signatory) (wire.Address, bool)

// AddPeer to the table with an associate network address.
AddIP(id.Signatory, string)
// DeleteIP from the table.
DeleteIP(id.Signatory)
// IP returns the network ip address associated with the given peer.
IP(id.Signatory) (string, bool)

// Peers returns the n closest peers to the local peer, using XORing as the
// measure of distance between two peers.
Peers(int) []id.Signatory
Expand Down Expand Up @@ -59,6 +66,9 @@ type InMemTable struct {
addrsBySignatoryMu *sync.Mutex
addrsBySignatory map[id.Signatory]wire.Address

ipBySignatoryMu *sync.Mutex
ipBySignatory map[id.Signatory]string

subnetsByHashMu *sync.Mutex
subnetsByHash map[id.Hash][]id.Signatory

Expand All @@ -75,6 +85,9 @@ func NewInMemTable(self id.Signatory) *InMemTable {
addrsBySignatoryMu: new(sync.Mutex),
addrsBySignatory: map[id.Signatory]wire.Address{},

ipBySignatoryMu: new(sync.Mutex),
ipBySignatory: map[id.Signatory]string{},

subnetsByHashMu: new(sync.Mutex),
subnetsByHash: map[id.Hash][]id.Signatory{},

Expand All @@ -93,21 +106,24 @@ func (table *InMemTable) AddPeer(peerID id.Signatory, peerAddr wire.Address) {
defer table.sortedMu.Unlock()
defer table.addrsBySignatoryMu.Unlock()

if peerID.Equal(&table.self) {
_, ok := table.addrsBySignatory[peerID]
if ok && table.self.Equal(&peerID) {
return
}

// Insert into the map to allow for address lookup using the signatory.
table.addrsBySignatory[peerID] = peerAddr

// Insert into the sorted address list based on its XOR distance from our
// Insert into the sorted signatories list based on its XOR distance from our
// own address.
i := sort.Search(len(table.sorted), func(i int) bool {
return table.isCloser(peerID, table.sorted[i])
})
table.sorted = append(table.sorted, id.Signatory{})
copy(table.sorted[i+1:], table.sorted[i:])
table.sorted[i] = peerID
if !ok {
i := sort.Search(len(table.sorted), func(i int) bool {
return table.isCloser(peerID, table.sorted[i])
})
table.sorted = append(table.sorted, id.Signatory{})
copy(table.sorted[i+1:], table.sorted[i:])
table.sorted[i] = peerID
}
}

func (table *InMemTable) DeletePeer(peerID id.Signatory) {
Expand Down Expand Up @@ -140,6 +156,28 @@ func (table *InMemTable) PeerAddress(peerID id.Signatory) (wire.Address, bool) {
return addr, ok
}

func (table *InMemTable) AddIP(peerID id.Signatory, ipAddress string) {
table.ipBySignatoryMu.Lock()
defer table.ipBySignatoryMu.Unlock()

table.ipBySignatory[peerID] = ipAddress
}

func (table *InMemTable) DeleteIP(peerID id.Signatory) {
table.ipBySignatoryMu.Lock()
defer table.ipBySignatoryMu.Unlock()

delete(table.ipBySignatory, peerID)
}

func (table *InMemTable) IP(peerID id.Signatory) (string, bool) {
table.ipBySignatoryMu.Lock()
defer table.ipBySignatoryMu.Unlock()

ip, ok := table.ipBySignatory[peerID]
return ip, ok
}

// Peers returns the n closest peer IDs.
func (table *InMemTable) Peers(n int) []id.Signatory {
table.sortedMu.Lock()
Expand Down
80 changes: 80 additions & 0 deletions dht/table_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dht_test

import (
"fmt"
"github.com/renproject/aw/wire"
"math/rand"
"strconv"
Expand Down Expand Up @@ -224,6 +225,39 @@ var _ = Describe("DHT", func() {
})
})

Context("when re-inserting an address", func() {
It("the sorted list of signatories should remain unchanged", func() {

r := rand.New(rand.NewSource(time.Now().UnixNano()))
f := func(seed int64) bool {
table, _ := initDHT()
numPeers := r.Intn(100) + 1
for i := 0; i < numPeers; i++ {
privKey := id.NewPrivKey()
sig := privKey.Signatory()
ipAddr := fmt.Sprintf("%d.%d.%d.%d:%d",
r.Intn(256), r.Intn(256), r.Intn(256), r.Intn(256), r.Intn(65536))
addr := wire.NewUnsignedAddress(wire.TCP, ipAddr, uint64(time.Now().UnixNano()))
table.AddPeer(sig, addr)
}

peers := table.Peers(numPeers + 10)
Expect(len(peers)).To(Equal(numPeers))
randomSig := peers[r.Intn(numPeers)]
newIPAddr := fmt.Sprintf("%d.%d.%d.%d:%d",
r.Intn(256), r.Intn(256), r.Intn(256), r.Intn(256), r.Intn(65536))
newAddr := wire.NewUnsignedAddress(wire.TCP, newIPAddr, uint64(time.Now().UnixNano()))
table.AddPeer(randomSig, newAddr)

newPeers := table.Peers(numPeers + 1)
Expect(len(newPeers)).To(Equal(numPeers))
Expect(peers).To(Equal(newPeers))
return true
}
Expect(quick.Check(f, nil)).To(Succeed())
})
})

Measure("Adding 10000 addresses to distributed hash table", func(b Benchmarker) {
table, _ := initDHT()
signatories := make([]id.Signatory, 0)
Expand Down Expand Up @@ -260,6 +294,52 @@ var _ = Describe("DHT", func() {
}, 10)
})

Describe("IP Addresses", func() {
Context("when adding an ip address", func() {
It("should be able to query it", func() {
table, _ := initDHT()

r := rand.New(rand.NewSource(time.Now().UnixNano()))
f := func(seed int64) bool {
privKey := id.NewPrivKey()
sig := privKey.Signatory()
ipAddr := fmt.Sprintf("%d.%d.%d.%d:%d",
r.Intn(256), r.Intn(256), r.Intn(256), r.Intn(256), r.Intn(65536))
table.AddIP(sig, ipAddr)

signatory := id.NewSignatory((*id.PubKey)(&privKey.PublicKey))
newIPAddr, ok := table.IP(signatory)
Expect(ok).To(BeTrue())
Expect(newIPAddr).To(Equal(ipAddr))
return true
}
Expect(quick.Check(f, nil)).To(Succeed())
})
})

Context("when deleting an ip address", func() {
It("should not be able to query it", func() {
table, _ := initDHT()

r := rand.New(rand.NewSource(time.Now().UnixNano()))
f := func(seed int64) bool {
privKey := id.NewPrivKey()
ipAddr := fmt.Sprintf("%d.%d.%d.%d:%d",
r.Intn(256), r.Intn(256), r.Intn(256), r.Intn(256), r.Intn(65536))

signatory := id.NewSignatory((*id.PubKey)(&privKey.PublicKey))
table.DeleteIP(signatory)
table.AddIP(signatory, ipAddr)
table.DeleteIP(signatory)

_, ok := table.IP(signatory)
return !ok
}
Expect(quick.Check(f, nil)).To(Succeed())
})
})
})

Describe("Subnets", func() {
Context("when adding a subnet", func() {
It("should be able to query it", func() {
Expand Down
4 changes: 3 additions & 1 deletion peer/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,11 @@ func (g *Gossiper) Gossip(ctx context.Context, contentID []byte, subnet *id.Hash

msg := wire.Msg{Version: wire.MsgVersion1, To: *subnet, Type: wire.MsgTypePush, Data: contentID}
for _, recipient := range recipients {
if err := g.transport.Send(ctx, recipient, msg); err != nil {
innerContext, cancel := context.WithTimeout(ctx, g.opts.Timeout)
if err := g.transport.Send(innerContext, recipient, msg); err != nil {
g.opts.Logger.Error("pushing gossip", zap.String("peer", recipient.String()), zap.Error(err))
}
cancel()
}
}

Expand Down
14 changes: 0 additions & 14 deletions peer/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,6 @@ var _ = Describe("Gossip", func() {
n := 4
opts, peers, tables, contentResolvers, _, _ := setup(n)

for i := range peers {
self := peers[i].ID()
peers[i].Receive(context.Background(), func(from id.Signatory, msg wire.Msg) error {
switch msg.Type {
case wire.MsgTypePush:
fmt.Printf("%v received Push from %v\n", self.String(), from.String())
case wire.MsgTypePull:
fmt.Printf("%v received Pull from %v\n", self.String(), from.String())
case wire.MsgTypeSync:
fmt.Printf("%v received Sync from %v saying : %v\n", self.String(), from.String(), string(msg.SyncData))
}
return nil
})
}
for i := range peers {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
Expand Down
26 changes: 24 additions & 2 deletions peer/opt.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,30 @@ func (opts GossiperOptions) WithTimeout(timeout time.Duration) GossiperOptions {
return opts
}

type DiscoveryOptions struct {
Logger *zap.Logger
Alpha int
MaxExpectedPeers int
PingTimePeriod time.Duration
}

func DefaultDiscoveryOptions() DiscoveryOptions {
logger, err := zap.NewDevelopment()
if err != nil {
panic(err)
}
return DiscoveryOptions{
Logger: logger,
Alpha: DefaultAlpha,
MaxExpectedPeers: DefaultAlpha,
PingTimePeriod: DefaultTimeout,
}
}

type Options struct {
SyncerOptions
GossiperOptions
DiscoveryOptions

Logger *zap.Logger
PrivKey *id.PrivKey
Expand All @@ -88,8 +109,9 @@ func DefaultOptions() Options {
}
privKey := id.NewPrivKey()
return Options{
SyncerOptions: DefaultSyncerOptions(),
GossiperOptions: DefaultGossiperOptions(),
SyncerOptions: DefaultSyncerOptions(),
GossiperOptions: DefaultGossiperOptions(),
DiscoveryOptions: DefaultDiscoveryOptions(),

Logger: logger,
PrivKey: privKey,
Expand Down
25 changes: 17 additions & 8 deletions peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,21 @@ var (
)

type Peer struct {
opts Options
transport *transport.Transport
syncer *Syncer
gossiper *Gossiper
opts Options
transport *transport.Transport
syncer *Syncer
gossiper *Gossiper
discoveryClient *DiscoveryClient
}

func New(opts Options, transport *transport.Transport) *Peer {
filter := channel.NewSyncFilter()
return &Peer{
opts: opts,
transport: transport,
syncer: NewSyncer(opts.SyncerOptions, filter, transport),
gossiper: NewGossiper(opts.GossiperOptions, filter, transport),
opts: opts,
transport: transport,
syncer: NewSyncer(opts.SyncerOptions, filter, transport),
gossiper: NewGossiper(opts.GossiperOptions, filter, transport),
discoveryClient: NewDiscoveryClient(opts.DiscoveryOptions, transport),
}
}

Expand Down Expand Up @@ -80,6 +82,10 @@ func (p *Peer) Gossip(ctx context.Context, contentID []byte, subnet *id.Hash) {
p.gossiper.Gossip(ctx, contentID, subnet)
}

func (p *Peer) DiscoverPeers(ctx context.Context) {
p.discoveryClient.DiscoverPeers(ctx)
}

func (p *Peer) Run(ctx context.Context) {
p.transport.Receive(ctx, func(from id.Signatory, msg wire.Msg) error {
if err := p.syncer.DidReceiveMessage(from, msg); err != nil {
Expand All @@ -88,6 +94,9 @@ func (p *Peer) Run(ctx context.Context) {
if err := p.gossiper.DidReceiveMessage(from, msg); err != nil {
return err
}
if err := p.discoveryClient.DidReceiveMessage(from, msg); err != nil {
return err
}
return nil
})
p.transport.Run(ctx)
Expand Down
Loading

0 comments on commit eff29a3

Please sign in to comment.