Fix libp2p identify race
When a P2P test is set up using `mocknet.FullMeshConnected(...)` and
then calls `p2p/server.New(...)`, there's a possible race due to how
the `libp2p` `identify` service works. Namely, when a new peer connects,
an active `identify` request is initiated towards it, asking in
particular which protocols the peer supports, to which the peer
must reply with an identify response message. Also, when
`SetStreamHandler` is called, an identify response message is pushed
towards the currently connected peers. In some cases, the following
race is possible:

1. Peer `A` connects to peer `B`.
2. Peer `B` sends an identify request to peer `A`.
3. Peer `A` sends a response to the identify request from peer
`B`. This response contains the list of protocols, but that list
lacks the protocol used for `Server` in step 4, because `Server` is
not set up yet.
4. Peer `A` sets up a `Server` which uses `SetStreamHandler`, and at
this point peer `A` pushes an identify response message to peer
`B`, _without_ a corresponding identify request.
5. Peer `B` first receives the pushed identify response that `A` sent
in step 4, even though it was sent after the response in step 3. This
may happen due to how `libp2p` handles incoming requests. Peer `B` sets
the supported protocols in its `ProtoBook` for peer `A`; the list of
protocols now contains the protocol specified for the `Server` in step 4.
6. Peer `B` then receives the identify response that `A` sent in step 3,
even though it was sent before the push in step 4, due to possible
reordering. This response also has a list of protocols, but it lacks
the protocol specified for the `Server` in step 4. Peer `B` again sets
the supported protocols in its `ProtoBook` for peer `A`, but now that
list lacks the necessary protocol.
7. Peer `B` tries to find peers which support the protocol used for
the `Server` in step 4, or to connect to peer `A` using that protocol.
This fails because the `ProtoBook` entry for peer `A` contains the wrong
protocol list.
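
The reordering in the steps above can be illustrated with a small toy model. This is only a sketch and does not use `libp2p`: `identifyMsg`, `protoBook` and `deliver` are hypothetical names, and the `"ping"` protocol is made up. The one assumption it encodes is that each identify message carries the sender's full protocol list and that the receiver overwrites its record with whatever arrives last.

```go
package main

import "fmt"

// identifyMsg is a simplified stand-in for an identify response or push:
// it carries the sender's full protocol list at the time it was sent.
type identifyMsg struct {
	protocols []string
}

// protoBook is a toy model of peer B's record for peer A.
// Each applied message replaces the stored list (last write wins).
type protoBook struct {
	protocols []string
}

func (pb *protoBook) apply(m identifyMsg) {
	pb.protocols = append([]string(nil), m.protocols...)
}

func (pb *protoBook) supports(proto string) bool {
	for _, p := range pb.protocols {
		if p == proto {
			return true
		}
	}
	return false
}

// deliver applies the messages in the given order and reports whether
// the resulting record lists proto.
func deliver(proto string, msgs ...identifyMsg) bool {
	var pb protoBook
	for _, m := range msgs {
		pb.apply(m)
	}
	return pb.supports(proto)
}

func main() {
	// Response from step 3: sent before the Server exists.
	response := identifyMsg{protocols: []string{"ping"}}
	// Push from step 4: sent after SetStreamHandler registered "hs/1".
	push := identifyMsg{protocols: []string{"ping", "hs/1"}}

	fmt.Println("processed in send order:", deliver("hs/1", response, push))
	fmt.Println("processed reordered:    ", deliver("hs/1", push, response))
}
```

In this model the protocol is only lost when the older response is processed after the newer push, which is why delaying the connection until the handlers are registered removes the problem: there is then no stale response to arrive late.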

In addition to this, there's an issue with the protocol support checks
which `Fetcher` performs to decide which peers it can retrieve data from.
When a peer is freshly connected, the active identify request towards
it may not have finished yet when the fetcher checks that peer.
Although unlikely, in some cases this may cause valid peers to be
ignored.
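
The fetcher issue can be sketched the same way, again as a toy model rather than real `libp2p` code. `peerRecord` and its methods are hypothetical; the sketch only assumes that the protocol list is empty until identification completes and that a caller can block until it does.

```go
package main

import (
	"fmt"
	"sync"
)

// peerRecord is a toy model of what a host knows about a connected peer.
type peerRecord struct {
	mu         sync.Mutex
	protocols  []string
	identified chan struct{} // closed once identification has finished
}

func newPeerRecord() *peerRecord {
	return &peerRecord{identified: make(chan struct{})}
}

// finishIdentify stores the protocol list and marks identification as done.
func (r *peerRecord) finishIdentify(protocols []string) {
	r.mu.Lock()
	r.protocols = protocols
	r.mu.Unlock()
	close(r.identified)
}

// protocolsNow returns whatever is recorded at this moment, which may be
// nothing if identification is still in flight.
func (r *peerRecord) protocolsNow() []string {
	r.mu.Lock()
	defer r.mu.Unlock()
	return append([]string(nil), r.protocols...)
}

// protocolsAfterIdentify blocks until identification has finished and
// only then reads the protocol list.
func (r *peerRecord) protocolsAfterIdentify() []string {
	<-r.identified
	return r.protocolsNow()
}

func main() {
	r := newPeerRecord()

	// Checking right after connecting sees no protocols, so a valid
	// peer would be skipped.
	fmt.Println("protocols seen before identify:", len(r.protocolsNow()))

	// Identification completes concurrently.
	go r.finishIdentify([]string{"hs/1"})

	// Waiting first yields the complete list.
	fmt.Println("protocols seen after waiting:", r.protocolsAfterIdentify())
}
```

The blocking read plays the role that waiting for the pending identify request plays in the real code: the check is only made once the record is known to be populated.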

This change removes the uses of
`mocknet.FullMeshConnected(...)` where it may cause the identify race,
replacing them with `mocknet.FullMeshLinked(...)` followed by
`mesh.ConnectAllButSelf()` after the `Server`s are set up.
It also fixes the fetcher peer selection mechanism so that it waits for
any pending identification request to finish, similar to how
`Host.NewStream` does.
ivan4th committed Dec 26, 2024
1 parent b905784 commit 119c374
Showing 12 changed files with 127 additions and 33 deletions.
6 changes: 6 additions & 0 deletions fetch/fetch.go
@@ -300,6 +300,12 @@ func NewFetch(
if host != nil {
connectedf := func(peer p2p.Peer) {
protocols := func() []protocol.ID {
// Make sure that the protocol list for the peer is correct.
// This is similar to what Host.NewStream does to make
// sure it is possible to use one of the specified
// protocols. If we don't do this, there may be a race causing
// some peers to be unnecessarily ignored.
host.Identify(peer)
ps, err := host.Peerstore().GetProtocols(peer)
if err != nil {
f.logger.Debug("failed to get protocols for peer",
10 changes: 7 additions & 3 deletions fetch/mesh_data_test.go
@@ -966,16 +966,16 @@ func TestFetch_GetCert(t *testing.T) {

// Test if GetAtxs() limits the number of concurrent requests to `cfg.GetAtxsConcurrency`.
func Test_GetAtxsLimiting(t *testing.T) {
mesh, err := mocknet.FullMeshConnected(2)
require.NoError(t, err)

const (
totalRequests = 100
getAtxConcurrency = 10
)

for _, withLimiting := range []bool{false, true} {
t.Run(fmt.Sprintf("with limiting: %v", withLimiting), func(t *testing.T) {
// Do not connect immediately in order to avoid identify race.
mesh, err := mocknet.FullMeshLinked(2)
require.NoError(t, err)
srv := server.New(
wrapHost(mesh.Hosts()[1]),
hashProtocol,
@@ -1038,6 +1038,10 @@ func Test_GetAtxsLimiting(t *testing.T) {
require.NoError(t, f.Start())
t.Cleanup(f.Stop)

// Connect the P2P mesh only after the server is configured.
// This way, we avoid the race causing bad protocol identification.
require.NoError(t, mesh.ConnectAllButSelf())

var atxIds []types.ATXID
for i := 0; i < totalRequests; i++ {
id := types.RandomATXID()
2 changes: 1 addition & 1 deletion go.mod
@@ -54,6 +54,7 @@ require (
github.com/stretchr/testify v1.10.0
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
github.com/zeebo/blake3 v0.2.4
go.uber.org/fx v1.23.0
go.uber.org/mock v0.5.0
go.uber.org/zap v1.27.0
golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67
@@ -231,7 +232,6 @@ require (
go.opentelemetry.io/otel/sdk/metric v1.31.0 // indirect
go.opentelemetry.io/otel/trace v1.31.0 // indirect
go.uber.org/dig v1.18.0 // indirect
go.uber.org/fx v1.23.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/mod v0.22.0 // indirect
12 changes: 12 additions & 0 deletions p2p/host.go
@@ -25,12 +25,14 @@ import (
tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
"github.com/libp2p/go-libp2p/p2p/protocol/holepunch"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/libp2p/go-libp2p/p2p/security/noise"
quic "github.com/libp2p/go-libp2p/p2p/transport/quic"
"github.com/libp2p/go-libp2p/p2p/transport/quicreuse"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
ma "github.com/multiformats/go-multiaddr"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/fx"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

@@ -282,6 +284,8 @@ func New(
return nil, fmt.Errorf("can't set up connection gater: %w", err)
}

var identifyConn func(network.Conn)

pt := peerinfo.NewPeerInfoTracker()
lopts := []libp2p.Option{
libp2p.Identity(key),
@@ -295,6 +299,11 @@ func New(
cfg.AutoNATServer.PeerMax,
cfg.AutoNATServer.ResetPeriod),
libp2p.ConnectionGater(g),
libp2p.WithFxOption(fx.Invoke(func(ids identify.IDService) {
identifyConn = func(c network.Conn) {
ids.IdentifyConn(c)
}
})),
}
if cfg.EnableTCPTransport {
lopts = append(lopts,
@@ -413,6 +422,9 @@ func New(
if err != nil {
return nil, fmt.Errorf("failed to initialize libp2p host: %w", err)
}
if identifyConn == nil {
panic("BUG: identify service not set")
}
g.updateHost(h)
h.Network().Notify(p2pmetrics.NewConnectionsMeeter())
pt.Start(h.Network())
4 changes: 4 additions & 0 deletions p2p/persist_test.go
@@ -17,6 +17,8 @@ func TestConnectedPersist(t *testing.T) {
dir := t.TempDir()
ctx, cancel := context.WithCancel(context.Background())
const n = 3
// We can use FullMeshConnected here b/c we don't need to query peers' protocols,
// and thus there are no issues with identify service race.
mock, err := mocknet.FullMeshConnected(n)
require.NoError(t, err)
var eg errgroup.Group
@@ -46,6 +48,8 @@ func TestConnectedBrokenCRC(t *testing.T) {
dir := t.TempDir()
ctx, cancel := context.WithCancel(context.Background())
const n = 3
// We can use FullMeshConnected here b/c we don't need to query peers' protocols,
// and thus there are no issues with identify service race.
mock, err := mocknet.FullMeshConnected(n)
require.NoError(t, err)
var eg errgroup.Group
36 changes: 33 additions & 3 deletions p2p/server/server_test.go
@@ -44,7 +44,30 @@ func wrapHost(tb testing.TB, h host.Host) Host {
func TestServer(t *testing.T) {
const limit = 1024

mesh, err := mocknet.FullMeshConnected(5)
// Don't establish peer connections immediately.
// If we make the nodes connect to each other right away, there
// may be a problem with libp2p identify service. Namely, when a
// connection to a peer is established, identify request is sent
// to that peer, to which the peer sends identify response. Also,
// when a new handler is added to the host via SetStreamHandler,
// as it's done in p2p/server.Server, the identify response
// message is pushed to the peers currently connected to this
// node.
// When a server is created immediately following a connection
// from a peer, that peer may receive 2 identify response messages
// in quick succession: one which is indeed the response to
// initial identify request, and one which is push message
// generated by creation of the server. In some cases, the initial
// response may not contain the protocol of the freshly added
// server (hashProtocol="hs/1" in this case).
// The identify response message sent when SetStreamHandler is
// called will always contain the protocol being added, but due to
// how libp2p works, the responses may be processed in reverse
// order, and the set of protocols from the initial message will
// override the set of protocols in the second message (from
// SetStreamHandler). In this case, the peer will have a wrong
// idea about this node's protocols, and the test may fail.
mesh, err := mocknet.FullMeshLinked(5)
require.NoError(t, err)
proto := "test"
request := []byte("test request")
@@ -121,6 +144,10 @@ func TestServer(t *testing.T) {
eg.Wait()
})

// Connect the P2P mesh only after the servers are configured.
// This way, we avoid the race causing bad protocol identification.
require.NoError(t, mesh.ConnectAllButSelf())

t.Run("ReceiveMessage", func(t *testing.T) {
n := srvs[0].NumAcceptedRequests()
srvID := mesh.Hosts()[1].ID()
@@ -212,7 +239,7 @@ func TestServer(t *testing.T) {
}

func Test_Queued(t *testing.T) {
mesh, err := mocknet.FullMeshConnected(2)
mesh, err := mocknet.FullMeshLinked(2)
require.NoError(t, err)

var (
@@ -245,6 +272,8 @@ func Test_Queued(t *testing.T) {
t.Cleanup(func() {
assert.NoError(t, eg.Wait())
})
require.NoError(t, mesh.ConnectAllButSelf())

var reqEq errgroup.Group
for i := 0; i < queueSize; i++ { // fill the queue with requests
reqEq.Go(func() error {
@@ -266,7 +295,7 @@ func Test_Queued(t *testing.T) {
}

func Test_RequestInterval(t *testing.T) {
mesh, err := mocknet.FullMeshConnected(2)
mesh, err := mocknet.FullMeshLinked(2)
require.NoError(t, err)

var (
@@ -295,6 +324,7 @@ func Test_RequestInterval(t *testing.T) {
t.Cleanup(func() {
assert.NoError(t, eg.Wait())
})
require.NoError(t, mesh.ConnectAllButSelf())

start := time.Now()
for i := 0; i < maxReq; i++ { // fill the interval with requests (bursts up to maxReq are allowed)
34 changes: 33 additions & 1 deletion p2p/upgrade.go
@@ -15,7 +15,9 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
ma "github.com/multiformats/go-multiaddr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
@@ -75,6 +77,12 @@ func WithPeerInfo(pi peerinfo.PeerInfo) Opt {
}
}

func WithIDService(ids identify.IDService) Opt {
return func(fh *Host) {
fh.idService = ids
}
}

// Host is a convenience wrapper for all p2p related functionality required to run
// a full spacemesh node.
type Host struct {
@@ -112,7 +120,8 @@ type Host struct {
value network.Reachability
}

ping *Ping
ping *Ping
idService identify.IDService
}

// Upgrade creates Host instance from host.Host.
@@ -128,6 +137,13 @@ func Upgrade(h host.Host, opts ...Opt) (*Host, error) {
for _, opt := range opts {
opt(fh)
}
if fh.idService == nil {
// If no IDService is provided, which may be the case in the tests,
// we can try to get it from the host, assuming it's a *basichost.BasicHost.
if bh, ok := h.(*basichost.BasicHost); ok {
fh.idService = bh.IDService()
}
}
cfg := fh.cfg
bootnodes, err := parseIntoAddr(fh.cfg.Bootnodes)
if err != nil {
@@ -504,3 +520,19 @@ func (fh *Host) trackNetEvents() error {
func (fh *Host) PeerInfo() peerinfo.PeerInfo {
return fh.peerInfo
}

// Identify ensures that the given peer is identified via libp2p identify protocol.
// Identification is initiated after connecting to the peer, and the set of protocols for
// the peer in the ProtoBook is not guaranteed to be correct until identification
// finishes.
// Note that the set of the protocols in the ProtoBook for a particular peer may also
// change via a push identity notification when the peer adds a new handler via
// SetStreamHandler (e.g. sets up a new Server).
func (fh *Host) Identify(p peer.ID) {
for _, c := range fh.Network().ConnsToPeer(p) {
// IdentifyConn is a no-op if the connection is already identified,
// but otherwise we need to wait for identification to finish to
// have the proper set of protocols.
fh.idService.IdentifyConn(c)
}
}
15 changes: 5 additions & 10 deletions sync2/dbset/p2p_test.go
@@ -143,7 +143,8 @@ func runSync(
cfg rangesync.RangeSetReconcilerConfig,
) {
log := zaptest.NewLogger(t)
mesh, err := mocknet.FullMeshConnected(2)
// Don't connect immediately to avoid identify race.
mesh, err := mocknet.FullMeshLinked(2)
require.NoError(t, err)
proto := "itest"
ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second)
@@ -210,15 +211,9 @@ func runSync(
return srv.Run(ctx)
})

// Wait for the server to activate
require.Eventually(t, func() bool {
for _, h := range mesh.Hosts() {
if len(h.Mux().Protocols()) == 0 {
return false
}
}
return true
}, time.Second, 10*time.Millisecond)
// Connect the P2P mesh only after the servers are configured.
// This way, we avoid the race causing bad protocol identification.
require.NoError(t, mesh.ConnectAllButSelf())

startTimer(t)
pssB := rangesync.NewPairwiseSetSyncerInternal(
7 changes: 6 additions & 1 deletion sync2/p2p_test.go
@@ -79,7 +79,9 @@ func TestP2P(t *testing.T) {
maxDepth = 24
)
logger := zaptest.NewLogger(t)
mesh, err := mocknet.FullMeshConnected(numNodes)
// Do not connect immediately.
// See Test_GetAtxsLimiting in fetch package for details.
mesh, err := mocknet.FullMeshLinked(numNodes)
require.NoError(t, err)
hs := make([]*sync2.P2PHashSync, numNodes)
initialSet := make([]rangesync.KeyBytes, numHashes)
@@ -120,6 +122,9 @@ func TestP2P(t *testing.T) {
require.NoError(t, err)
require.NoError(t, hs[n].Load())
require.False(t, hs[n].Synced())
}
require.NoError(t, mesh.ConnectAllButSelf())
for n := range hs {
hs[n].Start()
}

6 changes: 5 additions & 1 deletion sync2/rangesync/dispatcher_test.go
@@ -29,7 +29,8 @@ func makeFakeDispHandler(n int) rangesync.Handler {
}

func TestDispatcher(t *testing.T) {
mesh, err := mocknet.FullMeshConnected(2)
// Don't connect immediately to avoid identify race.
mesh, err := mocknet.FullMeshLinked(2)
require.NoError(t, err)

d := rangesync.NewDispatcher(zaptest.NewLogger(t))
@@ -48,6 +49,9 @@ func TestDispatcher(t *testing.T) {
srvPeerID := mesh.Hosts()[0].ID()

c := server.New(mesh.Hosts()[1], proto, d.Dispatch, opts...)
// Connect the P2P mesh only after the server is configured.
// This way, we avoid the race causing bad protocol identification.
require.NoError(t, mesh.ConnectAllButSelf())
for _, tt := range []struct {
name string
want int
14 changes: 5 additions & 9 deletions sync2/rangesync/p2p_test.go
@@ -68,7 +68,8 @@ func fakeRequesterGetter(t *testing.T) getRequesterFunc {
}

func p2pRequesterGetter(tb testing.TB) getRequesterFunc {
mesh, err := mocknet.FullMeshConnected(2)
// Don't connect immediately to avoid identify race.
mesh, err := mocknet.FullMeshLinked(2)
require.NoError(tb, err)
proto := "itest"
opts := []server.Opt{
@@ -85,14 +86,9 @@ func p2pRequesterGetter(tb testing.TB) getRequesterFunc {
return server.New(mesh.Hosts()[0], proto, handler, opts...), mesh.Hosts()[0].ID()
}
s := server.New(mesh.Hosts()[1], proto, handler, opts...)
require.Eventually(tb, func() bool {
for _, h := range mesh.Hosts()[0:] {
if len(h.Mux().Protocols()) == 0 {
return false
}
}
return true
}, time.Second, 10*time.Millisecond)
// Connect the P2P mesh only after the server is configured.
// This way, we avoid the race causing bad protocol identification.
require.NoError(tb, mesh.ConnectAllButSelf())
return s, mesh.Hosts()[1].ID()
}
}