chore(share/p2p/peers): Change namings inside discovery (celestiaorg#3191)

Discovery can be used for different kinds of nodes (soon `archival`), so
let's rename to avoid confusion in the future.
renaynay authored Feb 27, 2024
1 parent 8c946a8 commit fd00ba6
Showing 5 changed files with 40 additions and 39 deletions.
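
For orientation before the per-file diffs, here is a hedged sketch (not taken from the repository) of the callback contract the rename is about: discovery hands the manager a peer ID and an added/removed flag via `WithOnPeersUpdate`, and the manager now keeps a generically named pool of discovered nodes, so the same hook can later serve `archival` discovery as well. The `onUpdatedPeers` type name and the toy pool below are assumptions for illustration; only `WithOnPeersUpdate` and `UpdateNodePool` appear in the actual diff.

```go
// A minimal, hypothetical sketch of the callback contract behind
// disc.WithOnPeersUpdate(manager.UpdateNodePool): discovery reports a peer ID
// plus whether it was added or removed, and the manager keeps one generic
// pool of discovered nodes regardless of node kind. The toy pool below is an
// illustration only, not the manager's real implementation.
package main

import (
	"fmt"

	"github.com/libp2p/go-libp2p/core/peer"
)

// onUpdatedPeers mirrors the shape of the callback passed to
// WithOnPeersUpdate in this diff: (peer.ID, isAdded bool).
type onUpdatedPeers func(peerID peer.ID, isAdded bool)

// toyPool stands in for the manager's internal discovered-nodes pool.
type toyPool struct {
	peers map[peer.ID]struct{}
}

// update adds or removes a peer, matching the semantics of
// Manager.UpdateNodePool shown in the diff below.
func (p *toyPool) update(peerID peer.ID, isAdded bool) {
	if isAdded {
		p.peers[peerID] = struct{}{}
		fmt.Println("added to discovered nodes pool:", peerID)
		return
	}
	delete(p.peers, peerID)
	fmt.Println("removed from discovered nodes pool:", peerID)
}

func main() {
	pool := &toyPool{peers: make(map[peer.ID]struct{})}

	// The same callback can be handed to any discovery instance,
	// whether it is tagged for full nodes or, later, archival nodes.
	var onUpdate onUpdatedPeers = pool.update

	onUpdate("12D3KooWExamplePeer", true)  // discovery found a node
	onUpdate("12D3KooWExamplePeer", false) // the node went away
}
```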
2 changes: 1 addition & 1 deletion nodebuilder/share/constructors.go
@@ -37,7 +37,7 @@ func newDiscovery(cfg *disc.Parameters,
h,
routingdisc.NewRoutingDiscovery(r),
fullNodesTag,
disc.WithOnPeersUpdate(manager.UpdateFullNodePool),
disc.WithOnPeersUpdate(manager.UpdateNodePool),
)
}
}
6 changes: 3 additions & 3 deletions share/p2p/discovery/discovery.go
@@ -117,7 +117,7 @@ func (d *Discovery) Stop(context.Context) error {
return nil
}

// Peers provides a list of discovered peers in the "full" topic.
// Peers provides a list of discovered peers in the given topic.
// If Discovery hasn't found any peers, it blocks until at least one peer is found.
func (d *Discovery) Peers(ctx context.Context) ([]peer.ID, error) {
return d.set.Peers(ctx)
@@ -212,9 +212,9 @@ func (d *Discovery) discoveryLoop(ctx context.Context) {
case <-warnTicker.C:
if d.set.Size() < d.set.Limit() {
log.Warnf(
"Potentially degraded connectivity, unable to discover the desired amount of full node peers in %v. "+
"Potentially degraded connectivity, unable to discover the desired amount of %s peers in %v. "+
"Number of peers discovered: %d. Required: %d.",
logInterval, d.set.Size(), d.set.Limit(),
d.tag, logInterval, d.set.Size(), d.set.Limit(),
)
}
// Do not break the loop; just continue
57 changes: 29 additions & 28 deletions share/p2p/peers/manager.go
@@ -67,8 +67,8 @@ type Manager struct {
// track peers for those headers
storeFrom atomic.Uint64

// fullNodes collects full nodes peer.ID found via discovery
fullNodes *pool
// nodes collects nodes' peer.IDs found via discovery
nodes *pool

// hashes that are not in the chain
blacklistedHashes map[string]bool
@@ -123,7 +123,7 @@ func NewManager(
}
}

s.fullNodes = newPool(s.params.PeerCooldown)
s.nodes = newPool(s.params.PeerCooldown)
return s, nil
}

@@ -188,8 +188,8 @@ func (m *Manager) Stop(ctx context.Context) error {
}

// Peer returns peer collected from shrex.Sub for given datahash if any available.
// If there is none, it will look for full nodes collected from discovery. If there is no discovered
// full nodes, it will wait until any peer appear in either source or timeout happen.
// If there is none, it will look for nodes collected from discovery. If there is no discovered
// nodes, it will wait until any peer appear in either source or timeout happen.
// After fetching data using given peer, caller is required to call returned DoneFunc using
// appropriate result value
func (m *Manager) Peer(ctx context.Context, datahash share.DataHash, height uint64,
@@ -205,11 +205,11 @@ func (m *Manager) Peer(ctx context.Context, datahash share.DataHash, height uint
return m.newPeer(ctx, datahash, peerID, sourceShrexSub, p.len(), 0)
}

// if no peer for datahash is currently available, try to use full node
// if no peer for datahash is currently available, try to use node
// obtained from discovery
peerID, ok = m.fullNodes.tryGet()
peerID, ok = m.nodes.tryGet()
if ok {
return m.newPeer(ctx, datahash, peerID, sourceFullNodes, m.fullNodes.len(), 0)
return m.newPeer(ctx, datahash, peerID, sourceFullNodes, m.nodes.len(), 0)
}

// no peers are available right now, wait for the first one
@@ -220,27 +220,27 @@ func (m *Manager) Peer(ctx context.Context, datahash share.DataHash, height uint
return m.Peer(ctx, datahash, height)
}
return m.newPeer(ctx, datahash, peerID, sourceShrexSub, p.len(), time.Since(start))
case peerID = <-m.fullNodes.next(ctx):
return m.newPeer(ctx, datahash, peerID, sourceFullNodes, m.fullNodes.len(), time.Since(start))
case peerID = <-m.nodes.next(ctx):
return m.newPeer(ctx, datahash, peerID, sourceFullNodes, m.nodes.len(), time.Since(start))
case <-ctx.Done():
return "", nil, ctx.Err()
}
}

// UpdateFullNodePool is called by discovery when new full node is discovered or removed
func (m *Manager) UpdateFullNodePool(peerID peer.ID, isAdded bool) {
// UpdateNodePool is called by discovery when new node is discovered or removed.
func (m *Manager) UpdateNodePool(peerID peer.ID, isAdded bool) {
if isAdded {
if m.isBlacklistedPeer(peerID) {
log.Debugw("got blacklisted peer from discovery", "peer", peerID.String())
return
}
m.fullNodes.add(peerID)
log.Debugw("added to full nodes", "peer", peerID)
m.nodes.add(peerID)
log.Debugw("added to discovered nodes pool", "peer", peerID)
return
}

log.Debugw("removing peer from discovered full nodes", "peer", peerID.String())
m.fullNodes.remove(peerID)
log.Debugw("removing peer from discovered nodes pool", "peer", peerID.String())
m.nodes.remove(peerID)
}

func (m *Manager) newPeer(
@@ -273,7 +273,7 @@ func (m *Manager) doneFunc(datahash share.DataHash, peerID peer.ID, source peerS
case ResultNoop:
case ResultCooldownPeer:
if source == sourceFullNodes {
m.fullNodes.putOnCooldown(peerID)
m.nodes.putOnCooldown(peerID)
return
}
m.getPool(datahash.String()).putOnCooldown(peerID)
@@ -311,7 +311,7 @@ func (m *Manager) subscribeHeader(ctx context.Context, headerSub libhead.Subscri
}

// subscribeDisconnectedPeers subscribes to libp2p connectivity events and removes disconnected
// peers from full nodes pool
// peers from nodes pool
func (m *Manager) subscribeDisconnectedPeers(ctx context.Context, sub event.Subscription) {
defer close(m.disconnectedPeersDone)
defer sub.Close()
@@ -324,13 +324,14 @@ func (m *Manager) subscribeDisconnectedPeers(ctx context.Context, sub event.Subs
log.Fatal("Subscription for connectedness events is closed.") //nolint:gocritic
return
}
// listen to disconnect event to remove peer from full nodes pool
// listen to disconnect event to remove peer from nodes pool
connStatus := e.(event.EvtPeerConnectednessChanged)
if connStatus.Connectedness == network.NotConnected {
peer := connStatus.Peer
if m.fullNodes.has(peer) {
log.Debugw("peer disconnected, removing from full nodes", "peer", peer.String())
m.fullNodes.remove(peer)
if m.nodes.has(peer) {
log.Debugw("peer disconnected, removing from discovered nodes pool",
"peer", peer.String())
m.nodes.remove(peer)
}
}
}
@@ -368,8 +369,8 @@ func (m *Manager) Validate(_ context.Context, peerID peer.ID, msg shrexsub.Notif

p.add(peerID)
if p.isValidatedDataHash.Load() {
// add peer to full nodes pool only if datahash has been already validated
m.fullNodes.add(peerID)
// add peer to discovered nodes pool only if datahash has been already validated
m.nodes.add(peerID)
}
return pubsub.ValidationIgnore
}
@@ -408,7 +409,7 @@ func (m *Manager) blacklistPeers(reason blacklistPeerReason, peerIDs ...peer.ID)
continue
}

m.fullNodes.remove(peerID)
m.nodes.remove(peerID)
// add peer to the blacklist, so we can't connect to it in the future.
err := m.connGater.BlockPeer(peerID)
if err != nil {
@@ -436,15 +437,15 @@ func (m *Manager) validatedPool(hashStr string, height uint64) *syncPool {
p := m.getOrCreatePool(hashStr, height)
if p.isValidatedDataHash.CompareAndSwap(false, true) {
log.Debugw("pool marked validated", "datahash", hashStr)
// if pool is proven to be valid, add all collected peers to full nodes
m.fullNodes.add(p.peers()...)
// if pool is proven to be valid, add all collected peers to discovered nodes
m.nodes.add(p.peers()...)
}
return p
}

// removeIfUnreachable removes peer from some pool if it is blacklisted or disconnected
func (m *Manager) removeIfUnreachable(pool *syncPool, peerID peer.ID) bool {
if m.isBlacklistedPeer(peerID) || !m.fullNodes.has(peerID) {
if m.isBlacklistedPeer(peerID) || !m.nodes.has(peerID) {
log.Debugw("removing outdated peer from pool", "peer", peerID.String())
pool.remove(peerID)
return true
10 changes: 5 additions & 5 deletions share/p2p/peers/manager_test.go
@@ -133,7 +133,7 @@ func TestManager(t *testing.T) {

// create validated pool
validDataHash := share.DataHash("datahash2")
manager.fullNodes.add("full") // add FN to unblock Peer call
manager.nodes.add("full") // add FN to unblock Peer call
manager.Peer(ctx, validDataHash, h.Height()) //nolint:errcheck
require.Len(t, manager.pools, 3)

@@ -166,7 +166,7 @@ func TestManager(t *testing.T) {

// add peers to fullnodes, imitating discovery add
peers := []peer.ID{"peer1", "peer2", "peer3"}
manager.fullNodes.add(peers...)
manager.nodes.add(peers...)

peerID, _, err := manager.Peer(ctx, h.DataHash.Bytes(), h.Height())
require.NoError(t, err)
@@ -205,7 +205,7 @@ func TestManager(t *testing.T) {
}()

// send peers
manager.fullNodes.add(peers...)
manager.nodes.add(peers...)

// wait for peer to be received
select {
@@ -434,7 +434,7 @@ func TestIntegration(t *testing.T) {
fnHost,
routingdisc.NewRoutingDiscovery(fnRouter),
fullNodesTag,
discovery.WithOnPeersUpdate(fnPeerManager.UpdateFullNodePool),
discovery.WithOnPeersUpdate(fnPeerManager.UpdateNodePool),
discovery.WithOnPeersUpdate(checkDiscoveredPeer),
)
require.NoError(t, fnDisc.Start(ctx))
@@ -450,7 +450,7 @@

select {
case <-waitCh:
require.Contains(t, fnPeerManager.fullNodes.peersList, bnHost.ID())
require.Contains(t, fnPeerManager.nodes.peersList, bnHost.ID())
case <-ctx.Done():
require.NoError(t, ctx.Err())
}
4 changes: 2 additions & 2 deletions share/p2p/peers/metrics.go
@@ -137,10 +137,10 @@ func initMetrics(manager *Manager) (*metrics, error) {
attribute.String(poolStatusKey, string(poolStatus))))
}

observer.ObserveInt64(fullNodesPool, int64(manager.fullNodes.len()),
observer.ObserveInt64(fullNodesPool, int64(manager.nodes.len()),
metric.WithAttributes(
attribute.String(peerStatusKey, string(peerStatusActive))))
observer.ObserveInt64(fullNodesPool, int64(manager.fullNodes.cooldown.len()),
observer.ObserveInt64(fullNodesPool, int64(manager.nodes.cooldown.len()),
metric.WithAttributes(
attribute.String(peerStatusKey, string(peerStatusCooldown))))
