diff --git a/CHANGELOG.md b/CHANGELOG.md index ff6f31fa47..5494d24a4a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,67 +2,14 @@ See [RELEASE](./RELEASE.md) for workflow instructions. -## v1.1.2 +## v1.1.3 ### Upgrade information -Legacy discovery protocol was removed in [#4836](https://github.com/spacemeshos/go-spacemesh/pull/4836). -Config option and flag `p2p-disable-legacy-discovery` is noop, and will be completely removed in future versions. - ### Highlights -With [#4893](https://github.com/spacemeshos/go-spacemesh/pull/4893) Nodes are given more time to publish an ATX -Nodes still need to publish an ATX before the new PoET round starts (within 12h on mainnet) to make it into the -next PoET round, but if they miss that deadline they will now continue to publish an ATX to receive rewards for -the upcoming epoch and skip one after that. - ### Features -* [#4845](https://github.com/spacemeshos/go-spacemesh/pull/4845) API to fetch opened connections. - -> grpcurl -plaintext 127.0.0.1:9093 spacemesh.v1.AdminService.PeerInfoStream - -```json -{ - "id": "12D3KooWEcgADBR4zHirw7YAt6nUtCNhbPWkB2fnHAemnG5cGf2n", - "connections": [ - { - "address": "/ip4/46.4.81.145/tcp/5001", - "uptime": "1359.186975782s", - "outbound": true - } - ] -} -{ - "id": "12D3KooWHK5m83sNj2eNMJMGAngcS9gBja27ho83t79Q2CD4iRjQ", - "connections": [ - { - "address": "/ip4/34.86.244.124/tcp/5000", - "uptime": "1108.414456262s", - "outbound": true - } - ], - "tags": [ - "bootnode" - ] -} -``` - -* [4795](https://github.com/spacemeshos/go-spacemesh/pull/4795) p2p: add ip4/ip6 blocklists - -Doesn't affect direct peers. In order to disable: - -```json -{ - "p2p": { - "ip4-blocklist": [], - "ip6-blocklist": [] - } -} -``` - ### Improvements -* [#4882](https://github.com/spacemeshos/go-spacemesh/pull/4882) Increase cache size and parametrize datastore. -* [#4887](https://github.com/spacemeshos/go-spacemesh/pull/4887) Fixed crashes on API call. -* [#4871](https://github.com/spacemeshos/go-spacemesh/pull/4871) Add jitter to spread out requests to get poet proof and submit challenge +* [#4939](https://github.com/spacemeshos/go-spacemesh/pull/4939) Make sure to fetch data from peers that are already connected. diff --git a/fetch/cache.go b/fetch/cache.go index 5685b3ce18..619749f71d 100644 --- a/fetch/cache.go +++ b/fetch/cache.go @@ -5,6 +5,7 @@ import ( "sync" lru "github.com/hashicorp/golang-lru/v2" + "golang.org/x/exp/maps" "github.com/spacemeshos/go-spacemesh/common/types" "github.com/spacemeshos/go-spacemesh/datastore" @@ -71,24 +72,15 @@ func (hpc *HashPeersCache) Add(hash types.Hash32, peer p2p.Peer) { hpc.add(hash, peer) } -// GetRandom returns a random peer for a given hash. -func (hpc *HashPeersCache) GetRandom(hash types.Hash32, hint datastore.Hint, rng *rand.Rand) (p2p.Peer, bool) { +// GetRandom returns a randomized list of peers for a given hash. +func (hpc *HashPeersCache) GetRandom(hash types.Hash32, hint datastore.Hint, rng *rand.Rand) []p2p.Peer { hpc.mu.Lock() defer hpc.mu.Unlock() - hashPeersMap, exists := hpc.getWithStats(hash, hint) - if !exists { - return p2p.NoPeer, false - } - n := rng.Intn(len(hashPeersMap)) + 1 - i := 0 - for peer := range hashPeersMap { - i++ - if i == n { - return peer, true - } - } - return p2p.NoPeer, false + pm, _ := hpc.getWithStats(hash, hint) + peers := maps.Keys(pm) + rng.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] }) + return peers } // RegisterPeerHashes registers provided peer for a list of hashes. diff --git a/fetch/cache_test.go b/fetch/cache_test.go index 32269691f9..7c116a2ae9 100644 --- a/fetch/cache_test.go +++ b/fetch/cache_test.go @@ -72,6 +72,13 @@ func TestAdd(t *testing.T) { func TestGetRandom(t *testing.T) { t.Parallel() + t.Run("no hash peers", func(t *testing.T) { + cache := NewHashPeersCache(10) + hash := types.RandomHash() + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + peers := cache.GetRandom(hash, datastore.TXDB, rng) + require.Empty(t, peers) + }) t.Run("1Hash3Peers", func(t *testing.T) { cache := NewHashPeersCache(10) hash := types.RandomHash() @@ -94,9 +101,8 @@ func TestGetRandom(t *testing.T) { }() wg.Wait() rng := rand.New(rand.NewSource(time.Now().UnixNano())) - peer, exists := cache.GetRandom(hash, datastore.TXDB, rng) - require.Equal(t, true, exists) - require.Contains(t, []p2p.Peer{peer1, peer2, peer3}, peer) + peers := cache.GetRandom(hash, datastore.TXDB, rng) + require.ElementsMatch(t, []p2p.Peer{peer1, peer2, peer3}, peers) }) t.Run("2Hashes1Peer", func(t *testing.T) { cache := NewHashPeersCache(10) @@ -115,12 +121,10 @@ func TestGetRandom(t *testing.T) { }() wg.Wait() rng := rand.New(rand.NewSource(time.Now().UnixNano())) - randomPeer, exists := cache.GetRandom(hash1, datastore.TXDB, rng) - require.Equal(t, true, exists) - require.Equal(t, peer, randomPeer) - randomPeer, exists = cache.GetRandom(hash2, datastore.TXDB, rng) - require.Equal(t, true, exists) - require.Equal(t, peer, randomPeer) + randomPeers := cache.GetRandom(hash1, datastore.TXDB, rng) + require.Equal(t, []p2p.Peer{peer}, randomPeers) + randomPeers = cache.GetRandom(hash2, datastore.TXDB, rng) + require.Equal(t, []p2p.Peer{peer}, randomPeers) }) } diff --git a/fetch/fetch.go b/fetch/fetch.go index 23a01c09af..441f44080c 100644 --- a/fetch/fetch.go +++ b/fetch/fetch.go @@ -482,18 +482,23 @@ func (f *Fetch) organizeRequests(requests []RequestMessage) map[p2p.Peer][][]Req } return nil } - for _, req := range requests { - p, exists := f.hashToPeers.GetRandom(req.Hash, req.Hint, rng) - if !exists { - p = randomPeer(peers) + target := p2p.NoPeer + hashPeers := f.hashToPeers.GetRandom(req.Hash, req.Hint, rng) + for _, p := range hashPeers { + if f.host.Connected(p) { + target = p + break + } } - - _, ok := peer2requests[p] + if target == p2p.NoPeer { + target = randomPeer(peers) + } + _, ok := peer2requests[target] if !ok { - peer2requests[p] = []RequestMessage{req} + peer2requests[target] = []RequestMessage{req} } else { - peer2requests[p] = append(peer2requests[p], req) + peer2requests[target] = append(peer2requests[target], req) } } diff --git a/fetch/fetch_test.go b/fetch/fetch_test.go index c103615343..31c03643be 100644 --- a/fetch/fetch_test.go +++ b/fetch/fetch_test.go @@ -136,6 +136,43 @@ func TestFetch_GetHash(t *testing.T) { require.NotEqual(t, p1.completed, p2.completed) } +func TestFetch_GetHashPeerNotConnected(t *testing.T) { + f := createFetch(t) + f.cfg.MaxRetriesForRequest = 0 + f.cfg.MaxRetriesForPeer = 0 + peer := p2p.Peer("buddy") + awol := p2p.Peer("notConnected") + f.mh.EXPECT().GetPeers().Return([]p2p.Peer{peer}) + f.mh.EXPECT().ID().Return(p2p.Peer("self")) + f.mh.EXPECT().Connected(awol).Return(false) + hsh := types.RandomHash() + f.RegisterPeerHashes(awol, []types.Hash32{hsh}) + + res := ResponseMessage{ + Hash: hsh, + Data: []byte("a"), + } + f.mHashS.EXPECT().Request(gomock.Any(), peer, gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, _ p2p.Peer, req []byte, okFunc func([]byte), _ func(error)) error { + var rb RequestBatch + err := codec.Decode(req, &rb) + require.NoError(t, err) + resBatch := ResponseBatch{ + ID: rb.ID, + Responses: []ResponseMessage{res}, + } + bts, err := codec.Encode(&resBatch) + require.NoError(t, err) + okFunc(bts) + return nil + }) + + p, err := f.getHash(context.TODO(), hsh, datastore.BlockDB, goodReceiver) + require.NoError(t, err) + f.requestHashBatchFromPeers() + <-p.completed +} + func TestFetch_RequestHashBatchFromPeers(t *testing.T) { tt := []struct { name string @@ -164,6 +201,7 @@ func TestFetch_RequestHashBatchFromPeers(t *testing.T) { f.cfg.MaxRetriesForPeer = 0 peer := p2p.Peer("buddy") f.mh.EXPECT().GetPeers().Return([]p2p.Peer{peer}) + f.mh.EXPECT().Connected(peer).Return(true).AnyTimes() hsh0 := types.RandomHash() res0 := ResponseMessage{ diff --git a/fetch/interface.go b/fetch/interface.go index 048b487328..40bc59488c 100644 --- a/fetch/interface.go +++ b/fetch/interface.go @@ -43,6 +43,7 @@ type meshProvider interface { type host interface { ID() p2p.Peer GetPeers() []p2p.Peer + Connected(p2p.Peer) bool PeerProtocols(p2p.Peer) ([]protocol.ID, error) Close() error } diff --git a/fetch/mesh_data_test.go b/fetch/mesh_data_test.go index ba330b7f9c..c523f5caea 100644 --- a/fetch/mesh_data_test.go +++ b/fetch/mesh_data_test.go @@ -146,6 +146,7 @@ func TestFetch_getHashes(t *testing.T) { f.cfg.BatchSize = 2 f.cfg.MaxRetriesForRequest = 0 f.cfg.MaxRetriesForPeer = 0 + f.mh.EXPECT().Connected(gomock.Any()).Return(true).AnyTimes() peers := []p2p.Peer{p2p.Peer("buddy 0"), p2p.Peer("buddy 1")} f.mh.EXPECT().GetPeers().Return(peers) f.mh.EXPECT().ID().Return(p2p.Peer("self")).AnyTimes() diff --git a/fetch/mocks/mocks.go b/fetch/mocks/mocks.go index fadcb11e27..5bb2401b23 100644 --- a/fetch/mocks/mocks.go +++ b/fetch/mocks/mocks.go @@ -199,6 +199,20 @@ func (mr *MockhostMockRecorder) Close() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*Mockhost)(nil).Close)) } +// Connected mocks base method. +func (m *Mockhost) Connected(arg0 p2p.Peer) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Connected", arg0) + ret0, _ := ret[0].(bool) + return ret0 +} + +// Connected indicates an expected call of Connected. +func (mr *MockhostMockRecorder) Connected(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Connected", reflect.TypeOf((*Mockhost)(nil).Connected), arg0) +} + // GetPeers mocks base method. func (m *Mockhost) GetPeers() []p2p.Peer { m.ctrl.T.Helper() diff --git a/p2p/upgrade.go b/p2p/upgrade.go index 47cb25943f..6b00e03c1b 100644 --- a/p2p/upgrade.go +++ b/p2p/upgrade.go @@ -169,6 +169,10 @@ func (fh *Host) GetPeers() []Peer { return fh.Host.Network().Peers() } +func (fh *Host) Connected(p Peer) bool { + return fh.Host.Network().Connectedness(p) == network.Connected +} + // ConnectedPeerInfo retrieves a peer info object for the given peer.ID, if the // given peer is not connected then nil is returned. func (fh *Host) ConnectedPeerInfo(id peer.ID) *PeerInfo {