From c34b741ae3d70e31dcf7865ba959e1b039c0f076 Mon Sep 17 00:00:00 2001
From: Vlad <13818348+walldiss@users.noreply.github.com>
Date: Mon, 18 Dec 2023 16:08:52 +0300
Subject: [PATCH] feat(share/p2p/peer-manager): limit amount of stored pools in peer-manager (#3005)

This PR introduces the `storedPoolsAmount` parameter to the peer manager, which limits the number of stored pools. The default value is 10 pools. The peer manager keeps pools only for the most recent headers; requests for older blocks no longer need dedicated per-hash pools and can be routed to any full node, since full nodes are expected to have those blocks by that time. This lowers the memory footprint of the peer manager and resolves the memory-leak issues.

Resolves https://github.com/celestiaorg/celestia-node/issues/1781
---
 share/getters/shrex.go          |  15 ++--
 share/p2p/peers/manager.go      | 103 +++++++++++++------------
 share/p2p/peers/manager_test.go | 129 ++++++++++++++++----------------
 share/p2p/peers/metrics.go      |  10 +--
 share/p2p/peers/pool.go         |   9 ---
 5 files changed, 122 insertions(+), 144 deletions(-)

diff --git a/share/getters/shrex.go b/share/getters/shrex.go
index 0586826e22..af48a7f8ab 100644
--- a/share/getters/shrex.go
+++ b/share/getters/shrex.go
@@ -135,9 +135,8 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, header *header.ExtendedHeader
         utils.SetStatusAndEnd(span, err)
     }()
 
-    dah := header.DAH
     // short circuit if the data root is empty
-    if dah.Equals(share.EmptyRoot()) {
+    if header.DAH.Equals(share.EmptyRoot()) {
         return share.EmptyExtendedDataSquare(), nil
     }
     for {
@@ -147,10 +146,10 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, header *header.ExtendedHeader
         }
         attempt++
         start := time.Now()
-        peer, setStatus, getErr := sg.peerManager.Peer(ctx, dah.Hash())
+        peer, setStatus, getErr := sg.peerManager.Peer(ctx, header.DAH.Hash(), header.Height())
         if getErr != nil {
             log.Debugw("eds: couldn't find peer",
-                "hash", dah.String(),
+                "hash", header.DAH.String(),
                 "err", getErr,
                 "finished (s)", time.Since(start))
             sg.metrics.recordEDSAttempt(ctx, attempt, false)
@@ -159,11 +158,11 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, header *header.ExtendedHeader
 
         reqStart := time.Now()
         reqCtx, cancel := ctxWithSplitTimeout(ctx, sg.minAttemptsCount-attempt+1, sg.minRequestTimeout)
-        eds, getErr := sg.edsClient.RequestEDS(reqCtx, dah.Hash(), peer)
+        eds, getErr := sg.edsClient.RequestEDS(reqCtx, header.DAH.Hash(), peer)
         cancel()
         switch {
         case getErr == nil:
-            setStatus(peers.ResultSynced)
+            setStatus(peers.ResultNoop)
             sg.metrics.recordEDSAttempt(ctx, attempt, true)
             return eds, nil
         case errors.Is(getErr, context.DeadlineExceeded),
@@ -182,7 +181,7 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, header *header.ExtendedHeader
             err = errors.Join(err, getErr)
         }
         log.Debugw("eds: request failed",
-            "hash", dah.String(),
+            "hash", header.DAH.String(),
             "peer", peer.String(),
             "attempt", attempt,
             "err", getErr,
@@ -223,7 +222,7 @@ func (sg *ShrexGetter) GetSharesByNamespace(
         }
         attempt++
         start := time.Now()
-        peer, setStatus, getErr := sg.peerManager.Peer(ctx, dah.Hash())
+        peer, setStatus, getErr := sg.peerManager.Peer(ctx, header.DAH.Hash(), header.Height())
         if getErr != nil {
             log.Debugw("nd: couldn't find peer",
                 "hash", dah.String(),
diff --git a/share/p2p/peers/manager.go b/share/p2p/peers/manager.go
index e39a181150..23fa18bcb2 100644
--- a/share/p2p/peers/manager.go
+++ b/share/p2p/peers/manager.go
@@ -27,8 +27,6 @@ import (
 const (
     // ResultNoop indicates operation was successful and no extra action is required
     ResultNoop result = "result_noop"
-    // ResultSynced will save the status of pool as "synced" and will remove peers from it
-    ResultSynced = "result_synced"
     // ResultCooldownPeer will put returned peer on cooldown, meaning it won't be available by Peer
     // method for some time
     ResultCooldownPeer = "result_cooldown_peer"
@@ -39,6 +37,9 @@ const (
     // eventbusBufSize is the size of the buffered channel to handle
     // events in libp2p
     eventbusBufSize = 32
+
+    // storedPoolsAmount is the number of pools for recent headers that the peer manager will store
+    storedPoolsAmount = 10
 )
 
 type result string
@@ -56,11 +57,14 @@ type Manager struct {
     host host.Host
     connGater *conngater.BasicConnectionGater
 
-    // pools collecting peers from shrexSub
+    // pools collect peers from shrexSub and store them by datahash
     pools map[string]*syncPool
-    // messages from shrex.Sub with height below initialHeight will be ignored, since we don't need to
-    // track peers for those headers
+
+    // initialHeight is the height of the first header received from headersub
     initialHeight atomic.Uint64
+    // messages from shrex.Sub with height below storeFrom will be ignored, since we don't need to
+    // track peers for those headers
+    storeFrom atomic.Uint64
 
     // fullNodes collects full nodes peer.ID found via discovery
     fullNodes *pool
@@ -85,11 +89,8 @@ type syncPool struct {
     // isValidatedDataHash indicates if datahash was validated by receiving corresponding extended
     // header from headerSub
     isValidatedDataHash atomic.Bool
-    // headerHeight is the height of header corresponding to syncpool
-    headerHeight atomic.Uint64
-    // isSynced will be true if DoneFunc was called with ResultSynced. It indicates that given datahash
-    // was synced and peer-manager no longer need to keep peers for it
-    isSynced atomic.Bool
+    // height is the height of the header that corresponds to datahash
+    height uint64
     // createdAt is the syncPool creation time
     createdAt time.Time
 }
@@ -190,16 +191,15 @@ func (m *Manager) Stop(ctx context.Context) error {
 // full nodes, it will wait until any peer appear in either source or timeout happen.
 // After fetching data using given peer, caller is required to call returned DoneFunc using
 // appropriate result value
-func (m *Manager) Peer(
-    ctx context.Context, datahash share.DataHash,
+func (m *Manager) Peer(ctx context.Context, datahash share.DataHash, height uint64,
 ) (peer.ID, DoneFunc, error) {
-    p := m.validatedPool(datahash.String())
+    p := m.validatedPool(datahash.String(), height)
 
     // first, check if a peer is available for the given datahash
     peerID, ok := p.tryGet()
     if ok {
         if m.removeIfUnreachable(p, peerID) {
-            return m.Peer(ctx, datahash)
+            return m.Peer(ctx, datahash, height)
         }
         return m.newPeer(ctx, datahash, peerID, sourceShrexSub, p.len(), 0)
     }
@@ -216,7 +216,7 @@ func (m *Manager) Peer(
     select {
     case peerID = <-p.next(ctx):
         if m.removeIfUnreachable(p, peerID) {
-            return m.Peer(ctx, datahash)
+            return m.Peer(ctx, datahash, height)
         }
         return m.newPeer(ctx, datahash, peerID, sourceShrexSub, p.len(), time.Since(start))
     case peerID = <-m.fullNodes.next(ctx):
@@ -270,14 +270,12 @@ func (m *Manager) doneFunc(datahash share.DataHash, peerID peer.ID, source peerS
         m.metrics.observeDoneResult(source, result)
         switch result {
         case ResultNoop:
-        case ResultSynced:
-            m.markPoolAsSynced(datahash.String())
         case ResultCooldownPeer:
             if source == sourceFullNodes {
                 m.fullNodes.putOnCooldown(peerID)
                 return
             }
-            m.getOrCreatePool(datahash.String()).putOnCooldown(peerID)
+            m.getPool(datahash.String()).putOnCooldown(peerID)
         case ResultBlacklistPeer:
             m.blacklistPeers(reasonMisbehave, peerID)
         }
@@ -298,12 +296,16 @@ func (m *Manager) subscribeHeader(ctx context.Context, headerSub libhead.Subscri
             log.Errorw("get next header from sub", "err", err)
             continue
         }
-        m.validatedPool(h.DataHash.String())
+        m.validatedPool(h.DataHash.String(), h.Height())
 
         // store first header for validation purposes
         if m.initialHeight.CompareAndSwap(0, h.Height()) {
             log.Debugw("stored initial height", "height", h.Height())
         }
+
+        // update the lowest stored height (storeFrom), so pools below the window can be cleaned up
+        m.storeFrom.Store(uint64(max(0, int(h.Height())-storedPoolsAmount)))
+        log.Debugw("updated lowest stored height", "height", h.Height())
     }
 }
@@ -355,22 +357,12 @@ func (m *Manager) Validate(_ context.Context, peerID peer.ID, msg shrexsub.Notif
         return pubsub.ValidationReject
     }
 
-    if msg.Height == 0 {
-        logger.Debug("received message with 0 height")
-        return pubsub.ValidationReject
-    }
-
-    if msg.Height < m.initialHeight.Load() {
-        // we can use peers from discovery for headers before the first one from headerSub
-        // if we allow pool creation for those headers, there is chance the pool will not be validated in
-        // time and will be false-positively trigger blacklisting of hash and all peers that sent msgs for
-        // that hash
+    if msg.Height < m.storeFrom.Load() {
         logger.Debug("received message for past header")
         return pubsub.ValidationIgnore
     }
 
-    p := m.getOrCreatePool(msg.DataHash.String())
-    p.headerHeight.Store(msg.Height)
+    p := m.getOrCreatePool(msg.DataHash.String(), msg.Height)
     logger.Debugw("got hash from shrex-sub")
 
     p.add(peerID)
@@ -381,13 +373,20 @@ func (m *Manager) Validate(_ context.Context, peerID peer.ID, msg shrexsub.Notif
     return pubsub.ValidationIgnore
 }
 
-func (m *Manager) getOrCreatePool(datahash string) *syncPool {
+func (m *Manager) getPool(datahash string) *syncPool {
+    m.lock.Lock()
+    defer m.lock.Unlock()
+    return m.pools[datahash]
+}
+
+func (m *Manager) getOrCreatePool(datahash string, height uint64) *syncPool {
     m.lock.Lock()
     defer m.lock.Unlock()
 
     p, ok := m.pools[datahash]
     if !ok {
         p = &syncPool{
+            height: height,
             pool:
newPool(m.params.PeerCooldown), createdAt: time.Now(), } @@ -432,8 +431,8 @@ func (m *Manager) isBlacklistedHash(hash share.DataHash) bool { return m.blacklistedHashes[hash.String()] } -func (m *Manager) validatedPool(hashStr string) *syncPool { - p := m.getOrCreatePool(hashStr) +func (m *Manager) validatedPool(hashStr string, height uint64) *syncPool { + p := m.getOrCreatePool(hashStr, height) if p.isValidatedDataHash.CompareAndSwap(false, true) { log.Debugw("pool marked validated", "datahash", hashStr) // if pool is proven to be valid, add all collected peers to full nodes @@ -482,12 +481,24 @@ func (m *Manager) cleanUp() []peer.ID { addToBlackList := make(map[peer.ID]struct{}) for h, p := range m.pools { - if !p.isValidatedDataHash.Load() && time.Since(p.createdAt) > m.params.PoolValidationTimeout { - delete(m.pools, h) - if p.headerHeight.Load() < m.initialHeight.Load() { - // outdated pools could still be valid even if not validated, no need to blacklist - continue + if p.isValidatedDataHash.Load() { + // remove pools that are outdated + if p.height < m.storeFrom.Load() { + delete(m.pools, h) } + continue + } + + // can't validate datahashes below initial height + if p.height < m.initialHeight.Load() { + delete(m.pools, h) + continue + } + + // find pools that are not validated in time + if time.Since(p.createdAt) > m.params.PoolValidationTimeout { + delete(m.pools, h) + log.Debug("blacklisting datahash with all corresponding peers", "hash", h, "peer_list", p.peersList) @@ -507,17 +518,3 @@ func (m *Manager) cleanUp() []peer.ID { } return blacklist } - -func (m *Manager) markPoolAsSynced(datahash string) { - p := m.getOrCreatePool(datahash) - if p.isSynced.CompareAndSwap(false, true) { - p.isSynced.Store(true) - p.reset() - } -} - -func (p *syncPool) add(peers ...peer.ID) { - if !p.isSynced.Load() { - p.pool.add(peers...) 
- } -} diff --git a/share/p2p/peers/manager_test.go b/share/p2p/peers/manager_test.go index 94ec5d5ea2..d4a188ff56 100644 --- a/share/p2p/peers/manager_test.go +++ b/share/p2p/peers/manager_test.go @@ -2,12 +2,12 @@ package peers import ( "context" - sync2 "sync" + "sync" "testing" "time" "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/sync" + dssync "github.com/ipfs/go-datastore/sync" dht "github.com/libp2p/go-libp2p-kad-dht" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/host" @@ -26,10 +26,9 @@ import ( "github.com/celestiaorg/celestia-node/share/p2p/shrexsub" ) -// TODO: add broadcast to tests func TestManager(t *testing.T) { t.Run("Validate pool by headerSub", func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*50) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) t.Cleanup(cancel) // create headerSub mock @@ -64,21 +63,16 @@ func TestManager(t *testing.T) { result := manager.Validate(ctx, peerID, msg) require.Equal(t, pubsub.ValidationIgnore, result) - pID, done, err := manager.Peer(ctx, h.DataHash.Bytes()) + pID, _, err := manager.Peer(ctx, h.DataHash.Bytes(), h.Height()) require.NoError(t, err) require.Equal(t, peerID, pID) // check pool validation - require.True(t, manager.getOrCreatePool(h.DataHash.String()).isValidatedDataHash.Load()) - - done(ResultSynced) - // pool should not be removed after success - require.Len(t, manager.pools, 1) - require.Len(t, manager.getOrCreatePool(h.DataHash.String()).pool.peersList, 0) + require.True(t, manager.getPool(h.DataHash.String()).isValidatedDataHash.Load()) }) t.Run("validator", func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*50) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) t.Cleanup(cancel) // create headerSub mock @@ -100,7 +94,7 @@ func TestManager(t *testing.T) { require.Equal(t, pubsub.ValidationIgnore, result) // mark peer as misbehaved to blacklist it - pID, done, err := manager.Peer(ctx, h.DataHash.Bytes()) + pID, done, err := manager.Peer(ctx, h.DataHash.Bytes(), h.Height()) require.NoError(t, err) require.Equal(t, peerID, pID) manager.params.EnableBlackListing = true @@ -132,19 +126,21 @@ func TestManager(t *testing.T) { // create unvalidated pool peerID := peer.ID("peer1") msg := shrexsub.Notification{ - DataHash: share.DataHash("datahash1"), + DataHash: share.DataHash("datahash1datahash1datahash1datahash1datahash1"), Height: 2, } manager.Validate(ctx, peerID, msg) // create validated pool validDataHash := share.DataHash("datahash2") - manager.fullNodes.add("full") // add FN to unblock Peer call - manager.Peer(ctx, validDataHash) //nolint:errcheck + manager.fullNodes.add("full") // add FN to unblock Peer call + manager.Peer(ctx, validDataHash, h.Height()) //nolint:errcheck + require.Len(t, manager.pools, 3) // trigger cleanup blacklisted := manager.cleanUp() require.Contains(t, blacklisted, peerID) + require.Len(t, manager.pools, 2) // messages with blacklisted hash should be rejected right away peerID2 := peer.ID("peer2") @@ -172,8 +168,7 @@ func TestManager(t *testing.T) { peers := []peer.ID{"peer1", "peer2", "peer3"} manager.fullNodes.add(peers...) 
- peerID, done, err := manager.Peer(ctx, h.DataHash.Bytes()) - done(ResultSynced) + peerID, _, err := manager.Peer(ctx, h.DataHash.Bytes(), h.Height()) require.NoError(t, err) require.Contains(t, peers, peerID) @@ -195,7 +190,7 @@ func TestManager(t *testing.T) { // make sure peers are not returned before timeout timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Millisecond) t.Cleanup(cancel) - _, _, err = manager.Peer(timeoutCtx, h.DataHash.Bytes()) + _, _, err = manager.Peer(timeoutCtx, h.DataHash.Bytes(), h.Height()) require.ErrorIs(t, err, context.DeadlineExceeded) peers := []peer.ID{"peer1", "peer2", "peer3"} @@ -204,8 +199,7 @@ func TestManager(t *testing.T) { doneCh := make(chan struct{}) go func() { defer close(doneCh) - peerID, done, err := manager.Peer(ctx, h.DataHash.Bytes()) - done(ResultSynced) + peerID, _, err := manager.Peer(ctx, h.DataHash.Bytes(), h.Height()) require.NoError(t, err) require.Contains(t, peers, peerID) }() @@ -223,38 +217,7 @@ func TestManager(t *testing.T) { stopManager(t, manager) }) - t.Run("mark pool synced", func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - t.Cleanup(cancel) - - h := testHeader() - headerSub := newSubLock(h, nil) - - // start test manager - manager, err := testManager(ctx, headerSub) - require.NoError(t, err) - - peerID, msg := peer.ID("peer1"), newShrexSubMsg(h) - result := manager.Validate(ctx, peerID, msg) - require.Equal(t, pubsub.ValidationIgnore, result) - - pID, done, err := manager.Peer(ctx, h.DataHash.Bytes()) - require.NoError(t, err) - require.Equal(t, peerID, pID) - done(ResultSynced) - - // check pool is soft deleted and marked synced - pool := manager.getOrCreatePool(h.DataHash.String()) - require.Len(t, pool.peersList, 0) - require.True(t, pool.isSynced.Load()) - - // add peer on synced pool should be noop - result = manager.Validate(ctx, "peer2", msg) - require.Equal(t, pubsub.ValidationIgnore, result) - require.Len(t, pool.peersList, 0) - }) - - t.Run("shrexSub sends a message lower than first headerSub header height, msg first", func(t *testing.T) { + t.Run("shrexSub sends a message lower than first headerSub header height, headerSub first", func(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) t.Cleanup(cancel) @@ -278,12 +241,16 @@ func TestManager(t *testing.T) { } result := manager.Validate(ctx, "peer", msg) require.Equal(t, pubsub.ValidationIgnore, result) + // pool will be created for first shrexSub message + require.Len(t, manager.pools, 2) - // amount of pools should not change + blacklisted := manager.cleanUp() + require.Empty(t, blacklisted) + // trigger cleanup and outdated pool should be removed require.Len(t, manager.pools, 1) }) - t.Run("shrexSub sends a message lower than first headerSub header height, headerSub first", func(t *testing.T) { + t.Run("shrexSub sends a message lower than first headerSub header height, shrexSub first", func(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) t.Cleanup(cancel) @@ -303,18 +270,50 @@ func TestManager(t *testing.T) { result := manager.Validate(ctx, "peer", msg) require.Equal(t, pubsub.ValidationIgnore, result) - // unlock header sub after message validator + // pool will be created for first shrexSub message + require.Len(t, manager.pools, 1) + + // unlock headerSub to allow it to send next message require.NoError(t, headerSub.wait(ctx, 1)) - // pool will be created for first headerSub header datahash + // second pool 
should be created require.Len(t, manager.pools, 2) - // trigger cleanup and check that no peers or hashes were blacklisted - manager.params.PoolValidationTimeout = 0 + // trigger cleanup and outdated pool should be removed blacklisted := manager.cleanUp() + require.Len(t, manager.pools, 1) + + // check that no peers or hashes were blacklisted + manager.params.PoolValidationTimeout = 0 require.Len(t, blacklisted, 0) require.Len(t, manager.blacklistedHashes, 0) + }) + + t.Run("pools store window", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + h := testHeader() + h.RawHeader.Height = storedPoolsAmount * 2 + headerSub := newSubLock(h, nil) + + // start test manager + manager, err := testManager(ctx, headerSub) + require.NoError(t, err) + + // unlock headerSub to read first header + require.NoError(t, headerSub.wait(ctx, 1)) + // pool will be created for first headerSub header datahash + require.Len(t, manager.pools, 1) + + // create shrexSub msg with height lower than storedPoolsAmount + msg := shrexsub.Notification{ + DataHash: share.DataHash("datahash"), + Height: h.Height() - storedPoolsAmount - 3, + } + result := manager.Validate(ctx, "peer", msg) + require.Equal(t, pubsub.ValidationIgnore, result) - // outdated pool should be removed + // shrexSub message should be discarded and amount of pools should not change require.Len(t, manager.pools, 1) }) } @@ -355,7 +354,7 @@ func TestIntegration(t *testing.T) { })) // FN should get message - gotPeer, _, err := fnPeerManager.Peer(ctx, randHash) + gotPeer, _, err := fnPeerManager.Peer(ctx, randHash, 13) require.NoError(t, err) // check that gotPeer matched bridge node @@ -409,7 +408,7 @@ func TestIntegration(t *testing.T) { require.NoError(t, err) // init peer manager for full node - connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore())) + connGater, err := conngater.NewBasicConnectionGater(dssync.MutexWrap(datastore.NewMapDatastore())) require.NoError(t, err) fnPeerManager, err := NewManager( DefaultParameters(), @@ -469,7 +468,7 @@ func testManager(ctx context.Context, headerSub libhead.Subscriber[*header.Exten return nil, err } - connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore())) + connGater, err := conngater.NewBasicConnectionGater(dssync.MutexWrap(datastore.NewMapDatastore())) if err != nil { return nil, err } @@ -504,7 +503,7 @@ func testHeader() *header.ExtendedHeader { type subLock struct { next chan struct{} - wg *sync2.WaitGroup + wg *sync.WaitGroup expected []*header.ExtendedHeader } @@ -530,7 +529,7 @@ func (s subLock) release(ctx context.Context) error { } func newSubLock(expected ...*header.ExtendedHeader) *subLock { - wg := &sync2.WaitGroup{} + wg := &sync.WaitGroup{} wg.Add(1) return &subLock{ next: make(chan struct{}), diff --git a/share/p2p/peers/metrics.go b/share/p2p/peers/metrics.go index 95d1ce65d9..3b5913ebb8 100644 --- a/share/p2p/peers/metrics.go +++ b/share/p2p/peers/metrics.go @@ -39,14 +39,11 @@ const ( poolStatusKey = "pool_status" poolStatusCreated poolStatus = "created" poolStatusValidated poolStatus = "validated" - poolStatusSynced poolStatus = "synced" poolStatusBlacklisted poolStatus = "blacklisted" // Pool status model: // created(unvalidated) // / \ - // validated(unsynced) blacklisted - // | - // synced + // validated blacklisted ) var ( @@ -266,11 +263,6 @@ func (m *Manager) shrexPools() map[poolStatus]int64 { continue } - if p.isSynced.Load() { - 
shrexPools[poolStatusSynced]++ - continue - } - // pool is validated but not synced shrexPools[poolStatusValidated]++ } diff --git a/share/p2p/peers/pool.go b/share/p2p/peers/pool.go index d0cc45ac44..365ef0306d 100644 --- a/share/p2p/peers/pool.go +++ b/share/p2p/peers/pool.go @@ -224,12 +224,3 @@ func (p *pool) len() int { defer p.m.RUnlock() return p.activeCount } - -// reset will reset the pool to its initial state. -func (p *pool) reset() { - lock := &p.m - lock.Lock() - defer lock.Lock() - // swap the pool with an empty one - *p = *newPool(time.Second) -}
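
The sketch below is illustrative only and is not part of the patch: it reproduces the retention-window idea from the change in a self-contained program. storeFrom trails the most recent header height by storedPoolsAmount, shrex-sub notifications below that height are ignored, and validated pools that fall out of the window are dropped during cleanup. The window and pool types and the onHeader/onNotification/cleanUp functions are hypothetical simplifications for illustration, not the actual celestia-node API.

// Illustrative sketch of the pool retention window; simplified, hypothetical types.
package main

import "fmt"

const storedPoolsAmount = 10 // same default as introduced by the patch

type pool struct {
	height    uint64
	validated bool
}

// window keeps per-datahash pools only for the most recent storedPoolsAmount heights.
type window struct {
	storeFrom uint64           // lowest height for which pools are still stored
	pools     map[string]*pool // keyed by datahash
}

// onHeader advances the window whenever a new header height is observed,
// mirroring how subscribeHeader updates storeFrom in the patch.
func (w *window) onHeader(height uint64) {
	if height > storedPoolsAmount {
		w.storeFrom = height - storedPoolsAmount
	} else {
		w.storeFrom = 0
	}
}

// onNotification mimics Validate: messages below the window are ignored,
// everything else gets (or creates) a pool.
func (w *window) onNotification(datahash string, height uint64) bool {
	if height < w.storeFrom {
		return false // ignored, no pool is created
	}
	if _, ok := w.pools[datahash]; !ok {
		w.pools[datahash] = &pool{height: height}
	}
	return true
}

// cleanUp drops validated pools that have fallen out of the window,
// which is what bounds the manager's memory use.
func (w *window) cleanUp() {
	for hash, p := range w.pools {
		if p.validated && p.height < w.storeFrom {
			delete(w.pools, hash)
		}
	}
}

func main() {
	w := &window{pools: map[string]*pool{}}
	w.onHeader(25)                           // storeFrom becomes 15
	fmt.Println(w.onNotification("old", 7))  // false: below the window, ignored
	fmt.Println(w.onNotification("new", 24)) // true: pool is created
	w.pools["new"].validated = true
	w.onHeader(40)            // storeFrom becomes 30
	w.cleanUp()               // "new" (height 24) is outside the window and removed
	fmt.Println(len(w.pools)) // 0
}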