Skip to content

Commit

Permalink
refactor: change sayHello to direct
Browse files Browse the repository at this point in the history
  • Loading branch information
ZigBalthazar committed Aug 23, 2023
1 parent 4622c3f commit 76892e0
Show file tree
Hide file tree
Showing 10 changed files with 80 additions and 49 deletions.
8 changes: 6 additions & 2 deletions network/notifee.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,15 @@ func newNotifeeService(host lp2phost.Host, eventChannel chan<- Event, logger *lo
}

func (n *NotifeeService) Connected(_ lp2pnetwork.Network, conn lp2pnetwork.Conn) {
n.eventChannel <- &ConnectEvent{PeerID: conn.RemotePeer()}
peerID := conn.RemotePeer()
n.logger.Info("Connected to peer with peerId:", "PeerID", peerID)
n.eventChannel <- &ConnectEvent{PeerID: peerID}
}

func (n *NotifeeService) Disconnected(_ lp2pnetwork.Network, conn lp2pnetwork.Conn) {
n.eventChannel <- &DisconnectEvent{PeerID: conn.RemotePeer()}
peerID := conn.RemotePeer()
n.logger.Info("Disconnected from peer with peerId:", "PeerID", peerID)
n.eventChannel <- &DisconnectEvent{PeerID: peerID}
}

