Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix libp2p identify race #6573

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,12 @@ func NewFetch(
if host != nil {
connectedf := func(peer p2p.Peer) {
protocols := func() []protocol.ID {
// Make sure that the protocol list for the peer is correct.
// This is similar to what Host.NewStream does to make
// sure it is possible to use one of the specified
// protocols. If we don't do this, there may be a race causing
// some peers to be unnecessarily ignored.
host.Identify(peer)
Comment on lines +303 to +308
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this:

Suggested change
// Make sure that the protocol list for the peer is correct.
// This is similar to what Host.NewStream does to make
// sure it is possible to use one of the specified
// protocols. If we don't do this, there may be a race causing
// some peers to be unnecessarily ignored.
host.Identify(peer)
pi := host.Peerstore().PeerInfo(peer)
if err := host.Connect(context.Background(), pi); err != nil {
f.logger.Debug("failed to connect to peer",
zap.Stringer("id", peer),
zap.Error(err),
)
return nil
}

has basically the same effect without a) needing to intercept the building process of the libp2p host and b) allowing us to pass a timeout to Connect in case we want to abort early if something goes wrong.

Connect in both implementations of Host eventually calls IdentifyWait which is also called by IdentifyCon but allows passing a context in case we want to abort early.

Wdyt?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will not work b/c Connect is noop in case if the peer is already connected:
https://github.com/libp2p/go-libp2p/blob/v0.38.1/p2p/host/basic/basic_host.go#L803
So it will not call IdentifyWait in this case (it is invoked from dialPeer: https://github.com/libp2p/go-libp2p/blob/v0.38.1/p2p/host/basic/basic_host.go#L826)
And if you force dial via a context option, there will be adverse side effects such as trying to establish a new connection, invoking the gater etc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There may be a way of trying to create a new stream which also calls IdentifyWait, but I do not like it either as it adds overhead of creating a new stream when it's not really needed, producing unwanted network traffic

Copy link
Member

@fasmat fasmat Jan 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand. For the network status to be connected doesn't this mean Connect has already been called and it is OK that Connect is a no-op? The first call to it will result in IdentifyWait being called?

Again to me this feels wrong - pulling internals out of libp2p that look like they might not be intended to be used outside of the library (IdentifyConn looks like a helper method for tests of the IDService)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The peer is listed in the addressbook before Connect finishes.
The need to access internals is rather unfortunate but it's due to some inconvenient decisions in go-libp2p codebase. Basically IDService can be easily accessed if you know for sure your Host is a *BasicHost, but it is harder to reach if there are some Host wrappers. So IDService is not fully internal.
IdentifyConn just calls IdentifyWait (which is not only used in tests) and waits on the channel it returns

ps, err := host.Peerstore().GetProtocols(peer)
if err != nil {
f.logger.Debug("failed to get protocols for peer",
Expand Down
10 changes: 7 additions & 3 deletions fetch/mesh_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -966,16 +966,16 @@ func TestFetch_GetCert(t *testing.T) {

// Test if GetAtxs() limits the number of concurrent requests to `cfg.GetAtxsConcurrency`.
func Test_GetAtxsLimiting(t *testing.T) {
mesh, err := mocknet.FullMeshConnected(2)
require.NoError(t, err)

const (
totalRequests = 100
getAtxConcurrency = 10
)

for _, withLimiting := range []bool{false, true} {
t.Run(fmt.Sprintf("with limiting: %v", withLimiting), func(t *testing.T) {
// Do not connect immediately in order to avoid identify race.
mesh, err := mocknet.FullMeshLinked(2)
require.NoError(t, err)
srv := server.New(
wrapHost(mesh.Hosts()[1]),
hashProtocol,
Expand Down Expand Up @@ -1038,6 +1038,10 @@ func Test_GetAtxsLimiting(t *testing.T) {
require.NoError(t, f.Start())
t.Cleanup(f.Stop)

// Connect the P2P mesh only after the server is configured.
// This way, we avoid the race causing bad protocol identification.
require.NoError(t, mesh.ConnectAllButSelf())

var atxIds []types.ATXID
for i := 0; i < totalRequests; i++ {
id := types.RandomATXID()
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ require (
github.com/stretchr/testify v1.10.0
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
github.com/zeebo/blake3 v0.2.4
go.uber.org/fx v1.23.0
go.uber.org/mock v0.5.0
go.uber.org/zap v1.27.0
golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67
Expand Down Expand Up @@ -231,7 +232,6 @@ require (
go.opentelemetry.io/otel/sdk/metric v1.31.0 // indirect
go.opentelemetry.io/otel/trace v1.31.0 // indirect
go.uber.org/dig v1.18.0 // indirect
go.uber.org/fx v1.23.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/mod v0.22.0 // indirect
Expand Down
13 changes: 13 additions & 0 deletions p2p/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
"github.com/libp2p/go-libp2p/p2p/protocol/holepunch"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/libp2p/go-libp2p/p2p/security/noise"
quic "github.com/libp2p/go-libp2p/p2p/transport/quic"
"github.com/libp2p/go-libp2p/p2p/transport/quicreuse"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
ma "github.com/multiformats/go-multiaddr"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/fx"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

Expand Down Expand Up @@ -282,6 +284,8 @@
return nil, fmt.Errorf("can't set up connection gater: %w", err)
}

var identifyConn func(network.Conn)

pt := peerinfo.NewPeerInfoTracker()
lopts := []libp2p.Option{
libp2p.Identity(key),
Expand All @@ -295,6 +299,11 @@
cfg.AutoNATServer.PeerMax,
cfg.AutoNATServer.ResetPeriod),
libp2p.ConnectionGater(g),
libp2p.WithFxOption(fx.Invoke(func(ids identify.IDService) {
identifyConn = func(c network.Conn) {
ids.IdentifyConn(c)
}
})),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels like a hack to me, especially since later down we panic if this doesn't work.

Copy link
Contributor Author

@ivan4th ivan4th Dec 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not fail unless fx DI library that libp2p uses is broken.
There's a panic call further down to clarify that in case if this doesn't work.
Will try to get rid of this function variable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It now uses a variable of identity.IDService type and returns an error instead of panicking if for whatever reason fx fails to invoke the function

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still feels like we are digging deep through the layers of libp2p to access functionality that isn't intended to be used outside of libp2p. libp2p.WithFxOption is marked as experimental and fx itself is used for dependency injection / builder patterns in libp2p. libp2p.New probably returns an interface instead of a concrete type such that the maintainers can make big refactorings without breaking user code.

Us interjecting the build process of whatever is behind the interface returned by lip2p.New will probably only lead to us having to rewrite code with future updates of the library. I suggest a slightly different approach (see my other comment).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The nature of libp2p is that the DI has to be used anyway, e.g. in all the cases like this

func(upgrader transport.Upgrader, rcmgr network.ResourceManager) (transport.Transport, error) {
we're using libp2p's fx DI b/c it passes the necessary arguments to the functions we provide.
It can also be possible to get hold on the *BasicHost via other DI tricks while not using WithFxOption but IMO that would be more hacky

}
if cfg.EnableTCPTransport {
lopts = append(lopts,
Expand Down Expand Up @@ -413,6 +422,9 @@
if err != nil {
return nil, fmt.Errorf("failed to initialize libp2p host: %w", err)
}
if identifyConn == nil {
panic("BUG: identify service not set")

Check warning on line 426 in p2p/host.go

View check run for this annotation

Codecov / codecov/patch

p2p/host.go#L426

Added line #L426 was not covered by tests
}
fasmat marked this conversation as resolved.
Show resolved Hide resolved
g.updateHost(h)
h.Network().Notify(p2pmetrics.NewConnectionsMeeter())
pt.Start(h.Network())
Expand All @@ -427,6 +439,7 @@
WithBootnodes(bootnodesMap),
WithDirectNodes(g.direct),
WithPeerInfo(pt),
WithIdentifyConn(identifyConn),
)
return Upgrade(h, opts...)
}
Expand Down
4 changes: 4 additions & 0 deletions p2p/persist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ func TestConnectedPersist(t *testing.T) {
dir := t.TempDir()
ctx, cancel := context.WithCancel(context.Background())
const n = 3
// We can use FullMeshConnected here b/c we don't need to query peers' protocols,
// and thus there are no issues with identify service race.
mock, err := mocknet.FullMeshConnected(n)
require.NoError(t, err)
var eg errgroup.Group
Expand Down Expand Up @@ -46,6 +48,8 @@ func TestConnectedBrokenCRC(t *testing.T) {
dir := t.TempDir()
ctx, cancel := context.WithCancel(context.Background())
const n = 3
// We can use FullMeshConnected here b/c we don't need to query peers' protocols,
// and thus there are no issues with identify service race.
mock, err := mocknet.FullMeshConnected(n)
require.NoError(t, err)
var eg errgroup.Group
Expand Down
36 changes: 33 additions & 3 deletions p2p/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,30 @@ func wrapHost(tb testing.TB, h host.Host) Host {
func TestServer(t *testing.T) {
const limit = 1024

mesh, err := mocknet.FullMeshConnected(5)
// Don't establish peer connections immediately.
// If we make the nodes connect to each other right away, there
// may be a problem with libp2p identify service. Namely, when a
// connection to a peer is established, identify request is sent
// to that peer, to which the peer sends identify response. Also,
// when a new handler is added to the host via SetStreamHandler,
// as it's done in p2p/server.Server, the identify response
// message is pushed to the peers currently connected to this
// node.
// When a server is created immediately following a connection
// from a peer, that peer may receive 2 identify response messages
// in quick succession: one which is indeed the response to
// initial identify request, and one which is push message
// generated by creation of the server. In some cases, the initial
// response may not contain the protocol of the freshly added
// server (hashProtocol="hs/1" in this case).
// The identify response message sent when SetStreamHandler is
// called will always contain the protocol being added, but due to
// how libp2p works, the responses may be processed in reverse
// order, and the set of protocols from the initial message will
// override the set of protocols in the second mesage (from
fasmat marked this conversation as resolved.
Show resolved Hide resolved
// SetStreamHandler). In this case, the peer will have a wrong
// idea about this node's protocols, and the test may fail.
mesh, err := mocknet.FullMeshLinked(5)
fasmat marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err)
proto := "test"
request := []byte("test request")
Expand Down Expand Up @@ -121,6 +144,10 @@ func TestServer(t *testing.T) {
eg.Wait()
})

// Connect the P2P mesh only after the servers are configured.
// This way, we avoid the race causing bad protocol identification.
require.NoError(t, mesh.ConnectAllButSelf())

t.Run("ReceiveMessage", func(t *testing.T) {
n := srvs[0].NumAcceptedRequests()
srvID := mesh.Hosts()[1].ID()
Expand Down Expand Up @@ -212,7 +239,7 @@ func TestServer(t *testing.T) {
}

func Test_Queued(t *testing.T) {
mesh, err := mocknet.FullMeshConnected(2)
mesh, err := mocknet.FullMeshLinked(2)
require.NoError(t, err)

var (
Expand Down Expand Up @@ -245,6 +272,8 @@ func Test_Queued(t *testing.T) {
t.Cleanup(func() {
assert.NoError(t, eg.Wait())
})
require.NoError(t, mesh.ConnectAllButSelf())

var reqEq errgroup.Group
for i := 0; i < queueSize; i++ { // fill the queue with requests
reqEq.Go(func() error {
Expand All @@ -266,7 +295,7 @@ func Test_Queued(t *testing.T) {
}

func Test_RequestInterval(t *testing.T) {
mesh, err := mocknet.FullMeshConnected(2)
mesh, err := mocknet.FullMeshLinked(2)
require.NoError(t, err)

var (
Expand Down Expand Up @@ -295,6 +324,7 @@ func Test_RequestInterval(t *testing.T) {
t.Cleanup(func() {
assert.NoError(t, eg.Wait())
})
require.NoError(t, mesh.ConnectAllButSelf())

start := time.Now()
for i := 0; i < maxReq; i++ { // fill the interval with requests (bursts up to maxReq are allowed)
Expand Down
35 changes: 34 additions & 1 deletion p2p/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
ma "github.com/multiformats/go-multiaddr"
"go.uber.org/zap"
Expand Down Expand Up @@ -75,6 +76,12 @@ func WithPeerInfo(pi peerinfo.PeerInfo) Opt {
}
}

func WithIdentifyConn(identifyConn func(network.Conn)) Opt {
return func(fh *Host) {
fh.identifyConn = identifyConn
}
}

// Host is a convenience wrapper for all p2p related functionality required to run
// a full spacemesh node.
type Host struct {
Expand Down Expand Up @@ -112,7 +119,8 @@ type Host struct {
value network.Reachability
}

ping *Ping
ping *Ping
identifyConn func(network.Conn)
}

// Upgrade creates Host instance from host.Host.
Expand All @@ -128,6 +136,15 @@ func Upgrade(h host.Host, opts ...Opt) (*Host, error) {
for _, opt := range opts {
opt(fh)
}
if fh.identifyConn == nil {
// If no IDService is provided, which may be the case in the tests,
// we can try to get it from the host, assuming it's a *basichost.BasicHost.
if bh, ok := h.(*basichost.BasicHost); ok {
fh.identifyConn = func(conn network.Conn) {
bh.IDService().IdentifyConn(conn)
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if this cast fails? If libp2p.New returns an interface - we shouldn't assume a specific implementation to be behind that interface or this might randomly break (silently) at some point in the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added some clarifying comments. *basichost.BasicHost is expected when libp2p's mocknet is being used, and there's actually no better way than casting host.Host to *basichost.BasicHost in this case to obtain IDService, as libp2p.New() and fx dependency injection is not used in that case.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe instead of accessing libp2p internals we should just use the methods that are exposed via the Host interface. Connect establishes the connection to the peer and calls IDService:IdentifyWait() before returning. It would also allow us to pass a timeout if we want that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sadly there seems to be no other way, Connect does not do this if the peer is already connected (which it is in this case), see below

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But if the peer is already connected - Connect (and with it IdentifyWait) have already been called?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If peer is already connected and is listed among connected peers, this does not mean Connect() and IdentifyWait() have necessarily been finished. So the race is still possible.

}
cfg := fh.cfg
bootnodes, err := parseIntoAddr(fh.cfg.Bootnodes)
if err != nil {
Expand Down Expand Up @@ -504,3 +521,19 @@ func (fh *Host) trackNetEvents() error {
func (fh *Host) PeerInfo() peerinfo.PeerInfo {
return fh.peerInfo
}

// Identify ensures that the given peer is identified via libp2p identify protocol.
// Identification is initiated after connecting to the peer, and the set of protocols for
// the peer in the ProtoBook is not guaranteed to be correct until identification
// finishes.
// Note that the set of the protocols in the ProtoBook for a particular peer may also
// change via a push identity notification when the peer adds a new handler via
// SetStreamHandler (e.g. sets up a new Server).
func (fh *Host) Identify(p peer.ID) {
for _, c := range fh.Network().ConnsToPeer(p) {
// IDService.IdentifyConn is a no-op if the connection is already
// identified, but otherwise we need to wait for identification to finish
// to have proper set of protocols.
fh.identifyConn(c)
}
}
15 changes: 5 additions & 10 deletions sync2/dbset/p2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ func runSync(
cfg rangesync.RangeSetReconcilerConfig,
) {
log := zaptest.NewLogger(t)
mesh, err := mocknet.FullMeshConnected(2)
// Don't connect immediately to avoid identify race.
mesh, err := mocknet.FullMeshLinked(2)
require.NoError(t, err)
proto := "itest"
ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second)
Expand Down Expand Up @@ -210,15 +211,9 @@ func runSync(
return srv.Run(ctx)
})

// Wait for the server to activate
require.Eventually(t, func() bool {
for _, h := range mesh.Hosts() {
if len(h.Mux().Protocols()) == 0 {
return false
}
}
return true
}, time.Second, 10*time.Millisecond)
// Connect the P2P mesh only after the servers are configured.
// This way, we avoid the race causing bad protocol identification.
require.NoError(t, mesh.ConnectAllButSelf())

startTimer(t)
pssB := rangesync.NewPairwiseSetSyncerInternal(
Expand Down
7 changes: 6 additions & 1 deletion sync2/p2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ func TestP2P(t *testing.T) {
maxDepth = 24
)
logger := zaptest.NewLogger(t)
mesh, err := mocknet.FullMeshConnected(numNodes)
// Do not connect immediately.
// See Test_GetAtxsLimiting in fetch package for details.
mesh, err := mocknet.FullMeshLinked(numNodes)
require.NoError(t, err)
hs := make([]*sync2.P2PHashSync, numNodes)
initialSet := make([]rangesync.KeyBytes, numHashes)
Expand Down Expand Up @@ -120,6 +122,9 @@ func TestP2P(t *testing.T) {
require.NoError(t, err)
require.NoError(t, hs[n].Load())
require.False(t, hs[n].Synced())
}
require.NoError(t, mesh.ConnectAllButSelf())
for n := range hs {
hs[n].Start()
}

Expand Down
6 changes: 5 additions & 1 deletion sync2/rangesync/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ func makeFakeDispHandler(n int) rangesync.Handler {
}

func TestDispatcher(t *testing.T) {
mesh, err := mocknet.FullMeshConnected(2)
// Don't connect immediately to avoid identify race.
mesh, err := mocknet.FullMeshLinked(2)
require.NoError(t, err)

d := rangesync.NewDispatcher(zaptest.NewLogger(t))
Expand All @@ -48,6 +49,9 @@ func TestDispatcher(t *testing.T) {
srvPeerID := mesh.Hosts()[0].ID()

c := server.New(mesh.Hosts()[1], proto, d.Dispatch, opts...)
// Connect the P2P mesh only after the server is configured.
// This way, we avoid the race causing bad protocol identification.
require.NoError(t, mesh.ConnectAllButSelf())
for _, tt := range []struct {
name string
want int
Expand Down
14 changes: 5 additions & 9 deletions sync2/rangesync/p2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ func fakeRequesterGetter(t *testing.T) getRequesterFunc {
}

func p2pRequesterGetter(tb testing.TB) getRequesterFunc {
mesh, err := mocknet.FullMeshConnected(2)
// Don't connect immediately to avoid identify race.
mesh, err := mocknet.FullMeshLinked(2)
require.NoError(tb, err)
proto := "itest"
opts := []server.Opt{
Expand All @@ -85,14 +86,9 @@ func p2pRequesterGetter(tb testing.TB) getRequesterFunc {
return server.New(mesh.Hosts()[0], proto, handler, opts...), mesh.Hosts()[0].ID()
}
s := server.New(mesh.Hosts()[1], proto, handler, opts...)
require.Eventually(tb, func() bool {
for _, h := range mesh.Hosts()[0:] {
if len(h.Mux().Protocols()) == 0 {
return false
}
}
return true
}, time.Second, 10*time.Millisecond)
// Connect the P2P mesh only after the servers is configured.
// This way, we avoid the race causing bad protocol identification.
require.NoError(tb, mesh.ConnectAllButSelf())
return s, mesh.Hosts()[1].ID()
}
}
Expand Down
Loading
Loading