feat(share/p2p/peer-manager): limit amount of stored pools in peer-manager (celestiaorg#3005)

This PR introduces an `amountOfStoredPools` parameter to the peer manager that
limits the number of stored pools. The default value is 10 pools. The peer
manager keeps pools only for the `amountOfStoredPools` most recent headers.
Older blocks don't need dedicated pools and can be requested from any full
node, as those are expected to have the block by that time.

This lowers the memory footprint of the peer manager and resolves the
memory-leak issues.
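
To make the height-window idea concrete, here is a minimal, self-contained Go sketch of the approach described above. It is an illustration, not the actual celestia-node implementation: the `manager`, `onNewHeader`, `addPool`, and `cleanUp` names are hypothetical simplifications, and only `storedPoolsAmount` mirrors a constant introduced in the diff.

```go
// Hypothetical sketch of the "keep pools only for recent headers" idea.
// Not the celestia-node API: types and method names here are illustrative.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// storedPoolsAmount mirrors the constant added in the diff: pools are kept
// only for roughly the last 10 headers.
const storedPoolsAmount = 10

type manager struct {
	mu        sync.Mutex
	pools     map[string]uint64 // datahash -> header height (simplified stand-in for *syncPool)
	storeFrom atomic.Uint64     // heights below this are not tracked
}

// onNewHeader advances the window as new headers arrive (e.g. from headersub).
func (m *manager) onNewHeader(height uint64) {
	if height > storedPoolsAmount {
		m.storeFrom.Store(height - storedPoolsAmount)
	}
}

// addPool tracks a pool only if its height is inside the recent-header window.
func (m *manager) addPool(datahash string, height uint64) bool {
	if height < m.storeFrom.Load() {
		return false // too old: such requests can go to any full node instead
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	m.pools[datahash] = height
	return true
}

// cleanUp drops pools that have fallen out of the window, bounding memory use.
func (m *manager) cleanUp() {
	m.mu.Lock()
	defer m.mu.Unlock()
	for hash, h := range m.pools {
		if h < m.storeFrom.Load() {
			delete(m.pools, hash)
		}
	}
}

func main() {
	m := &manager{pools: map[string]uint64{}}
	m.onNewHeader(100)                  // window now covers heights 90..100
	fmt.Println(m.addPool("hashA", 95)) // true: inside the window
	fmt.Println(m.addPool("hashB", 80)) // false: older than the window
	m.cleanUp()
}
```

In the real peer manager, requests for heights that fall below the window are served by peers from the full-node pool rather than per-datahash pools, since those nodes are expected to already have the block.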

Resolves celestiaorg#1781
walldiss authored Dec 18, 2023
1 parent 7f51c59 commit c34b741
Showing 5 changed files with 122 additions and 144 deletions.
15 changes: 7 additions & 8 deletions share/getters/shrex.go
@@ -135,9 +135,8 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, header *header.ExtendedHeader
 		utils.SetStatusAndEnd(span, err)
 	}()
 
-	dah := header.DAH
 	// short circuit if the data root is empty
-	if dah.Equals(share.EmptyRoot()) {
+	if header.DAH.Equals(share.EmptyRoot()) {
 		return share.EmptyExtendedDataSquare(), nil
 	}
 	for {
@@ -147,10 +146,10 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, header *header.ExtendedHeader
 		}
 		attempt++
 		start := time.Now()
-		peer, setStatus, getErr := sg.peerManager.Peer(ctx, dah.Hash())
+		peer, setStatus, getErr := sg.peerManager.Peer(ctx, header.DAH.Hash(), header.Height())
 		if getErr != nil {
 			log.Debugw("eds: couldn't find peer",
-				"hash", dah.String(),
+				"hash", header.DAH.String(),
 				"err", getErr,
 				"finished (s)", time.Since(start))
 			sg.metrics.recordEDSAttempt(ctx, attempt, false)
@@ -159,11 +158,11 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, header *header.ExtendedHeader
 
 		reqStart := time.Now()
 		reqCtx, cancel := ctxWithSplitTimeout(ctx, sg.minAttemptsCount-attempt+1, sg.minRequestTimeout)
-		eds, getErr := sg.edsClient.RequestEDS(reqCtx, dah.Hash(), peer)
+		eds, getErr := sg.edsClient.RequestEDS(reqCtx, header.DAH.Hash(), peer)
 		cancel()
 		switch {
 		case getErr == nil:
-			setStatus(peers.ResultSynced)
+			setStatus(peers.ResultNoop)
 			sg.metrics.recordEDSAttempt(ctx, attempt, true)
 			return eds, nil
 		case errors.Is(getErr, context.DeadlineExceeded),
@@ -182,7 +181,7 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, header *header.ExtendedHeader
 			err = errors.Join(err, getErr)
 		}
 		log.Debugw("eds: request failed",
-			"hash", dah.String(),
+			"hash", header.DAH.String(),
 			"peer", peer.String(),
 			"attempt", attempt,
 			"err", getErr,
@@ -223,7 +222,7 @@ func (sg *ShrexGetter) GetSharesByNamespace(
 		}
 		attempt++
 		start := time.Now()
-		peer, setStatus, getErr := sg.peerManager.Peer(ctx, dah.Hash())
+		peer, setStatus, getErr := sg.peerManager.Peer(ctx, header.DAH.Hash(), header.Height())
 		if getErr != nil {
 			log.Debugw("nd: couldn't find peer",
 				"hash", dah.String(),
103 changes: 50 additions & 53 deletions share/p2p/peers/manager.go
@@ -27,8 +27,6 @@ import (
 const (
 	// ResultNoop indicates operation was successful and no extra action is required
 	ResultNoop result = "result_noop"
-	// ResultSynced will save the status of pool as "synced" and will remove peers from it
-	ResultSynced = "result_synced"
 	// ResultCooldownPeer will put returned peer on cooldown, meaning it won't be available by Peer
 	// method for some time
 	ResultCooldownPeer = "result_cooldown_peer"
@@ -39,6 +37,9 @@ const (
 	// eventbusBufSize is the size of the buffered channel to handle
 	// events in libp2p
 	eventbusBufSize = 32
+
+	// storedPoolsAmount is the amount of pools for recent headers that will be stored in the peer manager
+	storedPoolsAmount = 10
 )
 
 type result string
@@ -56,11 +57,14 @@ type Manager struct {
 	host host.Host
 	connGater *conngater.BasicConnectionGater
 
-	// pools collecting peers from shrexSub
+	// pools collecting peers from shrexSub and stores them by datahash
 	pools map[string]*syncPool
-	// messages from shrex.Sub with height below initialHeight will be ignored, since we don't need to
-	// track peers for those headers
+
+	// initialHeight is the height of the first header received from headersub
 	initialHeight atomic.Uint64
+	// messages from shrex.Sub with height below storeFrom will be ignored, since we don't need to
+	// track peers for those headers
+	storeFrom atomic.Uint64
 
 	// fullNodes collects full nodes peer.ID found via discovery
 	fullNodes *pool
@@ -85,11 +89,8 @@ type syncPool struct {
 	// isValidatedDataHash indicates if datahash was validated by receiving corresponding extended
 	// header from headerSub
 	isValidatedDataHash atomic.Bool
-	// headerHeight is the height of header corresponding to syncpool
-	headerHeight atomic.Uint64
-	// isSynced will be true if DoneFunc was called with ResultSynced. It indicates that given datahash
-	// was synced and peer-manager no longer need to keep peers for it
-	isSynced atomic.Bool
+	// height is the height of the header that corresponds to datahash
+	height uint64
 	// createdAt is the syncPool creation time
 	createdAt time.Time
 }
@@ -190,16 +191,15 @@ func (m *Manager) Stop(ctx context.Context) error {
 // full nodes, it will wait until any peer appear in either source or timeout happen.
 // After fetching data using given peer, caller is required to call returned DoneFunc using
 // appropriate result value
-func (m *Manager) Peer(
-	ctx context.Context, datahash share.DataHash,
+func (m *Manager) Peer(ctx context.Context, datahash share.DataHash, height uint64,
 ) (peer.ID, DoneFunc, error) {
-	p := m.validatedPool(datahash.String())
+	p := m.validatedPool(datahash.String(), height)
 
 	// first, check if a peer is available for the given datahash
 	peerID, ok := p.tryGet()
 	if ok {
 		if m.removeIfUnreachable(p, peerID) {
-			return m.Peer(ctx, datahash)
+			return m.Peer(ctx, datahash, height)
 		}
 		return m.newPeer(ctx, datahash, peerID, sourceShrexSub, p.len(), 0)
 	}
@@ -216,7 +216,7 @@ func (m *Manager) Peer(
 	select {
 	case peerID = <-p.next(ctx):
 		if m.removeIfUnreachable(p, peerID) {
-			return m.Peer(ctx, datahash)
+			return m.Peer(ctx, datahash, height)
 		}
 		return m.newPeer(ctx, datahash, peerID, sourceShrexSub, p.len(), time.Since(start))
 	case peerID = <-m.fullNodes.next(ctx):
@@ -270,14 +270,12 @@ func (m *Manager) doneFunc(datahash share.DataHash, peerID peer.ID, source peerS
 		m.metrics.observeDoneResult(source, result)
 		switch result {
 		case ResultNoop:
-		case ResultSynced:
-			m.markPoolAsSynced(datahash.String())
 		case ResultCooldownPeer:
 			if source == sourceFullNodes {
 				m.fullNodes.putOnCooldown(peerID)
 				return
 			}
-			m.getOrCreatePool(datahash.String()).putOnCooldown(peerID)
+			m.getPool(datahash.String()).putOnCooldown(peerID)
 		case ResultBlacklistPeer:
 			m.blacklistPeers(reasonMisbehave, peerID)
 		}
@@ -298,12 +296,16 @@ func (m *Manager) subscribeHeader(ctx context.Context, headerSub libhead.Subscri
 			log.Errorw("get next header from sub", "err", err)
 			continue
 		}
-		m.validatedPool(h.DataHash.String())
+		m.validatedPool(h.DataHash.String(), h.Height())
 
 		// store first header for validation purposes
 		if m.initialHeight.CompareAndSwap(0, h.Height()) {
 			log.Debugw("stored initial height", "height", h.Height())
 		}
+
+		// update storeFrom to keep pools only for recent headers
+		m.storeFrom.Store(uint64(max(0, int(h.Height())-storedPoolsAmount)))
+		log.Debugw("updated lowest stored height", "height", h.Height())
 	}
 }

@@ -355,22 +357,12 @@ func (m *Manager) Validate(_ context.Context, peerID peer.ID, msg shrexsub.Notif
 		return pubsub.ValidationReject
 	}
 
-	if msg.Height == 0 {
-		logger.Debug("received message with 0 height")
-		return pubsub.ValidationReject
-	}
-
-	if msg.Height < m.initialHeight.Load() {
-		// we can use peers from discovery for headers before the first one from headerSub
-		// if we allow pool creation for those headers, there is chance the pool will not be validated in
-		// time and will be false-positively trigger blacklisting of hash and all peers that sent msgs for
-		// that hash
+	if msg.Height < m.storeFrom.Load() {
 		logger.Debug("received message for past header")
 		return pubsub.ValidationIgnore
 	}
 
-	p := m.getOrCreatePool(msg.DataHash.String())
-	p.headerHeight.Store(msg.Height)
+	p := m.getOrCreatePool(msg.DataHash.String(), msg.Height)
 	logger.Debugw("got hash from shrex-sub")
 
 	p.add(peerID)
@@ -381,13 +373,20 @@ func (m *Manager) Validate(_ context.Context, peerID peer.ID, msg shrexsub.Notif
 	return pubsub.ValidationIgnore
 }
 
-func (m *Manager) getOrCreatePool(datahash string) *syncPool {
+func (m *Manager) getPool(datahash string) *syncPool {
 	m.lock.Lock()
 	defer m.lock.Unlock()
+	return m.pools[datahash]
+}
+
+func (m *Manager) getOrCreatePool(datahash string, height uint64) *syncPool {
+	m.lock.Lock()
+	defer m.lock.Unlock()
 
 	p, ok := m.pools[datahash]
 	if !ok {
 		p = &syncPool{
+			height: height,
 			pool: newPool(m.params.PeerCooldown),
 			createdAt: time.Now(),
 		}
@@ -432,8 +431,8 @@ func (m *Manager) isBlacklistedHash(hash share.DataHash) bool {
 	return m.blacklistedHashes[hash.String()]
 }
 
-func (m *Manager) validatedPool(hashStr string) *syncPool {
-	p := m.getOrCreatePool(hashStr)
+func (m *Manager) validatedPool(hashStr string, height uint64) *syncPool {
+	p := m.getOrCreatePool(hashStr, height)
 	if p.isValidatedDataHash.CompareAndSwap(false, true) {
 		log.Debugw("pool marked validated", "datahash", hashStr)
 		// if pool is proven to be valid, add all collected peers to full nodes
@@ -482,12 +481,24 @@ func (m *Manager) cleanUp() []peer.ID {
 
 	addToBlackList := make(map[peer.ID]struct{})
 	for h, p := range m.pools {
-		if !p.isValidatedDataHash.Load() && time.Since(p.createdAt) > m.params.PoolValidationTimeout {
-			delete(m.pools, h)
-			if p.headerHeight.Load() < m.initialHeight.Load() {
-				// outdated pools could still be valid even if not validated, no need to blacklist
-				continue
+		if p.isValidatedDataHash.Load() {
+			// remove pools that are outdated
+			if p.height < m.storeFrom.Load() {
+				delete(m.pools, h)
 			}
+			continue
+		}
+
+		// can't validate datahashes below initial height
+		if p.height < m.initialHeight.Load() {
+			delete(m.pools, h)
+			continue
+		}
+
+		// find pools that are not validated in time
+		if time.Since(p.createdAt) > m.params.PoolValidationTimeout {
+			delete(m.pools, h)
+
 			log.Debug("blacklisting datahash with all corresponding peers",
 				"hash", h,
 				"peer_list", p.peersList)
Expand All @@ -507,17 +518,3 @@ func (m *Manager) cleanUp() []peer.ID {
}
return blacklist
}

func (m *Manager) markPoolAsSynced(datahash string) {
p := m.getOrCreatePool(datahash)
if p.isSynced.CompareAndSwap(false, true) {
p.isSynced.Store(true)
p.reset()
}
}

func (p *syncPool) add(peers ...peer.ID) {
if !p.isSynced.Load() {
p.pool.add(peers...)
}
}