func (n *NotifeeService) Listen(_ lp2pnetwork.Network, ma ma.Multiaddr) {
Expand Down
22 changes: 15 additions & 7 deletions sync/handler_blocks_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (handler *blocksRequestHandler) ParseMessage(m message.Message, initiator p
handler.logger.Warn("we are busy", "message", msg, "pid", initiator)
response := message.NewBlocksResponseMessage(message.ResponseCodeRejected, "we are busy",
msg.SessionID, 0, nil, nil)
handler.sendTo(response, initiator, msg.SessionID)
handler.sendBlocks(response, initiator, msg.SessionID)

return nil
}
Expand All @@ -36,7 +36,7 @@ func (handler *blocksRequestHandler) ParseMessage(m message.Message, initiator p
err := errors.Errorf(errors.ErrInvalidMessage, "peer status is %v", peer.Status)
response := message.NewBlocksResponseMessage(message.ResponseCodeRejected, err.Error(),
msg.SessionID, 0, nil, nil)
handler.sendTo(response, initiator, msg.SessionID)
handler.sendBlocks(response, initiator, msg.SessionID)

return err
}
Expand All @@ -47,7 +47,7 @@ func (handler *blocksRequestHandler) ParseMessage(m message.Message, initiator p
err := errors.Errorf(errors.ErrInvalidMessage, "the request height is not acceptable: %v", msg.From)
response := message.NewBlocksResponseMessage(message.ResponseCodeRejected, err.Error(),
msg.SessionID, 0, nil, nil)
handler.sendTo(response, initiator, msg.SessionID)
handler.sendBlocks(response, initiator, msg.SessionID)

return err
}
Expand All @@ -59,7 +59,7 @@ func (handler *blocksRequestHandler) ParseMessage(m message.Message, initiator p
err := errors.Errorf(errors.ErrInvalidMessage, "too many blocks requested: %v-%v", msg.From, msg.Count)
response := message.NewBlocksResponseMessage(message.ResponseCodeRejected, err.Error(),
msg.SessionID, 0, nil, nil)
handler.sendTo(response, initiator, msg.SessionID)
handler.sendBlocks(response, initiator, msg.SessionID)

return err
}
Expand All @@ -74,7 +74,7 @@ func (handler *blocksRequestHandler) ParseMessage(m message.Message, initiator p

response := message.NewBlocksResponseMessage(message.ResponseCodeMoreBlocks, message.ResponseCodeMoreBlocks.String(),
msg.SessionID, height, blocksData, nil)
handler.sendTo(response, initiator, msg.SessionID)
handler.sendBlocks(response, initiator, msg.SessionID)

height += uint32(len(blocksData))
count -= uint32(len(blocksData))
Expand All @@ -90,11 +90,11 @@ func (handler *blocksRequestHandler) ParseMessage(m message.Message, initiator p
lastCertificate := handler.state.LastCertificate()
response := message.NewBlocksResponseMessage(message.ResponseCodeSynced, message.ResponseCodeSynced.String(),
msg.SessionID, peerHeight, nil, lastCertificate)
handler.sendTo(response, initiator, msg.SessionID)
handler.sendBlocks(response, initiator, msg.SessionID)
} else {
response := message.NewBlocksResponseMessage(message.ResponseCodeNoMoreBlocks,
message.ResponseCodeNoMoreBlocks.String(), msg.SessionID, 0, nil, nil)
handler.sendTo(response, initiator, msg.SessionID)
handler.sendBlocks(response, initiator, msg.SessionID)
}

return nil
Expand All @@ -103,3 +103,11 @@ func (handler *blocksRequestHandler) ParseMessage(m message.Message, initiator p
func (handler *blocksRequestHandler) PrepareBundle(m message.Message) *bundle.Bundle {
return bundle.NewBundle(handler.SelfID(), m)
}

func (handler *blocksRequestHandler) sendBlocks(msg message.Message, to peer.ID, sessionID int) {
if err := handler.sendTo(msg, to); err != nil {
// Let's close the session with this peer because we couldn't establish a connection.
// This helps to free sessions and ask blocks from other peers.
handler.peerSet.CloseSession(sessionID)
}
}
6 changes: 2 additions & 4 deletions sync/handler_blocks_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,9 @@ func TestSyncing(t *testing.T) {
assert.NoError(t, syncAlice.Start())
assert.NoError(t, syncBob.Start())

// Verify that Hello messages are exchanged between Alice and Bob
shouldPublishMessageWithThisType(t, networkAlice, message.TypeHello)
shouldPublishMessageWithThisType(t, networkBob, message.TypeHello)
syncAlice.sayHello(false, syncBob.SelfID())

// Verify that Hello-ack messages are exchanged between Alice and Bob
// Verify that Hello messages are exchanged between Alice and Bob
shouldPublishMessageWithThisType(t, networkAlice, message.TypeHello)
shouldPublishMessageWithThisType(t, networkBob, message.TypeHello)

Expand Down
2 changes: 1 addition & 1 deletion sync/handler_hello.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (handler *helloHandler) ParseMessage(m message.Message, initiator peer.ID)
// TODO: Sends response only if there is a direct connection between two peers.
// TODO: check if we have handshaked before. Ignore responding again
// Response to Hello
handler.sayHello(true)
handler.sayHello(true, initiator)
}

handler.updateBlockchain()
Expand Down
5 changes: 3 additions & 2 deletions sync/handler_hello_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,11 @@ func TestParsingHelloMessages(t *testing.T) {
})
}

func TestBroadcastingHelloMessages(t *testing.T) {
func TestSendingHelloMessage(t *testing.T) {
td := setup(t, nil)

td.sync.sayHello(true)
to := td.RandomPeerID()
td.sync.sayHello(true, to)

bdl := td.shouldPublishMessageWithThisType(t, td.network, message.TypeHello)
assert.True(t, util.IsFlagSet(bdl.Flags, bundle.BundleFlagHelloMessage))
Expand Down
2 changes: 1 addition & 1 deletion sync/peerset/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type Peer struct {
func NewPeer(peerID peer.ID) *Peer {
return &Peer{
ConsensusKeys: make([]*bls.PublicKey, 0),
Status: StatusCodeUnknown,
Status: StatusCodeConnected,
PeerID: peerID,
ReceivedBytes: make(map[message.Type]int64),
SentBytes: make(map[message.Type]int64),
Expand Down
11 changes: 5 additions & 6 deletions sync/peerset/peer_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ func TestPeerSet(t *testing.T) {
})

t.Run("Testing GetPeer", func(t *testing.T) {
p := peerSet.GetPeer(pid2)

p := peerSet.GetPeer(peer.ID("connected"))
assert.Equal(t, pid2, p.PeerID)
assert.Equal(t, StatusCodeKnown, p.Status)

Expand Down Expand Up @@ -266,22 +265,22 @@ func TestGetRandomWeightedPeer(t *testing.T) {
assert.Greater(t, hits[peer.ID("peer_6")], 0)
}

func TestGetRandomPeerUnknown(t *testing.T) {
func TestGetRandomPeerConnected(t *testing.T) {
ts := testsuite.NewTestSuite(t)

peerSet := NewPeerSet(time.Second)

pk, _ := ts.RandomBLSKeyPair()
pidUnknown := peer.ID("peer_unknown")
peerSet.UpdatePeerInfo(pidUnknown, StatusCodeUnknown, "Moniker_unknown", "Agent1", []*bls.PublicKey{pk}, true)
pidConnected := peer.ID("peer_connected")
peerSet.UpdatePeerInfo(pidConnected, StatusCodeConnected, "Moniker_unknown", "Agent1", []*bls.PublicKey{pk}, true)

pk, _ = ts.RandomBLSKeyPair()
pidBanned := peer.ID("peer_banned")
peerSet.UpdatePeerInfo(pidBanned, StatusCodeBanned, "Moniker_banned", "Agent1", []*bls.PublicKey{pk}, true)

p := peerSet.GetRandomPeer()

assert.NotEqual(t, p.PeerID, pidUnknown)
assert.NotEqual(t, p.PeerID, pidConnected)
assert.NotEqual(t, p.PeerID, pidBanned)
}

Expand Down
2 changes: 1 addition & 1 deletion sync/peerset/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestPeer(t *testing.T) {

t.Run("NewPeer", func(t *testing.T) {
assert.NotNil(t, p1)
assert.Equal(t, p1.Status, StatusCodeUnknown)
assert.Equal(t, p1.Status, StatusCodeConnected)
})

t.Run("status check", func(t *testing.T) {
Expand Down
14 changes: 10 additions & 4 deletions sync/peerset/status_code.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,22 @@ package peerset
type StatusCode int

const (
StatusCodeBanned = StatusCode(-1)
StatusCodeUnknown = StatusCode(0)
StatusCodeKnown = StatusCode(1)
StatusCodeTrusty = StatusCode(2)
StatusCodeBanned = StatusCode(-1)
StatusCodeUnknown = StatusCode(0)
StatusCodeDisconnected = StatusCode(1)
StatusCodeConnected = StatusCode(2)
StatusCodeKnown = StatusCode(3)
StatusCodeTrusty = StatusCode(4)
)

func (code StatusCode) String() string {
switch code {
case StatusCodeBanned:
return "banned"
case StatusCodeDisconnected:
return "disconnected"
case StatusCodeConnected:
return "connected"
case StatusCodeUnknown:
return "unknown"
case StatusCodeKnown:
Expand Down
57 changes: 36 additions & 21 deletions sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ func (sync *synchronizer) Start() error {
go sync.heartBeatTickerLoop()
}

sync.sayHello(false)
sync.moveConsensusToNewHeight()

return nil
Expand Down Expand Up @@ -166,7 +165,7 @@ func (sync *synchronizer) broadcastHeartBeat() {
}
}

func (sync *synchronizer) sayHello(helloAck bool) {
func (sync *synchronizer) sayHello(helloAck bool, to peer.ID) {
flags := 0
if sync.config.NodeNetwork {
flags = util.SetFlag(flags, message.FlagNodeNetwork)
Expand All @@ -186,7 +185,11 @@ func (sync *synchronizer) sayHello(helloAck bool) {

msg.Sign(sync.signers...)

sync.broadcast(msg)
err := sync.sendTo(msg, to)
if err != nil {
sync.logger.Error("Sending Hello message Faild", "to", to, "Ack", helloAck, "error", err)
}
sync.logger.Info("Sending Hello message", "to", to, "Ack", helloAck)
}

func (sync *synchronizer) broadcastLoop() {
Expand All @@ -208,26 +211,37 @@ func (sync *synchronizer) receiveLoop() {
return

case e := <-sync.networkCh:
var bdl *bundle.Bundle

switch e.Type() {
case network.EventTypeGossip:
ge := e.(*network.GossipMessage)
bdl = sync.firewall.OpenGossipBundle(ge.Data, ge.Source, ge.From)
bdl := sync.firewall.OpenGossipBundle(ge.Data, ge.Source, ge.From)
err := sync.processIncomingBundle(bdl)
if err != nil {
sync.logger.Warn("error on parsing a Gossip bundle", "initiator", bdl.Initiator, "bundle", bdl, "err", err)
sync.peerSet.IncreaseInvalidBundlesCounter(bdl.Initiator)
}

case network.EventTypeStream:
se := e.(*network.StreamMessage)
bdl = sync.firewall.OpenStreamBundle(se.Reader, se.Source)
bdl := sync.firewall.OpenStreamBundle(se.Reader, se.Source)
if err := se.Reader.Close(); err != nil {
// TODO: write test for me
sync.peerSet.IncreaseSendFailedCounter(se.Source)
sync.logger.Warn("error on closing stream", "err", err)
}
}

err := sync.processIncomingBundle(bdl)
if err != nil {
sync.logger.Warn("error on parsing a bundle", "initiator", bdl.Initiator, "bundle", bdl, "err", err)
sync.peerSet.IncreaseInvalidBundlesCounter(bdl.Initiator)
err := sync.processIncomingBundle(bdl)
if err != nil {
sync.logger.Warn("error on parsing a Stream bundle", "initiator", bdl.Initiator, "bundle", bdl, "err", err)
sync.peerSet.IncreaseInvalidBundlesCounter(bdl.Initiator)
}
case network.EventTypeConnect:
ce := e.(*network.ConnectEvent)
sync.sayHello(false, ce.PeerID)
sync.peerSet.UpdateStatus(ce.PeerID, peerset.StatusCodeConnected)
case network.EventTypeDisconnect:
ce := e.(*network.DisconnectEvent)
sync.peerSet.UpdateStatus(ce.PeerID, peerset.StatusCodeDisconnected)
}
}
}
Expand Down Expand Up @@ -313,7 +327,7 @@ func (sync *synchronizer) prepareBundle(msg message.Message) *bundle.Bundle {
return nil
}

func (sync *synchronizer) sendTo(msg message.Message, to peer.ID, sessionID int) {
func (sync *synchronizer) sendTo(msg message.Message, to peer.ID) error {
bdl := sync.prepareBundle(msg)
if bdl != nil {
data, _ := bdl.Encode()
Expand All @@ -322,17 +336,15 @@ func (sync *synchronizer) sendTo(msg message.Message, to peer.ID, sessionID int)
if err != nil {
sync.logger.Warn("error on sending bundle", "bundle", bdl, "err", err, "to", to)
sync.peerSet.IncreaseSendFailedCounter(to)

// Let's close the session with this peer because we couldn't establish a connection.
// This helps to free sessions and ask blocks from other peers.
sync.peerSet.CloseSession(sessionID)
} else {
sync.logger.Info("sending bundle to a peer", "bundle", bdl, "to", to)
sync.peerSet.IncreaseSendSuccessCounter(to)
sync.peerSet.IncreaseSentBytesCounter(msg.Type(), int64(len(data)), &to)
return err
}
sync.logger.Info("sending bundle to a peer", "bundle", bdl, "to", to)
sync.peerSet.IncreaseSendSuccessCounter(to)

sync.peerSet.IncreaseSentBytesCounter(msg.Type(), int64(len(data)), &to)
}
return nil
}

func (sync *synchronizer) broadcast(msg message.Message) {
Expand Down Expand Up @@ -389,7 +401,10 @@ func (sync *synchronizer) downloadBlocks(from uint32, onlyNodeNetwork bool) {
sync.logger.Debug("sending download request", "from", from+1, "count", count, "pid", p.PeerID)
session := sync.peerSet.OpenSession(p.PeerID)
msg := message.NewBlocksRequestMessage(session.SessionID(), from+1, count)
sync.sendTo(msg, p.PeerID, session.SessionID())
err := sync.sendTo(msg, p.PeerID)
if err != nil {
sync.peerSet.CloseSession(session.SessionID())
}
from += count
}
}
Expand Down

0 comments on commit 76892e0

Please sign in to comment.