Fix sending of WantBlocks messages and tracking of peerWants #1019

Open
wants to merge 11 commits into master
100 changes: 50 additions & 50 deletions codex/blockexchange/engine/engine.nim
@@ -130,16 +130,15 @@
proc sendWantHave(
b: BlockExcEngine,
addresses: seq[BlockAddress],
excluded: seq[BlockExcPeerCtx],
peers: seq[BlockExcPeerCtx]): Future[void] {.async.} =
for p in peers:
if p notin excluded:
let toAsk = addresses.filterIt(it notin p.peerHave)
trace "Sending wantHave request", toAsk, peer = p.id
await b.network.request.sendWantList(
p.id,
toAsk,
wantType = WantType.WantHave)
let toAsk = addresses.filterIt(it notin p.peerHave)
trace "Sending wantHave request", toAsk, peer = p.id
await b.network.request.sendWantList(
p.id,
toAsk,
wantType = WantType.WantHave)
codex_block_exchange_want_have_lists_sent.inc()

proc sendWantBlock(
b: BlockExcEngine,
@@ -150,6 +149,7 @@
blockPeer.id,
addresses,
wantType = WantType.WantBlock) # we want this remote to send us a block
codex_block_exchange_want_block_lists_sent.inc()

proc monitorBlockHandle(
b: BlockExcEngine,
@@ -175,33 +175,27 @@
await b.network.switch.disconnect(peerId)
b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])

proc pickPseudoRandom(address: BlockAddress, peers: seq[BlockExcPeerCtx]): BlockExcPeerCtx =
return peers[hash(address) mod peers.len]

Codecov / codecov/patch: Added lines #L178-L180 in codex/blockexchange/engine/engine.nim were not covered by tests.
proc requestBlock*(
b: BlockExcEngine,
address: BlockAddress,
): Future[?!Block] {.async.} =
let blockFuture = b.pendingBlocks.getWantHandle(address, b.blockFetchTimeout)

if not b.pendingBlocks.isInFlight(address):
let peers = b.peers.selectCheapest(address)
if peers.len == 0:
b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
let peers = b.peers.getPeersForBlock(address)

let maybePeer =
if peers.len > 0:
peers[hash(address) mod peers.len].some
elif b.peers.len > 0:
toSeq(b.peers)[hash(address) mod b.peers.len].some
else:
BlockExcPeerCtx.none

if peer =? maybePeer:
asyncSpawn b.monitorBlockHandle(blockFuture, address, peer.id)
if peers.with.len == 0:
b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
else:
let selected = pickPseudoRandom(address, peers.with)
asyncSpawn b.monitorBlockHandle(blockFuture, address, selected.id)

Codecov / codecov/patch: Added line #L194 in codex/blockexchange/engine/engine.nim was not covered by tests.
b.pendingBlocks.setInFlight(address)
# TODO: Send more block addresses if at all sensible.
await b.sendWantBlock(@[address], peer)
codex_block_exchange_want_block_lists_sent.inc()
await b.sendWantHave(@[address], @[peer], toSeq(b.peers))
codex_block_exchange_want_have_lists_sent.inc()
await b.sendWantBlock(@[address], selected)

Codecov / codecov/patch: Added line #L196 in codex/blockexchange/engine/engine.nim was not covered by tests.

await b.sendWantHave(@[address], peers.without)
Contributor:

Ugh, why are we sending a WantHave to the peers that we know don't have the block we need?

Contributor Author:

Peers are only reporting their haves if asked. We have to send a wantHave before we know if they have the block.
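
To make that concrete, here's roughly what the send path looks like after this change (condensed from the diff above; metrics and error handling omitted):

```nim
# We don't yet know which peers hold the block, so presence has to be probed.
let peers = b.peers.getPeersForBlock(address)  # splits peers into with/without

if peers.with.len == 0:
  # No known holder yet: queue discovery for the block's cid.
  b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
else:
  # Ask one pseudo-randomly chosen holder for the block itself.
  let selected = pickPseudoRandom(address, peers.with)
  await b.sendWantBlock(@[address], selected)

# Everyone we have no presence information for gets the WantHave probe.
await b.sendWantHave(@[address], peers.without)
```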

Contributor:

Ah, you're right.


# Don't let timeouts bubble up. We can't be too broad here or we break
# cancellations.
@@ -246,7 +240,7 @@
)

if wantCids.len > 0:
trace "Peer has blocks in our wantList", peer, wantCount = wantCids.len
trace "Peer has blocks in our wantList", peer, wants = wantCids
await b.sendWantBlock(wantCids, peerCtx)

# if none of the connected peers report our wants in their have list,
@@ -276,7 +270,7 @@

proc cancelBlocks(b: BlockExcEngine, addrs: seq[BlockAddress]) {.async.} =
## Tells neighboring peers that we're no longer interested in a block.
trace "Sending block request cancellations to peers", addrs = addrs.len
trace "Sending block request cancellations to peers", addrs, peers = b.peers.mapIt($it.id)

let failed = (await allFinished(
b.peers.mapIt(
@@ -342,13 +336,13 @@
b: BlockExcEngine,
peer: PeerId,
blocksDelivery: seq[BlockDelivery]) {.async.} =
trace "Received blocks from peer", peer, blocks = (blocksDelivery.mapIt($it.address)).join(",")
trace "Received blocks from peer", peer, blocks = (blocksDelivery.mapIt(it.address))

var validatedBlocksDelivery: seq[BlockDelivery]
for bd in blocksDelivery:
logScope:
peer = peer
address = bd.address
peer = peer
address = bd.address

if err =? b.validateBlockDelivery(bd).errorOption:
warn "Block validation failed", msg = err.msg
@@ -390,11 +384,13 @@
wantList: WantList) {.async.} =
let
peerCtx = b.peers.get(peer)
if isNil(peerCtx):

if peerCtx.isNil:
return

var
presence: seq[BlockPresence]
schedulePeer = false

for e in wantList.entries:
let
@@ -405,32 +401,35 @@
address = e.address
wantType = $e.wantType

if idx < 0: # updating entry
if idx < 0: # Adding new entry to peer wants
let
have = await e.address in b.localStore
price = @(
b.pricing.get(Pricing(price: 0.u256))
.price.toBytesBE)

if e.wantType == WantType.WantHave:
codex_block_exchange_want_have_lists_received.inc()
if have:
presence.add(
BlockPresence(
address: e.address,
`type`: BlockPresenceType.Have,
price: price))
else:
if e.sendDontHave:
presence.add(
BlockPresence(
address: e.address,
`type`: BlockPresenceType.DontHave,
price: price))
peerCtx.peerWants.add(e)

if not have and e.sendDontHave:
presence.add(
BlockPresence(
address: e.address,
`type`: BlockPresenceType.DontHave,
price: price))
elif have and e.wantType == WantType.WantHave:
presence.add(
BlockPresence(
address: e.address,
`type`: BlockPresenceType.Have,
price: price))
codex_block_exchange_want_have_lists_received.inc()
elif e.wantType == WantType.WantBlock:
Contributor:

There is a good reason why I separated WantBlock into its own branch. We used to schedule a task for every single WantList update, but this isn't necessary and, worse, it was severely affecting performance. We should not attempt to schedule tasks on WantHave, since this is just a presence update.
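
Condensed (and ignoring the new-vs-existing entry distinction), the gating this implies looks like what this diff now does further down:

```nim
# Only a WantBlock entry means the peer expects block data, so only then
# do we flag the peer for task scheduling; WantHave entries are pure
# presence queries and are answered inline.
for e in wantList.entries:
  if e.wantType == WantType.WantBlock:
    schedulePeer = true

if schedulePeer:
  if not b.scheduleTask(peerCtx):
    warn "Unable to schedule task for peer", peer
```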

Contributor:

> We track all peerWants instead of only the wantBlocks. This is according to spec, and solves an issue in the case where a block is received/detected after the want-list message.

For more context, this isn't actually true: we do track WantHave messages, but we respond to them immediately with a presence message.

Contributor Author:

OK, I see the problem with the scheduling of tasks for every wantList. Makes perfect sense and I'll fix that right now.

Regarding the tracking of wantHaves: responding immediately is good but not enough. We're expected to keep the want in the peerCtx. If we encounter the block later, we should still react to the want with a presence message. The tests in "NetworkStore - multiple nodes" in testblockexc.nim actually check this behavior. They were previously passing only because 1) we were sending wantBlocks when we shouldn't and 2) we were keeping only the wantBlocks. So fixing the sending of wantBlock caused this correct test to reveal the other issue.

I would like to take a closer look at exactly how the stored wants will trigger messages when blocks are received, but I'm trying to keep the scope of this PR small.
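
Roughly, what I have in mind for that follow-up is something like this hypothetical sketch (it reuses names from this diff and assumes the engine module's existing imports; the proc name is made up and none of this is part of the PR):

```nim
# Hypothetical follow-up, outside the scope of this PR: once a block has
# just been stored locally, answer every peer whose tracked peerWants
# include it with a Have presence.
proc resolveStoredWants(b: BlockExcEngine, address: BlockAddress) {.async.} =
  let price = @(
    b.pricing.get(Pricing(price: 0.u256))
    .price.toBytesBE)

  for peerCtx in b.peers:
    if peerCtx.peerWants.anyIt(it.address == address):
      await b.network.request.sendPresence(
        peerCtx.id,
        @[BlockPresence(
          address: address,
          `type`: BlockPresenceType.Have,
          price: price)])
```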

Contributor:

OK, I see. Then what we need to do is add these to the BlockExcPeerCtx.peerWants list in addition to responding to the request.


For a later PR, we should take a closer look at how these lists are kept; for example, peerWants is a simple seq, which has no cap and no ordering.

Some lists, like peerWants, in addition to being capped, should also allow for ordering; for example, WantListEntry has a priority field, which should be respected.

Contributor (@dryajov, Dec 5, 2024):

For more context, what needs to happen is that when adding a DontHave to the presence seq, we also add it to the peerCtx.peerWants list. We only need to remember the DontHaves, because the remote should keep track of the Haves we send back.

Contributor Author (@benbierens, Dec 17, 2024):

Yes: Upgrading the peerWants to a capped, prioritized list is definitely needed.

"what needs to happen is that when adding a DontHave to the presence seq, we also add it to peerCtx.peerWants list."
In the PR, yes, we are adding it to the peerWants regardless of Have/DontHave.

"We only need to remember the DontHaves, because the remote should keep track of the Haves we send back."
I understand this to mean:

  • If we Have, don't add the want to the peerCtx, only send presence.
  • If we don't Have, add want to peerCtx, and send dontHave presence if asked. (e.sendDontHave)

I think bitswap adds the want to the peerCtx in both cases.

For the record, e.sendDontHave has always been false up to this point, except in some tests, and that's a good thing, because the peerContext peerHave proc doesn't currently consider the have-flag.

Contributor:

> If we Have, don't add the want to the peerCtx, only send presence.
> If we don't Have, add want to peerCtx, and send dontHave presence if asked. (e.sendDontHave)

Yep, that's what I mean.
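
Concretely, something like this (a sketch against the wantListHandler branch in this diff; illustrative, not necessarily the final shape):

```nim
# Agreed handling for a new want entry:
#   - if we have the block, only send a Have presence; the remote tracks it
#   - if we don't have it, remember the want and send a DontHave presence
#     only when the peer asked for it (e.sendDontHave)
if have:
  presence.add(
    BlockPresence(
      address: e.address,
      `type`: BlockPresenceType.Have,
      price: price))
else:
  if e.sendDontHave:
    presence.add(
      BlockPresence(
        address: e.address,
        `type`: BlockPresenceType.DontHave,
        price: price))
  peerCtx.peerWants.add(e)  # keep only the wants we couldn't satisfy
```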

peerCtx.peerWants.add(e)
schedulePeer = true
codex_block_exchange_want_block_lists_received.inc()
else:
else: # Updating existing entry in peer wants
# peer doesn't want this block anymore
if e.cancel:
peerCtx.peerWants.del(idx)
@@ -443,8 +442,9 @@
trace "Sending presence to remote", items = presence.mapIt($it).join(",")
await b.network.request.sendPresence(peer, presence)

if not b.scheduleTask(peerCtx):
warn "Unable to schedule task for peer", peer
if schedulePeer:
if not b.scheduleTask(peerCtx):
warn "Unable to schedule task for peer", peer

proc accountHandler*(
engine: BlockExcEngine,
@@ -555,7 +555,7 @@
updateInFlight(failedAddresses, false)

if blocksDelivery.len > 0:
trace "Sending blocks to peer", peer = task.id, blocks = (blocksDelivery.mapIt($it.address)).join(",")
trace "Sending blocks to peer", peer = task.id, blocks = (blocksDelivery.mapIt(it.address))
await b.network.request.sendBlocksDelivery(
task.id,
blocksDelivery
35 changes: 10 additions & 25 deletions codex/blockexchange/peers/peerctxstore.nim
@@ -32,6 +32,9 @@ logScope:
type
PeerCtxStore* = ref object of RootObj
peers*: OrderedTable[PeerId, BlockExcPeerCtx]
PeersForBlock* = object of RootObj
with*: seq[BlockExcPeerCtx]
without*: seq[BlockExcPeerCtx]

iterator items*(self: PeerCtxStore): BlockExcPeerCtx =
for p in self.peers.values:
@@ -70,32 +73,14 @@ func peersWant*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPeerCtx]
func peersWant*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] =
toSeq(self.peers.values).filterIt( it.peerWants.anyIt( it.address.cidOrTreeCid == cid ) )

func selectCheapest*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPeerCtx] =
# assume that the price for all leaves in a tree is the same
let rootAddress = BlockAddress(leaf: false, cid: address.cidOrTreeCid)
var peers = self.peersHave(rootAddress)

func cmp(a, b: BlockExcPeerCtx): int =
var
priceA = 0.u256
priceB = 0.u256

a.blocks.withValue(rootAddress, precense):
priceA = precense[].price

b.blocks.withValue(rootAddress, precense):
priceB = precense[].price

if priceA == priceB:
0
elif priceA > priceB:
1
proc getPeersForBlock*(self: PeerCtxStore, address: BlockAddress): PeersForBlock =
var res = PeersForBlock()
for peer in self:
if peer.peerHave.anyIt( it == address ):
res.with.add(peer)
else:
-1

peers.sort(cmp)
trace "Selected cheapest peers", peers = peers.len
return peers
res.without.add(peer)
res

proc new*(T: type PeerCtxStore): PeerCtxStore =
## create new instance of a peer context store
37 changes: 16 additions & 21 deletions tests/codex/blockexchange/testpeerctxstore.nim
@@ -69,27 +69,6 @@ checksuite "Peer Context Store Peer Selection":
check peerCtxs[0] in peers
check peerCtxs[5] in peers

test "Should select cheapest peers for Cid":
peerCtxs[0].blocks = collect(initTable):
for i, a in addresses:
{ a: Presence(address: a, price: (5 + i).u256) }

peerCtxs[5].blocks = collect(initTable):
for i, a in addresses:
{ a: Presence(address: a, price: (2 + i).u256) }

peerCtxs[9].blocks = collect(initTable):
for i, a in addresses:
{ a: Presence(address: a, price: i.u256) }

let
peers = store.selectCheapest(addresses[0])

check peers.len == 3
check peers[0] == peerCtxs[9]
check peers[1] == peerCtxs[5]
check peers[2] == peerCtxs[0]

test "Should select peers that want Cid":
let
entries = addresses.mapIt(
@@ -109,3 +88,19 @@
check peers.len == 2
check peerCtxs[0] in peers
check peerCtxs[5] in peers

test "Should return peers with and without block":
let address = addresses[2]

peerCtxs[1].blocks[address] = Presence(address: address, price: 0.u256)
peerCtxs[2].blocks[address] = Presence(address: address, price: 0.u256)

let peers = store.getPeersForBlock(address)

for i, pc in peerCtxs:
if i == 1 or i == 2:
check pc in peers.with
check pc notin peers.without
else:
check pc notin peers.with
check pc in peers.without
2 changes: 1 addition & 1 deletion tests/integration/testsales.nim
@@ -19,7 +19,7 @@ multinodesuite "Sales":
clients: CodexConfigs.init(nodes=1).some,
providers: CodexConfigs.init(nodes=1).some,
)

var host: CodexClient
var client: CodexClient
