From c72d92ccaad557272e4de63d16f53718b9543bd9 Mon Sep 17 00:00:00 2001
From: Ivan Shvedunov
Date: Thu, 26 Dec 2024 22:44:43 +0400
Subject: [PATCH] Fix libp2p identify race

When a P2P test is set up using `mocknet.FullMeshConnected(...)` and then
calls `p2p/server.New(...)`, there's a possible race due to how the `libp2p`
`identify` service works. Namely, when a new peer connects, an active
`identify` request is initiated towards it, asking in particular which
protocols the peer supports, to which the peer must reply with an identify
response message. Also, when `SetStreamHandler` is called, an identify
response message is pushed towards the currently connected peers.

In some cases, the following race is possible:

1. Peer `A` connects to peer `B`.
2. Peer `B` sends an identify request to peer `A`.
3. Peer `A` sends a response to the identify request from peer `B`. This
   response contains the list of protocols, but that list misses the protocol
   which is used for the `Server` in p.4, b/c the `Server` is not set up yet.
4. Peer `A` sets up a `Server` which uses `SetStreamHandler`, and at this
   point peer `A` pushes an identify response message to peer `B`, _without_
   a corresponding identify request.
5. Peer `B` receives the identify response pushed by `A` in p.4, despite it
   being sent after the response in p.3. This may happen due to how `libp2p`
   handles incoming requests. Peer `B` sets the supported protocols in its
   `ProtoBook` for peer `A`; the list of protocols now contains the protocol
   specified for the `Server` in p.4.
6. Peer `B` receives the identify response from `A` which was sent in p.3,
   after the pushed one from p.4, due to possible reordering. This response
   also has a list of protocols, but it misses the protocol specified for the
   `Server` in p.4. Peer `B` again sets the supported protocols in its
   `ProtoBook` for peer `A`, but now that list misses the necessary protocol.
7. Peer `B` tries to find peers which support the protocol used for the
   `Server` in p.4, or to connect to peer `A` using that protocol. This fails
   b/c the `ProtoBook` entry for peer `A` contains the wrong protocol list.

In addition to this, there's an issue with the protocol support checks that
`Fetcher` performs to decide which peers it can retrieve data from. When a
peer is freshly connected, the active identify request towards it may not be
finished yet when the fetcher tries to check that peer. Although unlikely, in
some cases this may cause valid peers to be ignored.

This change replaces `mocknet.FullMeshConnected(...)` with
`mocknet.FullMeshLinked(...)` followed by `mesh.ConnectAllButSelf()` after the
`Server`s are set up, in the places where the former may cause the identify
race. It also fixes the fetcher peer selection mechanism so that it waits for
any pending identification request to finish, similarly to how
`Host.NewStream` does.
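To make the safe ordering concrete, here is a minimal sketch (illustration
only, not part of this patch) of the setup pattern the affected tests converge
on; the `/example/1` protocol ID and the no-op stream handler are made-up
placeholders:

```go
package example

import (
	"github.com/libp2p/go-libp2p/core/network"
	mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
)

func setupMesh() (mocknet.Mocknet, error) {
	// Link the hosts without connecting them, so no identify exchange
	// happens yet.
	mesh, err := mocknet.FullMeshLinked(2)
	if err != nil {
		return nil, err
	}
	// Register all stream handlers (in the real tests this is done by
	// p2p/server.New) before any connection exists, so there is nothing
	// for a later identify push to race against.
	for _, h := range mesh.Hosts() {
		h.SetStreamHandler("/example/1", func(s network.Stream) { s.Close() })
	}
	// Only now establish the connections; the identify responses exchanged
	// from this point on already advertise "/example/1".
	if err := mesh.ConnectAllButSelf(); err != nil {
		return nil, err
	}
	return mesh, nil
}
```

With this ordering, the first identify response a peer receives already lists
every protocol the test needs, so the reordering described in p.5/p.6 can no
longer leave a stale protocol list in the `ProtoBook`.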
--- fetch/fetch.go | 6 +++++ fetch/mesh_data_test.go | 10 ++++++--- p2p/host.go | 12 ++++++++++ p2p/persist_test.go | 4 ++++ p2p/server/server_test.go | 36 +++++++++++++++++++++++++++--- p2p/upgrade.go | 34 +++++++++++++++++++++++++++- sync2/dbset/p2p_test.go | 15 +++++-------- sync2/p2p_test.go | 7 +++++- sync2/rangesync/dispatcher_test.go | 6 ++++- sync2/rangesync/p2p_test.go | 14 +++++------- timesync/peersync/sync_test.go | 14 ++++++++---- 11 files changed, 126 insertions(+), 32 deletions(-) diff --git a/fetch/fetch.go b/fetch/fetch.go index 453ce11a7c..716a6f30e2 100644 --- a/fetch/fetch.go +++ b/fetch/fetch.go @@ -300,6 +300,12 @@ func NewFetch( if host != nil { connectedf := func(peer p2p.Peer) { protocols := func() []protocol.ID { + // Make sure that the protocol list for the peer is correct. + // This is similar to what Host.NewStream does to make + // sure it is possible to use one of the specified + // protocols. If we don't do this, there may be a race causing + // some peers to be unnecessarily ignored. + host.Identify(peer) ps, err := host.Peerstore().GetProtocols(peer) if err != nil { f.logger.Debug("failed to get protocols for peer", diff --git a/fetch/mesh_data_test.go b/fetch/mesh_data_test.go index f7c414c8df..a3fa5b4165 100644 --- a/fetch/mesh_data_test.go +++ b/fetch/mesh_data_test.go @@ -966,9 +966,6 @@ func TestFetch_GetCert(t *testing.T) { // Test if GetAtxs() limits the number of concurrent requests to `cfg.GetAtxsConcurrency`. func Test_GetAtxsLimiting(t *testing.T) { - mesh, err := mocknet.FullMeshConnected(2) - require.NoError(t, err) - const ( totalRequests = 100 getAtxConcurrency = 10 @@ -976,6 +973,9 @@ func Test_GetAtxsLimiting(t *testing.T) { for _, withLimiting := range []bool{false, true} { t.Run(fmt.Sprintf("with limiting: %v", withLimiting), func(t *testing.T) { + // Do not connect immediately in order to avoid identify race. + mesh, err := mocknet.FullMeshLinked(2) + require.NoError(t, err) srv := server.New( wrapHost(mesh.Hosts()[1]), hashProtocol, @@ -1038,6 +1038,10 @@ func Test_GetAtxsLimiting(t *testing.T) { require.NoError(t, f.Start()) t.Cleanup(f.Stop) + // Connect the P2P mesh only after the server is configured. + // This way, we avoid the race causing bad protocol identification. 
+ require.NoError(t, mesh.ConnectAllButSelf()) + var atxIds []types.ATXID for i := 0; i < totalRequests; i++ { id := types.RandomATXID() diff --git a/p2p/host.go b/p2p/host.go index 3fb9e94382..a52cddfece 100644 --- a/p2p/host.go +++ b/p2p/host.go @@ -25,12 +25,14 @@ import ( tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader" "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay" "github.com/libp2p/go-libp2p/p2p/protocol/holepunch" + "github.com/libp2p/go-libp2p/p2p/protocol/identify" "github.com/libp2p/go-libp2p/p2p/security/noise" quic "github.com/libp2p/go-libp2p/p2p/transport/quic" "github.com/libp2p/go-libp2p/p2p/transport/quicreuse" "github.com/libp2p/go-libp2p/p2p/transport/tcp" ma "github.com/multiformats/go-multiaddr" "github.com/prometheus/client_golang/prometheus" + "go.uber.org/fx" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -282,6 +284,8 @@ func New( return nil, fmt.Errorf("can't set up connection gater: %w", err) } + var identifyConn func(network.Conn) + pt := peerinfo.NewPeerInfoTracker() lopts := []libp2p.Option{ libp2p.Identity(key), @@ -295,6 +299,11 @@ func New( cfg.AutoNATServer.PeerMax, cfg.AutoNATServer.ResetPeriod), libp2p.ConnectionGater(g), + libp2p.WithFxOption(fx.Invoke(func(ids identify.IDService) { + identifyConn = func(c network.Conn) { + ids.IdentifyConn(c) + } + })), } if cfg.EnableTCPTransport { lopts = append(lopts, @@ -413,6 +422,9 @@ func New( if err != nil { return nil, fmt.Errorf("failed to initialize libp2p host: %w", err) } + if identifyConn == nil { + panic("BUG: identify service not set") + } g.updateHost(h) h.Network().Notify(p2pmetrics.NewConnectionsMeeter()) pt.Start(h.Network()) diff --git a/p2p/persist_test.go b/p2p/persist_test.go index 5442ec0bf9..a5bbddcf3a 100644 --- a/p2p/persist_test.go +++ b/p2p/persist_test.go @@ -17,6 +17,8 @@ func TestConnectedPersist(t *testing.T) { dir := t.TempDir() ctx, cancel := context.WithCancel(context.Background()) const n = 3 + // We can use FullMeshConnected here b/c we don't need to query peers' protocols, + // and thus there are no issues with identify service race. mock, err := mocknet.FullMeshConnected(n) require.NoError(t, err) var eg errgroup.Group @@ -46,6 +48,8 @@ func TestConnectedBrokenCRC(t *testing.T) { dir := t.TempDir() ctx, cancel := context.WithCancel(context.Background()) const n = 3 + // We can use FullMeshConnected here b/c we don't need to query peers' protocols, + // and thus there are no issues with identify service race. mock, err := mocknet.FullMeshConnected(n) require.NoError(t, err) var eg errgroup.Group diff --git a/p2p/server/server_test.go b/p2p/server/server_test.go index a2bb905abf..708580c749 100644 --- a/p2p/server/server_test.go +++ b/p2p/server/server_test.go @@ -44,7 +44,30 @@ func wrapHost(tb testing.TB, h host.Host) Host { func TestServer(t *testing.T) { const limit = 1024 - mesh, err := mocknet.FullMeshConnected(5) + // Don't establish peer connections immediately. + // If we make the nodes connect to each other right away, there + // may be a problem with libp2p identify service. Namely, when a + // connection to a peer is established, identify request is sent + // to that peer, to which the peer sends identify response. Also, + // when a new handler is added to the host via SetStreamHandler, + // as it's done in p2p/server.Server, the identify response + // message is pushed to the peers currently connected to this + // node. 
+ // When a server is created immediately following a connection + // from a peer, that peer may receive 2 identify response messages + // in quick succession: one which is indeed the response to the + // initial identify request, and one which is a push message + // generated by the creation of the server. In some cases, the initial + // response may not contain the protocol of the freshly added + // server (hashProtocol="hs/1" in this case). + // The identify response message sent when SetStreamHandler is + // called will always contain the protocol being added, but due to + // how libp2p works, the responses may be processed in reverse + // order, and the set of protocols from the initial message will + // override the set of protocols in the second message (from + // SetStreamHandler). In this case, the peer will have a wrong + // idea about this node's protocols, and the test may fail. + mesh, err := mocknet.FullMeshLinked(5) require.NoError(t, err) proto := "test" request := []byte("test request") @@ -121,6 +144,10 @@ func TestServer(t *testing.T) { eg.Wait() }) + // Connect the P2P mesh only after the servers are configured. + // This way, we avoid the race causing bad protocol identification. + require.NoError(t, mesh.ConnectAllButSelf()) + t.Run("ReceiveMessage", func(t *testing.T) { n := srvs[0].NumAcceptedRequests() srvID := mesh.Hosts()[1].ID() @@ -212,7 +239,7 @@ func TestServer(t *testing.T) { } func Test_Queued(t *testing.T) { - mesh, err := mocknet.FullMeshConnected(2) + mesh, err := mocknet.FullMeshLinked(2) require.NoError(t, err) var ( @@ -245,6 +272,8 @@ func Test_Queued(t *testing.T) { t.Cleanup(func() { assert.NoError(t, eg.Wait()) }) + require.NoError(t, mesh.ConnectAllButSelf()) + var reqEq errgroup.Group for i := 0; i < queueSize; i++ { // fill the queue with requests reqEq.Go(func() error { @@ -266,7 +295,7 @@ func Test_Queued(t *testing.T) { } func Test_RequestInterval(t *testing.T) { - mesh, err := mocknet.FullMeshConnected(2) + mesh, err := mocknet.FullMeshLinked(2) require.NoError(t, err) var ( @@ -295,6 +324,7 @@ func Test_RequestInterval(t *testing.T) { t.Cleanup(func() { assert.NoError(t, eg.Wait()) }) + require.NoError(t, mesh.ConnectAllButSelf()) start := time.Now() for i := 0; i < maxReq; i++ { // fill the interval with requests (bursts up to maxReq are allowed) diff --git a/p2p/upgrade.go b/p2p/upgrade.go index 604ed1bf3d..ac018bf4a4 100644 --- a/p2p/upgrade.go +++ b/p2p/upgrade.go @@ -15,7 +15,9 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" + basichost "github.com/libp2p/go-libp2p/p2p/host/basic" "github.com/libp2p/go-libp2p/p2p/host/eventbus" + "github.com/libp2p/go-libp2p/p2p/protocol/identify" ma "github.com/multiformats/go-multiaddr" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -75,6 +77,12 @@ func WithPeerInfo(pi peerinfo.PeerInfo) Opt { } } +func WithIDService(ids identify.IDService) Opt { + return func(fh *Host) { + fh.idService = ids + } +} + // Host is a convenience wrapper for all p2p related functionality required to run // a full spacemesh node. type Host struct { @@ -112,7 +120,8 @@ type Host struct { value network.Reachability } - ping *Ping + ping *Ping + idService identify.IDService } // Upgrade creates Host instance from host.Host. 
@@ -128,6 +137,13 @@ func Upgrade(h host.Host, opts ...Opt) (*Host, error) { for _, opt := range opts { opt(fh) } + if fh.idService == nil { + // If no IDService is provided, which may be the case in the tests, + // we can try to get it from the host, assuming it's a *basichost.BasicHost. + if bh, ok := h.(*basichost.BasicHost); ok { + fh.idService = bh.IDService() + } + } cfg := fh.cfg bootnodes, err := parseIntoAddr(fh.cfg.Bootnodes) if err != nil { @@ -504,3 +520,19 @@ func (fh *Host) trackNetEvents() error { func (fh *Host) PeerInfo() peerinfo.PeerInfo { return fh.peerInfo } + +// Identify ensures that the given peer is identified via libp2p identify protocol. +// Identification is initiated after connecting to the peer, and the set of protocols for +// the peer in the ProtoBook is not guaranteed to be correct until identification +// finishes. +// Note that the set of the protocols in the ProtoBook for a particular peer may also +// change via a push identity notification when the peer adds a new handler via +// SetStreamHandler (e.g. sets up a new Server). +func (fh *Host) Identify(p peer.ID) { + for _, c := range fh.Network().ConnsToPeer(p) { + // IdentifyConn is a no-op if the connection is already identified, + // but otherwise we need to wait for identification to finish to + // have proper set of protocols. + fh.idService.IdentifyConn(c) + } +} diff --git a/sync2/dbset/p2p_test.go b/sync2/dbset/p2p_test.go index c636c461ba..855690d541 100644 --- a/sync2/dbset/p2p_test.go +++ b/sync2/dbset/p2p_test.go @@ -143,7 +143,8 @@ func runSync( cfg rangesync.RangeSetReconcilerConfig, ) { log := zaptest.NewLogger(t) - mesh, err := mocknet.FullMeshConnected(2) + // Don't connect immediately to avoid identify race. + mesh, err := mocknet.FullMeshLinked(2) require.NoError(t, err) proto := "itest" ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second) @@ -210,15 +211,9 @@ func runSync( return srv.Run(ctx) }) - // Wait for the server to activate - require.Eventually(t, func() bool { - for _, h := range mesh.Hosts() { - if len(h.Mux().Protocols()) == 0 { - return false - } - } - return true - }, time.Second, 10*time.Millisecond) + // Connect the P2P mesh only after the servers are configured. + // This way, we avoid the race causing bad protocol identification. + require.NoError(t, mesh.ConnectAllButSelf()) startTimer(t) pssB := rangesync.NewPairwiseSetSyncerInternal( diff --git a/sync2/p2p_test.go b/sync2/p2p_test.go index 3f54c5064e..2e6313b3f3 100644 --- a/sync2/p2p_test.go +++ b/sync2/p2p_test.go @@ -79,7 +79,9 @@ func TestP2P(t *testing.T) { maxDepth = 24 ) logger := zaptest.NewLogger(t) - mesh, err := mocknet.FullMeshConnected(numNodes) + // Do not connect immediately. + // See Test_GetAtxsLimiting in fetch package for details. 
+ mesh, err := mocknet.FullMeshLinked(numNodes) require.NoError(t, err) hs := make([]*sync2.P2PHashSync, numNodes) initialSet := make([]rangesync.KeyBytes, numHashes) @@ -120,6 +122,9 @@ func TestP2P(t *testing.T) { require.NoError(t, err) require.NoError(t, hs[n].Load()) require.False(t, hs[n].Synced()) + } + require.NoError(t, mesh.ConnectAllButSelf()) + for n := range hs { hs[n].Start() } diff --git a/sync2/rangesync/dispatcher_test.go b/sync2/rangesync/dispatcher_test.go index 4df3e1ff63..c1e826b425 100644 --- a/sync2/rangesync/dispatcher_test.go +++ b/sync2/rangesync/dispatcher_test.go @@ -29,7 +29,8 @@ func makeFakeDispHandler(n int) rangesync.Handler { } func TestDispatcher(t *testing.T) { - mesh, err := mocknet.FullMeshConnected(2) + // Don't connect immediately to avoid identify race. + mesh, err := mocknet.FullMeshLinked(2) require.NoError(t, err) d := rangesync.NewDispatcher(zaptest.NewLogger(t)) @@ -48,6 +49,9 @@ func TestDispatcher(t *testing.T) { srvPeerID := mesh.Hosts()[0].ID() c := server.New(mesh.Hosts()[1], proto, d.Dispatch, opts...) + // Connect the P2P mesh only after the server is configured. + // This way, we avoid the race causing bad protocol identification. + require.NoError(t, mesh.ConnectAllButSelf()) for _, tt := range []struct { name string want int diff --git a/sync2/rangesync/p2p_test.go b/sync2/rangesync/p2p_test.go index 5b47c06929..2d672e83b3 100644 --- a/sync2/rangesync/p2p_test.go +++ b/sync2/rangesync/p2p_test.go @@ -68,7 +68,8 @@ func fakeRequesterGetter(t *testing.T) getRequesterFunc { } func p2pRequesterGetter(tb testing.TB) getRequesterFunc { - mesh, err := mocknet.FullMeshConnected(2) + // Don't connect immediately to avoid identify race. + mesh, err := mocknet.FullMeshLinked(2) require.NoError(tb, err) proto := "itest" opts := []server.Opt{ @@ -85,14 +86,9 @@ func p2pRequesterGetter(tb testing.TB) getRequesterFunc { return server.New(mesh.Hosts()[0], proto, handler, opts...), mesh.Hosts()[0].ID() } s := server.New(mesh.Hosts()[1], proto, handler, opts...) - require.Eventually(tb, func() bool { - for _, h := range mesh.Hosts()[0:] { - if len(h.Mux().Protocols()) == 0 { - return false - } - } - return true - }, time.Second, 10*time.Millisecond) + // Connect the P2P mesh only after the servers are configured. + // This way, we avoid the race causing bad protocol identification. + require.NoError(tb, mesh.ConnectAllButSelf()) return s, mesh.Hosts()[1].ID() } } diff --git a/timesync/peersync/sync_test.go b/timesync/peersync/sync_test.go index 11ad013e27..e4f1c7c65e 100644 --- a/timesync/peersync/sync_test.go +++ b/timesync/peersync/sync_test.go @@ -38,7 +38,8 @@ func TestSyncGetOffset(t *testing.T) { ) t.Run("Success", func(t *testing.T) { - mesh, err := mocknet.FullMeshConnected(4) + // Don't connect immediately to avoid identify race. + mesh, err := mocknet.FullMeshLinked(4) require.NoError(t, err) ctrl := gomock.NewController(t) @@ -51,13 +52,16 @@ func TestSyncGetOffset(t *testing.T) { require.NotNil(t, New(h, nil, WithTime(adjustedTime(peerResponse)))) } sync := New(mesh.Hosts()[0], nil, WithTime(tm)) + // Connect the P2P mesh only after the servers are configured. + // This way, we avoid the race causing bad protocol identification. 
+ require.NoError(t, mesh.ConnectAllButSelf()) offset, err := sync.GetOffset(context.TODO(), 0, peers) require.NoError(t, err) require.Equal(t, 5*time.Second, offset) }) t.Run("Failure", func(t *testing.T) { - mesh, err := mocknet.FullMeshConnected(4) + mesh, err := mocknet.FullMeshLinked(4) require.NoError(t, err) ctrl := gomock.NewController(t) @@ -70,6 +74,7 @@ func TestSyncGetOffset(t *testing.T) { } sync := New(mesh.Hosts()[0], nil, WithTime(tm)) + require.NoError(t, mesh.ConnectAllButSelf()) offset, err := sync.GetOffset(context.TODO(), 0, peers) require.ErrorIs(t, err, errTimesyncFailed) require.Empty(t, offset) @@ -91,7 +96,7 @@ func TestSyncTerminateOnError(t *testing.T) { responseReceive = start.Add(30 * time.Second) ) - mesh, err := mocknet.FullMeshConnected(4) + mesh, err := mocknet.FullMeshLinked(4) require.NoError(t, err) ctrl := gomock.NewController(t) getter := mocks.NewMockgetPeers(ctrl) @@ -113,6 +118,7 @@ func TestSyncTerminateOnError(t *testing.T) { sync.Start() t.Cleanup(sync.Stop) + require.NoError(t, mesh.ConnectAllButSelf()) errors := make(chan error, 1) go func() { errors <- sync.Wait() @@ -143,7 +149,6 @@ func TestSyncSimulateMultiple(t *testing.T) { t.Cleanup(func() { assert.NoError(t, fh.Stop()) }) hosts = append(hosts, fh) } - require.NoError(t, mesh.ConnectAllButSelf()) // First create all instances so they register in the protocol // and then start them. @@ -159,6 +164,7 @@ func TestSyncSimulateMultiple(t *testing.T) { sync.Start() t.Cleanup(sync.Stop) } + require.NoError(t, mesh.ConnectAllButSelf()) for i, inst := range instances { if errors[i] == nil { continue