Skip to content

Commit

Permalink
Merge pull request #3952 from onflow/yahya/v0.29-backport-3904
Browse files Browse the repository at this point in the history
[Networking] Logging of libp2p Resource Limits at Node Startup
  • Loading branch information
peterargue authored Feb 21, 2023
2 parents ac3a8a9 + 04f1aba commit e644427
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 1 deletion.
10 changes: 9 additions & 1 deletion network/p2p/p2pbuilder/libp2pNodeBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,18 @@ func (builder *LibP2PNodeBuilder) Build() (p2p.LibP2PNode, error) {
return nil, fmt.Errorf("could not get allowed file descriptors: %w", err)
}

mgr, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(limits.Scale(mem, fd)), rcmgr.WithMetrics(builder.metrics))
l := limits.Scale(mem, fd)
mgr, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(l), rcmgr.WithMetrics(builder.metrics))
if err != nil {
return nil, fmt.Errorf("could not create libp2p resource manager: %w", err)
}
builder.logger.Info().
Str("key", keyResourceManagerLimit).
Int64("allowed_memory", mem).
Int("allowed_file_descriptors", fd).
Msg("allowed memory and file descriptors are fetched from the system")
newLimitConfigLogger(builder.logger).logResourceManagerLimits(l)

opts = append(opts, libp2p.ResourceManager(mgr))
builder.logger.Info().Msg("libp2p resource manager is set to default with metrics")
}
Expand Down
99 changes: 99 additions & 0 deletions network/p2p/p2pbuilder/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@ import (
"fmt"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/rs/zerolog"

"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/network/p2p"
)

const keyResourceManagerLimit = "libp2p_resource_manager_limit"

// notEjectedPeerFilter returns a PeerFilter that will return an error if the peer is unknown or ejected.
func notEjectedPeerFilter(idProvider module.IdentityProvider) p2p.PeerFilter {
return func(p peer.ID) error {
Expand All @@ -21,3 +26,97 @@ func notEjectedPeerFilter(idProvider module.IdentityProvider) p2p.PeerFilter {
return nil
}
}

type limitConfigLogger struct {
logger zerolog.Logger
}

// newLimitConfigLogger creates a new limitConfigLogger.
func newLimitConfigLogger(logger zerolog.Logger) *limitConfigLogger {
return &limitConfigLogger{logger: logger}
}

// withBaseLimit appends the base limit to the logger with the given prefix.
func (l *limitConfigLogger) withBaseLimit(prefix string, baseLimit rcmgr.BaseLimit) zerolog.Logger {
return l.logger.With().
Str("key", keyResourceManagerLimit).
Int(fmt.Sprintf("%s_streams", prefix), baseLimit.Streams).
Int(fmt.Sprintf("%s_streams_inbound", prefix), baseLimit.StreamsInbound).
Int(fmt.Sprintf("%s_streams_outbound", prefix), baseLimit.StreamsOutbound).
Int(fmt.Sprintf("%s_conns", prefix), baseLimit.Conns).
Int(fmt.Sprintf("%s_conns_inbound", prefix), baseLimit.ConnsInbound).
Int(fmt.Sprintf("%s_conns_outbound", prefix), baseLimit.ConnsOutbound).
Int(fmt.Sprintf("%s_file_descriptors", prefix), baseLimit.FD).
Int64(fmt.Sprintf("%s_memory", prefix), baseLimit.Memory).Logger()
}

func (l *limitConfigLogger) logResourceManagerLimits(config rcmgr.LimitConfig) {
l.logGlobalResourceLimits(config)
l.logServiceLimits(config.Service)
l.logProtocolLimits(config.Protocol)
l.logPeerLimits(config.Peer)
l.logPeerProtocolLimits(config.ProtocolPeer)
}

func (l *limitConfigLogger) logGlobalResourceLimits(config rcmgr.LimitConfig) {
lg := l.withBaseLimit("system", config.System)
lg.Info().Msg("system limits set")

lg = l.withBaseLimit("transient", config.Transient)
lg.Info().Msg("transient limits set")

lg = l.withBaseLimit("allowed_listed_system", config.AllowlistedSystem)
lg.Info().Msg("allowed listed system limits set")

lg = l.withBaseLimit("allowed_lister_transient", config.AllowlistedTransient)
lg.Info().Msg("allowed listed transient limits set")

lg = l.withBaseLimit("service_default", config.ServiceDefault)
lg.Info().Msg("service default limits set")

lg = l.withBaseLimit("service_peer_default", config.ServicePeerDefault)
lg.Info().Msg("service peer default limits set")

lg = l.withBaseLimit("protocol_default", config.ProtocolDefault)
lg.Info().Msg("protocol default limits set")

lg = l.withBaseLimit("protocol_peer_default", config.ProtocolPeerDefault)
lg.Info().Msg("protocol peer default limits set")

lg = l.withBaseLimit("peer_default", config.PeerDefault)
lg.Info().Msg("peer default limits set")

lg = l.withBaseLimit("connections", config.Conn)
lg.Info().Msg("connection limits set")

lg = l.withBaseLimit("streams", config.Stream)
lg.Info().Msg("stream limits set")
}

func (l *limitConfigLogger) logServiceLimits(s map[string]rcmgr.BaseLimit) {
for sName, sLimits := range s {
lg := l.withBaseLimit(fmt.Sprintf("service_%s", sName), sLimits)
lg.Info().Msg("service limits set")
}
}

func (l *limitConfigLogger) logProtocolLimits(p map[protocol.ID]rcmgr.BaseLimit) {
for pName, pLimits := range p {
lg := l.withBaseLimit(fmt.Sprintf("protocol_%s", pName), pLimits)
lg.Info().Msg("protocol limits set")
}
}

func (l *limitConfigLogger) logPeerLimits(p map[peer.ID]rcmgr.BaseLimit) {
for pId, pLimits := range p {
lg := l.withBaseLimit(fmt.Sprintf("peer_%s", pId.String()), pLimits)
lg.Info().Msg("peer limits set")
}
}

func (l *limitConfigLogger) logPeerProtocolLimits(p map[protocol.ID]rcmgr.BaseLimit) {
for pName, pLimits := range p {
lg := l.withBaseLimit(fmt.Sprintf("protocol_peer_%s", pName), pLimits)
lg.Info().Msg("protocol peer limits set")
}
}

0 comments on commit e644427

Please sign in to comment.