From fd00ba69a0bf5c2895d0545f21f35915b8e55059 Mon Sep 17 00:00:00 2001
From: rene <41963722+renaynay@users.noreply.github.com>
Date: Tue, 27 Feb 2024 19:28:43 +0100
Subject: [PATCH] chore(share/p2p/peers): Change naming inside discovery
 (#3191)

Discovery can be used for different kinds of nodes (soon `archival`), so
let's rename to avoid confusion in the future.
---
 nodebuilder/share/constructors.go |  2 +-
 share/p2p/discovery/discovery.go  |  6 ++--
 share/p2p/peers/manager.go        | 57 ++++++++++++++++---------------
 share/p2p/peers/manager_test.go   | 10 +++---
 share/p2p/peers/metrics.go        |  4 +--
 5 files changed, 40 insertions(+), 39 deletions(-)

diff --git a/nodebuilder/share/constructors.go b/nodebuilder/share/constructors.go
index e13786a4d9..96be2b5d20 100644
--- a/nodebuilder/share/constructors.go
+++ b/nodebuilder/share/constructors.go
@@ -37,7 +37,7 @@ func newDiscovery(cfg *disc.Parameters,
 			h,
 			routingdisc.NewRoutingDiscovery(r),
 			fullNodesTag,
-			disc.WithOnPeersUpdate(manager.UpdateFullNodePool),
+			disc.WithOnPeersUpdate(manager.UpdateNodePool),
 		)
 	}
 }
diff --git a/share/p2p/discovery/discovery.go b/share/p2p/discovery/discovery.go
index 0f44d42dbe..c40c979d16 100644
--- a/share/p2p/discovery/discovery.go
+++ b/share/p2p/discovery/discovery.go
@@ -117,7 +117,7 @@ func (d *Discovery) Stop(context.Context) error {
 	return nil
 }
 
-// Peers provides a list of discovered peers in the "full" topic.
+// Peers provides a list of discovered peers in the given topic.
 // If Discovery hasn't found any peers, it blocks until at least one peer is found.
 func (d *Discovery) Peers(ctx context.Context) ([]peer.ID, error) {
 	return d.set.Peers(ctx)
@@ -212,9 +212,9 @@ func (d *Discovery) discoveryLoop(ctx context.Context) {
 		case <-warnTicker.C:
 			if d.set.Size() < d.set.Limit() {
 				log.Warnf(
-					"Potentially degraded connectivity, unable to discover the desired amount of full node peers in %v. "+
+					"Potentially degraded connectivity, unable to discover the desired amount of %s peers in %v. "+
 						"Number of peers discovered: %d. Required: %d.",
-					logInterval, d.set.Size(), d.set.Limit(),
+					d.tag, logInterval, d.set.Size(), d.set.Limit(),
 				)
 			}
 			// Do not break the loop; just continue
diff --git a/share/p2p/peers/manager.go b/share/p2p/peers/manager.go
index 1a00059628..0ae21ff015 100644
--- a/share/p2p/peers/manager.go
+++ b/share/p2p/peers/manager.go
@@ -67,8 +67,8 @@ type Manager struct {
 	// track peers for those headers
 	storeFrom atomic.Uint64
 
-	// fullNodes collects full nodes peer.ID found via discovery
-	fullNodes *pool
+	// nodes collects nodes' peer.IDs found via discovery
+	nodes *pool
 
 	// hashes that are not in the chain
 	blacklistedHashes map[string]bool
@@ -123,7 +123,7 @@ func NewManager(
 		}
 	}
 
-	s.fullNodes = newPool(s.params.PeerCooldown)
+	s.nodes = newPool(s.params.PeerCooldown)
 	return s, nil
 }
 
@@ -188,8 +188,8 @@ func (m *Manager) Stop(ctx context.Context) error {
 }
 
 // Peer returns peer collected from shrex.Sub for given datahash if any available.
-// If there is none, it will look for full nodes collected from discovery. If there is no discovered
-// full nodes, it will wait until any peer appear in either source or timeout happen.
+// If there is none, it will look for nodes collected from discovery. If there are no discovered
+// nodes, it will wait until a peer appears in either source or the timeout happens.
 // After fetching data using given peer, caller is required to call returned DoneFunc using
 // appropriate result value
 func (m *Manager) Peer(ctx context.Context, datahash share.DataHash, height uint64,
@@ -205,11 +205,11 @@ func (m *Manager) Peer(ctx context.Context, datahash share.DataHash, height uint
 		return m.newPeer(ctx, datahash, peerID, sourceShrexSub, p.len(), 0)
 	}
 
-	// if no peer for datahash is currently available, try to use full node
+	// if no peer for datahash is currently available, try to use a node
 	// obtained from discovery
-	peerID, ok = m.fullNodes.tryGet()
+	peerID, ok = m.nodes.tryGet()
 	if ok {
-		return m.newPeer(ctx, datahash, peerID, sourceFullNodes, m.fullNodes.len(), 0)
+		return m.newPeer(ctx, datahash, peerID, sourceFullNodes, m.nodes.len(), 0)
 	}
 
 	// no peers are available right now, wait for the first one
@@ -220,27 +220,27 @@ func (m *Manager) Peer(ctx context.Context, datahash share.DataHash, height uint
 			return m.Peer(ctx, datahash, height)
 		}
 		return m.newPeer(ctx, datahash, peerID, sourceShrexSub, p.len(), time.Since(start))
-	case peerID = <-m.fullNodes.next(ctx):
-		return m.newPeer(ctx, datahash, peerID, sourceFullNodes, m.fullNodes.len(), time.Since(start))
+	case peerID = <-m.nodes.next(ctx):
+		return m.newPeer(ctx, datahash, peerID, sourceFullNodes, m.nodes.len(), time.Since(start))
 	case <-ctx.Done():
 		return "", nil, ctx.Err()
 	}
 }
 
-// UpdateFullNodePool is called by discovery when new full node is discovered or removed
-func (m *Manager) UpdateFullNodePool(peerID peer.ID, isAdded bool) {
+// UpdateNodePool is called by discovery when a new node is discovered or removed.
+func (m *Manager) UpdateNodePool(peerID peer.ID, isAdded bool) {
 	if isAdded {
 		if m.isBlacklistedPeer(peerID) {
 			log.Debugw("got blacklisted peer from discovery", "peer", peerID.String())
 			return
 		}
-		m.fullNodes.add(peerID)
-		log.Debugw("added to full nodes", "peer", peerID)
+		m.nodes.add(peerID)
+		log.Debugw("added to discovered nodes pool", "peer", peerID)
 		return
 	}
 
-	log.Debugw("removing peer from discovered full nodes", "peer", peerID.String())
-	m.fullNodes.remove(peerID)
+	log.Debugw("removing peer from discovered nodes pool", "peer", peerID.String())
+	m.nodes.remove(peerID)
 }
 
 func (m *Manager) newPeer(
@@ -273,7 +273,7 @@ func (m *Manager) doneFunc(datahash share.DataHash, peerID peer.ID, source peerS
 	case ResultNoop:
 	case ResultCooldownPeer:
 		if source == sourceFullNodes {
-			m.fullNodes.putOnCooldown(peerID)
+			m.nodes.putOnCooldown(peerID)
 			return
 		}
 		m.getPool(datahash.String()).putOnCooldown(peerID)
@@ -311,7 +311,7 @@ func (m *Manager) subscribeHeader(ctx context.Context, headerSub libhead.Subscri
 }
 
 // subscribeDisconnectedPeers subscribes to libp2p connectivity events and removes disconnected
-// peers from full nodes pool
+// peers from nodes pool
 func (m *Manager) subscribeDisconnectedPeers(ctx context.Context, sub event.Subscription) {
 	defer close(m.disconnectedPeersDone)
 	defer sub.Close()
@@ -324,13 +324,14 @@ func (m *Manager) subscribeDisconnectedPeers(ctx context.Context, sub event.Subs
 			log.Fatal("Subscription for connectedness events is closed.") //nolint:gocritic
 			return
 		}
-		// listen to disconnect event to remove peer from full nodes pool
+		// listen to disconnect event to remove peer from nodes pool
 		connStatus := e.(event.EvtPeerConnectednessChanged)
 		if connStatus.Connectedness == network.NotConnected {
 			peer := connStatus.Peer
-			if m.fullNodes.has(peer) {
-				log.Debugw("peer disconnected, removing from full nodes", "peer", peer.String())
-				m.fullNodes.remove(peer)
+			if m.nodes.has(peer) {
+				log.Debugw("peer disconnected, removing from discovered nodes pool",
+					"peer", peer.String())
+				m.nodes.remove(peer)
 			}
 		}
 	}
@@ -368,8 +369,8 @@ func (m *Manager) Validate(_ context.Context, peerID peer.ID, msg shrexsub.Notif
 
 	p.add(peerID)
 	if p.isValidatedDataHash.Load() {
-		// add peer to full nodes pool only if datahash has been already validated
-		m.fullNodes.add(peerID)
+		// add peer to discovered nodes pool only if datahash has already been validated
+		m.nodes.add(peerID)
 	}
 	return pubsub.ValidationIgnore
 }
@@ -408,7 +409,7 @@ func (m *Manager) blacklistPeers(reason blacklistPeerReason, peerIDs ...peer.ID)
 			continue
 		}
 
-		m.fullNodes.remove(peerID)
+		m.nodes.remove(peerID)
 		// add peer to the blacklist, so we can't connect to it in the future.
 		err := m.connGater.BlockPeer(peerID)
 		if err != nil {
@@ -436,15 +437,15 @@ func (m *Manager) validatedPool(hashStr string, height uint64) *syncPool {
 	p := m.getOrCreatePool(hashStr, height)
 	if p.isValidatedDataHash.CompareAndSwap(false, true) {
 		log.Debugw("pool marked validated", "datahash", hashStr)
-		// if pool is proven to be valid, add all collected peers to full nodes
-		m.fullNodes.add(p.peers()...)
+		// if pool is proven to be valid, add all collected peers to discovered nodes
+		m.nodes.add(p.peers()...)
 	}
 	return p
 }
 
 // removeIfUnreachable removes peer from some pool if it is blacklisted or disconnected
 func (m *Manager) removeIfUnreachable(pool *syncPool, peerID peer.ID) bool {
-	if m.isBlacklistedPeer(peerID) || !m.fullNodes.has(peerID) {
+	if m.isBlacklistedPeer(peerID) || !m.nodes.has(peerID) {
 		log.Debugw("removing outdated peer from pool", "peer", peerID.String())
 		pool.remove(peerID)
 		return true
diff --git a/share/p2p/peers/manager_test.go b/share/p2p/peers/manager_test.go
index d4a188ff56..52a6fd7302 100644
--- a/share/p2p/peers/manager_test.go
+++ b/share/p2p/peers/manager_test.go
@@ -133,7 +133,7 @@ func TestManager(t *testing.T) {
 
 		// create validated pool
 		validDataHash := share.DataHash("datahash2")
-		manager.fullNodes.add("full") // add FN to unblock Peer call
+		manager.nodes.add("full") // add FN to unblock Peer call
 		manager.Peer(ctx, validDataHash, h.Height()) //nolint:errcheck
 		require.Len(t, manager.pools, 3)
 
@@ -166,7 +166,7 @@ func TestManager(t *testing.T) {
 
 		// add peers to fullnodes, imitating discovery add
 		peers := []peer.ID{"peer1", "peer2", "peer3"}
-		manager.fullNodes.add(peers...)
+		manager.nodes.add(peers...)
 
 		peerID, _, err := manager.Peer(ctx, h.DataHash.Bytes(), h.Height())
 		require.NoError(t, err)
@@ -205,7 +205,7 @@ func TestManager(t *testing.T) {
 		}()
 
 		// send peers
-		manager.fullNodes.add(peers...)
+		manager.nodes.add(peers...)
 
 		// wait for peer to be received
 		select {
@@ -434,7 +434,7 @@ func TestIntegration(t *testing.T) {
 			fnHost,
 			routingdisc.NewRoutingDiscovery(fnRouter),
 			fullNodesTag,
-			discovery.WithOnPeersUpdate(fnPeerManager.UpdateFullNodePool),
+			discovery.WithOnPeersUpdate(fnPeerManager.UpdateNodePool),
 			discovery.WithOnPeersUpdate(checkDiscoveredPeer),
 		)
 		require.NoError(t, fnDisc.Start(ctx))
@@ -450,7 +450,7 @@ func TestIntegration(t *testing.T) {
 
 		select {
 		case <-waitCh:
-			require.Contains(t, fnPeerManager.fullNodes.peersList, bnHost.ID())
+			require.Contains(t, fnPeerManager.nodes.peersList, bnHost.ID())
 		case <-ctx.Done():
 			require.NoError(t, ctx.Err())
 		}
diff --git a/share/p2p/peers/metrics.go b/share/p2p/peers/metrics.go
index 098610c595..eb42254430 100644
--- a/share/p2p/peers/metrics.go
+++ b/share/p2p/peers/metrics.go
@@ -137,10 +137,10 @@ func initMetrics(manager *Manager) (*metrics, error) {
 					attribute.String(poolStatusKey, string(poolStatus))))
 		}
 
-		observer.ObserveInt64(fullNodesPool, int64(manager.fullNodes.len()),
+		observer.ObserveInt64(fullNodesPool, int64(manager.nodes.len()),
 			metric.WithAttributes(
 				attribute.String(peerStatusKey, string(peerStatusActive))))
-		observer.ObserveInt64(fullNodesPool, int64(manager.fullNodes.cooldown.len()),
+		observer.ObserveInt64(fullNodesPool, int64(manager.nodes.cooldown.len()),
 			metric.WithAttributes(
 				attribute.String(peerStatusKey, string(peerStatusCooldown))))
 
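
For reviewers, a minimal self-contained sketch of the callback contract behind this rename. It is illustrative only, not part of the patch: PeerID, pool, and Manager below are simplified stand-ins for the real celestia-node types (libp2p's peer.ID and the manager's pool), showing how discovery drives a tag-agnostic nodes pool through the func(peerID, isAdded) hook that WithOnPeersUpdate registers.

package main

import (
	"fmt"
	"sync"
)

// PeerID is an illustrative stand-in for libp2p's peer.ID.
type PeerID string

// OnUpdatedPeers mirrors the callback shape that disc.WithOnPeersUpdate
// accepts in the patch: a peer ID plus an isAdded flag.
type OnUpdatedPeers func(peerID PeerID, isAdded bool)

// pool is a toy, concurrency-safe stand-in for the manager's peer pool.
type pool struct {
	mu    sync.Mutex
	peers map[PeerID]struct{}
}

func newPool() *pool { return &pool{peers: make(map[PeerID]struct{})} }

func (p *pool) add(id PeerID)    { p.mu.Lock(); defer p.mu.Unlock(); p.peers[id] = struct{}{} }
func (p *pool) remove(id PeerID) { p.mu.Lock(); defer p.mu.Unlock(); delete(p.peers, id) }
func (p *pool) len() int         { p.mu.Lock(); defer p.mu.Unlock(); return len(p.peers) }

// Manager holds a single tag-agnostic pool, matching the rename from
// fullNodes to nodes: the pool no longer implies one particular node type.
type Manager struct {
	nodes *pool
}

// UpdateNodePool is the renamed hook: discovery calls it for whatever tag
// it was started with ("full" today, "archival" soon), so the name stays
// generic instead of mentioning full nodes.
func (m *Manager) UpdateNodePool(peerID PeerID, isAdded bool) {
	if isAdded {
		m.nodes.add(peerID)
		return
	}
	m.nodes.remove(peerID)
}

func main() {
	m := &Manager{nodes: newPool()}

	// Wired the same way the patch does with
	// disc.WithOnPeersUpdate(manager.UpdateNodePool).
	var onUpdate OnUpdatedPeers = m.UpdateNodePool

	onUpdate("peer1", true)  // discovered
	onUpdate("peer2", true)  // discovered
	onUpdate("peer1", false) // disconnected, removed from the pool
	fmt.Println("nodes in pool:", m.nodes.len()) // prints: nodes in pool: 1
}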