Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sync): add metric to track the network activity #1552

Merged
merged 3 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions sync/firewall/firewall.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ func (f *Firewall) OpenStreamBundle(r io.Reader, from peer.ID) (*bundle.Bundle,

func (f *Firewall) openBundle(r io.Reader, from peer.ID) (*bundle.Bundle, error) {
f.peerSet.UpdateLastReceived(from)
f.peerSet.IncreaseReceivedBundlesCounter(from)

p := f.peerSet.GetPeer(from)
if p.Status.IsBanned() {
Expand All @@ -131,31 +130,32 @@ func (f *Firewall) openBundle(r io.Reader, from peer.ID) (*bundle.Bundle, error)
}
}

bdl, err := f.decodeBundle(r, from)
bdl, bytesRead, err := f.decodeBundle(r)
if err != nil {
f.peerSet.IncreaseInvalidBundlesCounter(from)
f.peerSet.UpdateInvalidMetric(from, int64(bytesRead))

return nil, err
}

if err := f.checkBundle(bdl); err != nil {
f.peerSet.IncreaseInvalidBundlesCounter(from)
f.peerSet.UpdateInvalidMetric(from, int64(bytesRead))

return bdl, err
}

f.peerSet.UpdateReceivedMetric(from, bdl.Message.Type(), int64(bytesRead))

return bdl, nil
}

