Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Documentation #74

Open
wants to merge 3 commits into
base: release/0.5.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions codec/gcm.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,29 @@ import (
"github.com/renproject/id"
)

// gcmNone is the nonce used by the gcm encoder to encrypt/decrypt data
// It is represented as a struct with a 32 bit uint and a 64 bit uint,
// together representing the top 32 bits and bottom 64 bits of a 96 bit
// unsigned integer.
// The struct also contains a boolean countDown that represents whether
// the nonce counts up from 0 or counts down from 7.9228163e+28
type gcmNonce struct {
// top and bottom together represent the top 32 bits and bottom 64 bits of a 96 bit unsigned integer
//
top uint32
bottom uint64
countDown bool
}

// next is a method on gcmNonce that counts up or down depending on the countDown flag
func (nonce gcmNonce) next() {
if nonce.countDown {
nonce.pred()
} else {
nonce.succ()
}

}

// succ is a method in gcmNonce that represents a successor function
func (nonce gcmNonce) succ() {
nonce.bottom++
// If bottom overflows, increment top by 1
Expand All @@ -36,6 +43,7 @@ func (nonce gcmNonce) succ() {
}
}

// pred is a method on gcmNonce that represents a predecessor function
func (nonce gcmNonce) pred() {
nonce.bottom--
// If bottom underflows, decrement top by 1
Expand Down
5 changes: 4 additions & 1 deletion dht/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (table *InMemTable) AddPeer(peerID id.Signatory, peerAddr wire.Address) {
table.addrsBySignatory[peerID] = peerAddr

// Insert into the sorted signatories list based on its XOR distance from our
// own address.
// own address. Signatory is added only if it's not present in the sorted list
if !ok {
i := sort.Search(len(table.sorted), func(i int) bool {
return table.isCloser(peerID, table.sorted[i])
Expand Down Expand Up @@ -179,6 +179,9 @@ func (table *InMemTable) RandomPeers(n int) []id.Signatory {
// unsurprising alternative.
return []id.Signatory{}
}

// If sorted list of signatories is smaller than required number `n`,
// return a copy of the list as is
if n >= m {
sigs := make([]id.Signatory, m)
table.sortedMu.Lock()
Expand Down
4 changes: 4 additions & 0 deletions handshake/once.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ func NewOncePool(opts OncePoolOptions) OncePool {
}
}

// Once returns a handshake that only maintains one connection of a particular signatory
// at any given time. If a second connection is attempted, the peer with the larger signatory
// value decides to persist/kill the new connection based on whether the previous connection
// has expired its minimum expiry age
func Once(self id.Signatory, pool *OncePool, h Handshake) Handshake {
return func(conn net.Conn, enc codec.Encoder, dec codec.Decoder) (codec.Encoder, codec.Decoder, id.Signatory, error) {
enc, dec, remote, err := h(conn, enc, dec)
Expand Down
16 changes: 16 additions & 0 deletions peer/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@ func NewGossiper(opts GossiperOptions, filter *channel.SyncFilter, transport *tr
}
}

// Resolve accepts a resolver and associates it with the gossiper
func (g *Gossiper) Resolve(resolver dht.ContentResolver) {
g.resolverMu.Lock()
defer g.resolverMu.Unlock()

g.resolver = resolver
}


func (g *Gossiper) Gossip(ctx context.Context, contentID []byte, subnet *id.Hash) {
if subnet == nil {
subnet = &DefaultSubnet
Expand All @@ -72,6 +74,8 @@ func (g *Gossiper) Gossip(ctx context.Context, contentID []byte, subnet *id.Hash
}
}

// DidReceiveMessage accepts a signatory (sender's ID) and a message. It is a
// callback method that is used to check the type of message and invoke relevant methods.
func (g *Gossiper) DidReceiveMessage(from id.Signatory, msg wire.Msg) error {
switch msg.Type {
case wire.MsgTypePush:
Expand All @@ -88,6 +92,10 @@ func (g *Gossiper) DidReceiveMessage(from id.Signatory, msg wire.Msg) error {
return nil
}

// didReceivePush accepts a signatory (sender's ID) and a message. It is a
// callback method that is used to a an initial gossiped message. If the content is
// is relevant, it sends a pull request back to the sender and waits for a sync message
// within a certain time period.
func (g *Gossiper) didReceivePush(from id.Signatory, msg wire.Msg) {
if len(msg.Data) == 0 {
return
Expand Down Expand Up @@ -145,6 +153,10 @@ func (g *Gossiper) didReceivePush(from id.Signatory, msg wire.Msg) {
}
}

// didReceivePull accepts a signatory (sender's ID) and a message. It is a
// callback method that is used to respond to a pull request. If the content is
// is present in associated ContentResolver, it sends a sync request peer that sent the pull request
// with the relevant information (content).
func (g *Gossiper) didReceivePull(from id.Signatory, msg wire.Msg) {
if len(msg.Data) == 0 {
return
Expand Down Expand Up @@ -181,6 +193,10 @@ func (g *Gossiper) didReceivePull(from id.Signatory, msg wire.Msg) {
return
}

// didReceiveSync accepts a signatory (sender's ID) and a message. It is a
// callback method that is used to accept a sync message. A sync message is only accepted
// if the filter is open for the particular contentID. If accepted, the content is inserted
// into the associated ContentResolver
func (g *Gossiper) didReceiveSync(from id.Signatory, msg wire.Msg) {
g.resolverMu.RLock()
if g.resolver == nil {
Expand Down
39 changes: 38 additions & 1 deletion peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ var (
DefaultTimeout = time.Second
)

// ErrPeerNotFound represents the error raised when a designated recipient of a message
// is not present in the peer table
var (
ErrPeerNotFound = errors.New("peer not found")
)
Expand All @@ -42,50 +44,79 @@ func New(opts Options, transport *transport.Transport) *Peer {
}
}

// ID return the signatory value associated with the peer
func (p *Peer) ID() id.Signatory {
return p.opts.PrivKey.Signatory()
}

// Syncer method return a pointer to the syncer struct associated with the peer
func (p *Peer) Syncer() *Syncer {
return p.syncer
}

// Gossiper method return a pointer to the gossiper struct associated with the peer
func (p *Peer) Gossiper() *Gossiper {
return p.gossiper
}

// Transport returns a pointer to the transport layer associated with the peer
func (p *Peer) Transport() *transport.Transport {
return p.transport
}

// Link accepts a signatory registers a peer as a trusted individual for
// persistent connections
func (p *Peer) Link(remote id.Signatory) {
p.transport.Link(remote)
}

// Link accepts a signatory and de-registers a peer as a trusted individual
// for persistent connections
func (p *Peer) Unlink(remote id.Signatory) {
p.transport.Unlink(remote)
}

func (p *Peer) Ping(ctx context.Context) error {
// Ping accepts a context and a signatory and is used to send a ping message to a peer.
// It can be used to check liveness of a peer
func (p *Peer) Ping(ctx context.Context, to id.Signatory) error {
return fmt.Errorf("unimplemented")
}

// Send accept a context, the signatory ID of a recipient peer, and a message. It tries to send
// the message to the recipient peer using the transport layer until the context expires
func (p *Peer) Send(ctx context.Context, to id.Signatory, msg wire.Msg) error {
return p.transport.Send(ctx, to, msg)
}

// Sync accepts a context, an array of bytes (contentID) and a pointer to a signatory (hint).
// A call to Sync is used to request content with a particular contentID from peers.
// A hint can be supplied to try syncing from a particular peer as well.
// A nil hint will indicate that the content can be attempted to be synced any alpha random peers,
// where alpha is defined in SyncerOptions struct
func (p *Peer) Sync(ctx context.Context, contentID []byte, hint *id.Signatory) ([]byte, error) {
return p.syncer.Sync(ctx, contentID, hint)
}

// Gossip accepts a context, an array of bytes (contentID) and a pointer to a id.Hash (subnet)
// A call to Gossip is used to gossip information (content) with a particular contentID with peers.
// A subnet can be supplied to try gossip with a particular subnet of peers.
// If the supplied subnet id DefaultSubnet or nil, content will be gossiped with alpha random peers,
// where alpha is defined in GossiperOptions struct.
func (p *Peer) Gossip(ctx context.Context, contentID []byte, subnet *id.Hash) {
p.gossiper.Gossip(ctx, contentID, subnet)
}

// DiscoverPeers accepts a context and is used to start discovering new peers using the
// peer discovery client. DiscoverPeers is a blocking method, ideally it would be called
// as `go p.DiscoverPeers(...)`
func (p *Peer) DiscoverPeers(ctx context.Context) {
p.discoveryClient.DiscoverPeers(ctx)
}

// Run accepts a context and sets up the default receive function the features callbacks
// for syncer, gossiper and peer discovery. Finally, it calls the Run method on the transport
// layer, which then starts to listen for and accept connections.
// A call to Run is a blocking method, ideally it would be called as `go p.Run(...)`
func (p *Peer) Run(ctx context.Context) {
p.transport.Receive(ctx, func(from id.Signatory, packet wire.Packet) error {
// TODO(ross): Think about merging the syncer and the gossiper.
Expand All @@ -103,10 +134,16 @@ func (p *Peer) Run(ctx context.Context) {
p.transport.Run(ctx)
}

// Receive takes a context and a custom receive handler. It is used to execute custom
// logic on messages. Every message, along with the ID (signatory) of the sender is
// passed to this function in a sequential manner.
func (p *Peer) Receive(ctx context.Context, f func(id.Signatory,wire.Packet) error) {
p.transport.Receive(ctx, f)
}

// Resolve takes in a context and a ContentResolver and associates it with
// the gossiper. Gossiper / Sync logic will not function until a valid
// ContentResolver is provided to a call to Resolve
func (p *Peer) Resolve(ctx context.Context, contentResolver dht.ContentResolver) {
p.gossiper.Resolve(contentResolver)
}