cherry-pick fix for fetching block
countvonzero committed Aug 31, 2023
1 parent 1132af2 commit b846377
Showing 9 changed files with 93 additions and 87 deletions.
57 changes: 2 additions & 55 deletions CHANGELOG.md
@@ -2,67 +2,14 @@

See [RELEASE](./RELEASE.md) for workflow instructions.

## v1.1.2
## v1.1.3

### Upgrade information

Legacy discovery protocol was removed in [#4836](https://github.com/spacemeshos/go-spacemesh/pull/4836).
The config option and flag `p2p-disable-legacy-discovery` are now no-ops and will be removed completely in future versions.

### Highlights

With [#4893](https://github.com/spacemeshos/go-spacemesh/pull/4893) nodes are given more time to publish an ATX.
Nodes still need to publish an ATX before the new PoET round starts (within 12h on mainnet) to make it into the
next PoET round, but if they miss that deadline they will now still publish an ATX to receive rewards for
the upcoming epoch and skip the one after that.

### Features

* [#4845](https://github.com/spacemeshos/go-spacemesh/pull/4845) API to fetch opened connections.

> grpcurl -plaintext 127.0.0.1:9093 spacemesh.v1.AdminService.PeerInfoStream
```json
{
  "id": "12D3KooWEcgADBR4zHirw7YAt6nUtCNhbPWkB2fnHAemnG5cGf2n",
  "connections": [
    {
      "address": "/ip4/46.4.81.145/tcp/5001",
      "uptime": "1359.186975782s",
      "outbound": true
    }
  ]
}
{
  "id": "12D3KooWHK5m83sNj2eNMJMGAngcS9gBja27ho83t79Q2CD4iRjQ",
  "connections": [
    {
      "address": "/ip4/34.86.244.124/tcp/5000",
      "uptime": "1108.414456262s",
      "outbound": true
    }
  ],
  "tags": [
    "bootnode"
  ]
}
```

* [#4795](https://github.com/spacemeshos/go-spacemesh/pull/4795) p2p: add ip4/ip6 blocklists

This doesn't affect direct peers. To disable the blocklists:

```json
{
  "p2p": {
    "ip4-blocklist": [],
    "ip6-blocklist": []
  }
}
```

### Improvements

* [#4882](https://github.com/spacemeshos/go-spacemesh/pull/4882) Increase cache size and parametrize datastore.
* [#4887](https://github.com/spacemeshos/go-spacemesh/pull/4887) Fixed crashes on API call.
* [#4871](https://github.com/spacemeshos/go-spacemesh/pull/4871) Add jitter to spread out requests to get PoET proofs and submit challenges.
* [#4939](https://github.com/spacemeshos/go-spacemesh/pull/4939) Make sure to fetch data from peers that are already connected (see the sketch after this list).
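
The last entry above is the subject of this commit: request routing now prefers peers that are still connected, and only falls back to a random peer when none of the peers known to hold the hash are reachable. Below is a minimal standalone sketch of that selection logic; the names (`Peer`, `pickTarget`) are illustrative and not part of the go-spacemesh API.

```go
package main

import "fmt"

// Peer is a stand-in for p2p.Peer in this sketch.
type Peer string

// pickTarget walks the candidate peers known to hold a hash and returns the
// first one that is still connected; if none are, it falls back to a peer
// chosen by the caller-supplied fallback (a random known peer in fetch).
func pickTarget(candidates []Peer, connected func(Peer) bool, fallback func() Peer) Peer {
	for _, p := range candidates {
		if connected(p) {
			return p
		}
	}
	return fallback()
}

func main() {
	connState := map[Peer]bool{"awol": false, "buddy": true}
	target := pickTarget(
		[]Peer{"awol", "buddy"},
		func(p Peer) bool { return connState[p] },
		func() Peer { return "random-fallback" },
	)
	fmt.Println(target) // prints "buddy"
}
```

The real implementation lives in `organizeRequests` in fetch/fetch.go (see the diff below), which batches requests per target peer rather than returning a single peer.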
22 changes: 7 additions & 15 deletions fetch/cache.go
@@ -5,6 +5,7 @@ import (
"sync"

lru "github.com/hashicorp/golang-lru/v2"
"golang.org/x/exp/maps"

"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/datastore"
@@ -71,24 +72,15 @@ func (hpc *HashPeersCache) Add(hash types.Hash32, peer p2p.Peer) {
hpc.add(hash, peer)
}

// GetRandom returns a random peer for a given hash.
func (hpc *HashPeersCache) GetRandom(hash types.Hash32, hint datastore.Hint, rng *rand.Rand) (p2p.Peer, bool) {
// GetRandom returns a randomized list of peers for a given hash.
func (hpc *HashPeersCache) GetRandom(hash types.Hash32, hint datastore.Hint, rng *rand.Rand) []p2p.Peer {
hpc.mu.Lock()
defer hpc.mu.Unlock()

hashPeersMap, exists := hpc.getWithStats(hash, hint)
if !exists {
return p2p.NoPeer, false
}
n := rng.Intn(len(hashPeersMap)) + 1
i := 0
for peer := range hashPeersMap {
i++
if i == n {
return peer, true
}
}
return p2p.NoPeer, false
pm, _ := hpc.getWithStats(hash, hint)
peers := maps.Keys(pm)
rng.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] })
return peers
}

// RegisterPeerHashes registers provided peer for a list of hashes.
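
The rewritten `GetRandom` above returns every peer known for the hash in randomized order instead of a single random peer, using `maps.Keys` from `golang.org/x/exp/maps` plus `rand.Shuffle`. A small self-contained sketch of that pattern, with a toy map standing in for the cache's internal hash-to-peers map:

```go
package main

import (
	"fmt"
	"math/rand"

	"golang.org/x/exp/maps"
)

func main() {
	// Toy stand-in for the per-hash peer set kept by HashPeersCache.
	peerSet := map[string]struct{}{
		"peerA": {},
		"peerB": {},
		"peerC": {},
	}

	rng := rand.New(rand.NewSource(42))

	// maps.Keys returns the keys in unspecified order; Shuffle randomizes them.
	peers := maps.Keys(peerSet)
	rng.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] })

	// Callers (see fetch/fetch.go below) iterate this list and take the first
	// peer that is still connected. An empty map simply yields an empty slice.
	fmt.Println(peers)
}
```

Because the whole shuffled slice is returned, the old `(peer, bool)` return pair was dropped; an empty slice now signals that no peers are known for the hash, as the new "no hash peers" test case below checks.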
22 changes: 13 additions & 9 deletions fetch/cache_test.go
@@ -72,6 +72,13 @@ func TestAdd(t *testing.T) {

func TestGetRandom(t *testing.T) {
t.Parallel()
t.Run("no hash peers", func(t *testing.T) {
cache := NewHashPeersCache(10)
hash := types.RandomHash()
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
peers := cache.GetRandom(hash, datastore.TXDB, rng)
require.Empty(t, peers)
})
t.Run("1Hash3Peers", func(t *testing.T) {
cache := NewHashPeersCache(10)
hash := types.RandomHash()
@@ -94,9 +101,8 @@ func TestGetRandom(t *testing.T) {
}()
wg.Wait()
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
peer, exists := cache.GetRandom(hash, datastore.TXDB, rng)
require.Equal(t, true, exists)
require.Contains(t, []p2p.Peer{peer1, peer2, peer3}, peer)
peers := cache.GetRandom(hash, datastore.TXDB, rng)
require.ElementsMatch(t, []p2p.Peer{peer1, peer2, peer3}, peers)
})
t.Run("2Hashes1Peer", func(t *testing.T) {
cache := NewHashPeersCache(10)
@@ -115,12 +121,10 @@ func TestGetRandom(t *testing.T) {
}()
wg.Wait()
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
randomPeer, exists := cache.GetRandom(hash1, datastore.TXDB, rng)
require.Equal(t, true, exists)
require.Equal(t, peer, randomPeer)
randomPeer, exists = cache.GetRandom(hash2, datastore.TXDB, rng)
require.Equal(t, true, exists)
require.Equal(t, peer, randomPeer)
randomPeers := cache.GetRandom(hash1, datastore.TXDB, rng)
require.Equal(t, []p2p.Peer{peer}, randomPeers)
randomPeers = cache.GetRandom(hash2, datastore.TXDB, rng)
require.Equal(t, []p2p.Peer{peer}, randomPeers)
})
}

21 changes: 13 additions & 8 deletions fetch/fetch.go
@@ -482,18 +482,23 @@ func (f *Fetch) organizeRequests(requests []RequestMessage) map[p2p.Peer][][]Req
}
return nil
}

for _, req := range requests {
p, exists := f.hashToPeers.GetRandom(req.Hash, req.Hint, rng)
if !exists {
p = randomPeer(peers)
target := p2p.NoPeer
hashPeers := f.hashToPeers.GetRandom(req.Hash, req.Hint, rng)
for _, p := range hashPeers {
if f.host.Connected(p) {
target = p
break
}
}

_, ok := peer2requests[p]
if target == p2p.NoPeer {
target = randomPeer(peers)
}
_, ok := peer2requests[target]
if !ok {
peer2requests[p] = []RequestMessage{req}
peer2requests[target] = []RequestMessage{req}
} else {
peer2requests[p] = append(peer2requests[p], req)
peer2requests[target] = append(peer2requests[target], req)
}
}

38 changes: 38 additions & 0 deletions fetch/fetch_test.go
@@ -136,6 +136,43 @@ func TestFetch_GetHash(t *testing.T) {
require.NotEqual(t, p1.completed, p2.completed)
}

func TestFetch_GetHashPeerNotConnected(t *testing.T) {
f := createFetch(t)
f.cfg.MaxRetriesForRequest = 0
f.cfg.MaxRetriesForPeer = 0
peer := p2p.Peer("buddy")
awol := p2p.Peer("notConnected")
f.mh.EXPECT().GetPeers().Return([]p2p.Peer{peer})
f.mh.EXPECT().ID().Return(p2p.Peer("self"))
f.mh.EXPECT().Connected(awol).Return(false)
hsh := types.RandomHash()
f.RegisterPeerHashes(awol, []types.Hash32{hsh})

res := ResponseMessage{
Hash: hsh,
Data: []byte("a"),
}
f.mHashS.EXPECT().Request(gomock.Any(), peer, gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, _ p2p.Peer, req []byte, okFunc func([]byte), _ func(error)) error {
var rb RequestBatch
err := codec.Decode(req, &rb)
require.NoError(t, err)
resBatch := ResponseBatch{
ID: rb.ID,
Responses: []ResponseMessage{res},
}
bts, err := codec.Encode(&resBatch)
require.NoError(t, err)
okFunc(bts)
return nil
})

p, err := f.getHash(context.TODO(), hsh, datastore.BlockDB, goodReceiver)
require.NoError(t, err)
f.requestHashBatchFromPeers()
<-p.completed
}

func TestFetch_RequestHashBatchFromPeers(t *testing.T) {
tt := []struct {
name string
@@ -164,6 +201,7 @@ func TestFetch_RequestHashBatchFromPeers(t *testing.T) {
f.cfg.MaxRetriesForPeer = 0
peer := p2p.Peer("buddy")
f.mh.EXPECT().GetPeers().Return([]p2p.Peer{peer})
f.mh.EXPECT().Connected(peer).Return(true).AnyTimes()

hsh0 := types.RandomHash()
res0 := ResponseMessage{
1 change: 1 addition & 0 deletions fetch/interface.go
@@ -43,6 +43,7 @@ type meshProvider interface {
type host interface {
ID() p2p.Peer
GetPeers() []p2p.Peer
Connected(p2p.Peer) bool
PeerProtocols(p2p.Peer) ([]protocol.ID, error)
Close() error
}
1 change: 1 addition & 0 deletions fetch/mesh_data_test.go
@@ -146,6 +146,7 @@ func TestFetch_getHashes(t *testing.T) {
f.cfg.BatchSize = 2
f.cfg.MaxRetriesForRequest = 0
f.cfg.MaxRetriesForPeer = 0
f.mh.EXPECT().Connected(gomock.Any()).Return(true).AnyTimes()
peers := []p2p.Peer{p2p.Peer("buddy 0"), p2p.Peer("buddy 1")}
f.mh.EXPECT().GetPeers().Return(peers)
f.mh.EXPECT().ID().Return(p2p.Peer("self")).AnyTimes()
14 changes: 14 additions & 0 deletions fetch/mocks/mocks.go

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions p2p/upgrade.go
@@ -169,6 +169,10 @@ func (fh *Host) GetPeers() []Peer {
return fh.Host.Network().Peers()
}

func (fh *Host) Connected(p Peer) bool {
return fh.Host.Network().Connectedness(p) == network.Connected
}

// ConnectedPeerInfo retrieves a peer info object for the given peer.ID, if the
// given peer is not connected then nil is returned.
func (fh *Host) ConnectedPeerInfo(id peer.ID) *PeerInfo {