func (f *Firewall) decodeBundle(r io.Reader, pid peer.ID) (*bundle.Bundle, error) {
func (*Firewall) decodeBundle(r io.Reader) (*bundle.Bundle, int, error) {
bdl := new(bundle.Bundle)
bytesRead, err := bdl.Decode(r)
if err != nil {
return nil, err
return nil, bytesRead, err
}
f.peerSet.IncreaseReceivedBytesCounter(pid, bdl.Message.Type(), int64(bytesRead))

return bdl, nil
return bdl, bytesRead, nil
}

func (f *Firewall) checkBundle(bdl *bundle.Bundle) error {
Expand Down
4 changes: 2 additions & 2 deletions sync/firewall/firewall_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ func TestDecodeBundles(t *testing.T) {
}

p := td.firewall.peerSet.GetPeer(td.unknownPeerID)
assert.Equal(t, 5, p.ReceivedBundles)
assert.Equal(t, 4, p.InvalidBundles)
assert.Equal(t, int64(1), p.Metric.TotalReceived.Bundles)
assert.Equal(t, int64(4), p.Metric.TotalInvalid.Bundles)
}

func TestGossipMessage(t *testing.T) {
Expand Down
54 changes: 54 additions & 0 deletions sync/peerset/peer/metric/metric.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package metric

import "github.com/pactus-project/pactus/sync/bundle/message"

type Counter struct {
Bytes int64
Bundles int64
}

type Metric struct {
TotalInvalid Counter
TotalSent Counter
TotalReceived Counter
MessageSent map[message.Type]*Counter
MessageReceived map[message.Type]*Counter
}

func NewMetric() Metric {
return Metric{
Ja7ad marked this conversation as resolved.
Show resolved Hide resolved
MessageSent: make(map[message.Type]*Counter),
MessageReceived: make(map[message.Type]*Counter),
}
}

func (m *Metric) UpdateSentMetric(msgType message.Type, bytes int64) {
m.TotalSent.Bundles++
m.TotalSent.Bytes += bytes

_, ok := m.MessageSent[msgType]
if !ok {
m.MessageSent[msgType] = &Counter{}
}

m.MessageSent[msgType].Bundles++
m.MessageSent[msgType].Bytes += bytes
}

func (m *Metric) UpdateReceivedMetric(msgType message.Type, bytes int64) {
m.TotalReceived.Bundles++
m.TotalReceived.Bytes += bytes

_, ok := m.MessageReceived[msgType]
if !ok {
m.MessageReceived[msgType] = &Counter{}
}

m.MessageReceived[msgType].Bundles++
m.MessageReceived[msgType].Bytes += bytes
}

func (m *Metric) UpdateInvalidMetric(bytes int64) {
m.TotalInvalid.Bundles++
m.TotalInvalid.Bytes += bytes
}
46 changes: 46 additions & 0 deletions sync/peerset/peer/metric/metric_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package metric

import (
"testing"

"github.com/pactus-project/pactus/sync/bundle/message"
"github.com/stretchr/testify/assert"
)

func TestUpdateSentMetric(t *testing.T) {
metric := NewMetric()

testMsgType := message.Type(1)

metric.UpdateSentMetric(testMsgType, 100)

assert.Equal(t, int64(1), metric.TotalSent.Bundles)
assert.Equal(t, int64(100), metric.TotalSent.Bytes)

assert.NotNil(t, metric.MessageSent[testMsgType])
assert.Equal(t, int64(1), metric.MessageSent[testMsgType].Bundles)
assert.Equal(t, int64(100), metric.MessageSent[testMsgType].Bytes)
}

func TestUpdateReceivedMetric(t *testing.T) {
metric := NewMetric()

testMsgType := message.Type(2)

metric.UpdateReceivedMetric(testMsgType, 200)

assert.Equal(t, int64(1), metric.TotalReceived.Bundles)
assert.Equal(t, int64(200), metric.TotalReceived.Bytes)

assert.NotNil(t, metric.MessageReceived[testMsgType])
assert.Equal(t, int64(1), metric.MessageReceived[testMsgType].Bundles)
assert.Equal(t, int64(200), metric.MessageReceived[testMsgType].Bytes)
}

func TestUpdateInvalidMetric(t *testing.T) {
metric := NewMetric()

metric.UpdateInvalidMetric(123)
assert.Equal(t, int64(1), metric.TotalInvalid.Bundles)
assert.Equal(t, int64(123), metric.TotalInvalid.Bytes)
}
10 changes: 3 additions & 7 deletions sync/peerset/peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
lp2ppeer "github.com/libp2p/go-libp2p/core/peer"
"github.com/pactus-project/pactus/crypto/bls"
"github.com/pactus-project/pactus/crypto/hash"
"github.com/pactus-project/pactus/sync/bundle/message"
"github.com/pactus-project/pactus/sync/peerset/peer/metric"
"github.com/pactus-project/pactus/sync/peerset/peer/service"
"github.com/pactus-project/pactus/sync/peerset/peer/status"
)
Expand All @@ -27,21 +27,17 @@ type Peer struct {
LastReceived time.Time
LastBlockHash hash.Hash
Height uint32
ReceivedBundles int
InvalidBundles int
TotalSessions int
CompletedSessions int
ReceivedBytes map[message.Type]int64
SentBytes map[message.Type]int64
Metric metric.Metric
}

func NewPeer(peerID ID) *Peer {
return &Peer{
ConsensusKeys: make([]*bls.PublicKey, 0),
Status: status.StatusUnknown,
PeerID: peerID,
ReceivedBytes: make(map[message.Type]int64),
SentBytes: make(map[message.Type]int64),
Metric: metric.NewMetric(),
Protocols: make([]string, 0),
}
}
Expand Down
99 changes: 24 additions & 75 deletions sync/peerset/peer_set.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package peerset

import (
"maps"
"sync"
"time"

"github.com/pactus-project/pactus/crypto/bls"
"github.com/pactus-project/pactus/crypto/hash"
"github.com/pactus-project/pactus/sync/bundle/message"
"github.com/pactus-project/pactus/sync/peerset/peer"
"github.com/pactus-project/pactus/sync/peerset/peer/metric"
"github.com/pactus-project/pactus/sync/peerset/peer/service"
"github.com/pactus-project/pactus/sync/peerset/peer/status"
"github.com/pactus-project/pactus/sync/peerset/session"
Expand All @@ -18,23 +18,18 @@ import (
type PeerSet struct {
lk sync.RWMutex

peers map[peer.ID]*peer.Peer
sessionManager *session.Manager
totalSentBundles int
totalSentBytes int64
totalReceivedBytes int64
sentBytes map[message.Type]int64
receivedBytes map[message.Type]int64
startedAt time.Time
peers map[peer.ID]*peer.Peer
sessionManager *session.Manager
startedAt time.Time
metric metric.Metric
}

// NewPeerSet constructs a new PeerSet for managing peer information.
func NewPeerSet(sessionTimeout time.Duration) *PeerSet {
return &PeerSet{
peers: make(map[peer.ID]*peer.Peer),
sessionManager: session.NewManager(sessionTimeout),
sentBytes: make(map[message.Type]int64),
receivedBytes: make(map[message.Type]int64),
metric: metric.NewMetric(),
startedAt: time.Now(),
}
}
Expand Down Expand Up @@ -257,96 +252,43 @@ func (ps *PeerSet) UpdateLastReceived(pid peer.ID) {
p.LastReceived = time.Now()
}

func (ps *PeerSet) IncreaseReceivedBundlesCounter(pid peer.ID) {
func (ps *PeerSet) UpdateInvalidMetric(pid peer.ID, bytes int64) {
ps.lk.Lock()
defer ps.lk.Unlock()

p := ps.findOrCreatePeer(pid)
p.ReceivedBundles++
}

func (ps *PeerSet) IncreaseInvalidBundlesCounter(pid peer.ID) {
ps.lk.Lock()
defer ps.lk.Unlock()
ps.metric.UpdateInvalidMetric(bytes)

p := ps.findOrCreatePeer(pid)
p.InvalidBundles++
p.Metric.UpdateInvalidMetric(bytes)
}

func (ps *PeerSet) IncreaseReceivedBytesCounter(pid peer.ID, msgType message.Type, c int64) {
func (ps *PeerSet) UpdateReceivedMetric(pid peer.ID, msgType message.Type, bytes int64) {
ps.lk.Lock()
defer ps.lk.Unlock()

p := ps.findOrCreatePeer(pid)
p.ReceivedBytes[msgType] += c
ps.metric.UpdateReceivedMetric(msgType, bytes)

ps.totalReceivedBytes += c
ps.receivedBytes[msgType] += c
p := ps.findOrCreatePeer(pid)
p.Metric.UpdateReceivedMetric(msgType, bytes)
}

func (ps *PeerSet) IncreaseSentCounters(msgType message.Type, c int64, pid *peer.ID) {
func (ps *PeerSet) UpdateSentMetric(pid *peer.ID, msgType message.Type, bytes int64) {
ps.lk.Lock()
defer ps.lk.Unlock()

ps.totalSentBundles++
ps.totalSentBytes += c
ps.sentBytes[msgType] += c
ps.metric.UpdateSentMetric(msgType, bytes)

if pid != nil {
p := ps.findOrCreatePeer(*pid)
p.SentBytes[msgType] += c
p.Metric.UpdateSentMetric(msgType, bytes)
}
}

func (ps *PeerSet) TotalSentBundles() int {
ps.lk.RLock()
defer ps.lk.RUnlock()

return ps.totalSentBundles
}

func (ps *PeerSet) TotalSentBytes() int64 {
ps.lk.RLock()
defer ps.lk.RUnlock()

return ps.totalSentBytes
}

func (ps *PeerSet) TotalReceivedBytes() int64 {
ps.lk.RLock()
defer ps.lk.RUnlock()

return ps.totalReceivedBytes
}

func (ps *PeerSet) SentBytesMessageType(msgType message.Type) int64 {
if sentBytes, ok := ps.sentBytes[msgType]; ok {
return sentBytes
}

return 0
}

func (ps *PeerSet) ReceivedBytesMessageType(msgType message.Type) int64 {
if receivedBytes, ok := ps.receivedBytes[msgType]; ok {
return receivedBytes
}

return 0
}

func (ps *PeerSet) SentBytes() map[message.Type]int64 {
ps.lk.RLock()
defer ps.lk.RUnlock()

return maps.Clone(ps.sentBytes)
}

func (ps *PeerSet) ReceivedBytes() map[message.Type]int64 {
ps.lk.RLock()
defer ps.lk.RUnlock()

return maps.Clone(ps.receivedBytes)
return int(ps.metric.TotalSent.Bundles)
}

func (ps *PeerSet) StartedAt() time.Time {
Expand Down Expand Up @@ -375,6 +317,13 @@ func (ps *PeerSet) Sessions() []*session.Session {
return ps.sessionManager.Sessions()
}

func (ps *PeerSet) Metric() metric.Metric {
ps.lk.RLock()
defer ps.lk.RUnlock()

return ps.metric
}

// GetRandomPeer selects a random peer from the peer set based on their download score.
// Peers with higher score are more likely to be selected.
func (ps *PeerSet) GetRandomPeer() *peer.Peer {
Expand Down
Loading
Loading