-
Notifications
You must be signed in to change notification settings - Fork 26
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
Fix sending of WantBlocks messages and tracking of peerWants #1019
base: master
Are you sure you want to change the base?
Changes from all commits
f659c7e
3391473
cd1eebe
244f744
605de84
04eefd1
b779058
d7ac31f
644d429
dce6baa
afbfadb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -130,16 +130,15 @@ | |
proc sendWantHave( | ||
b: BlockExcEngine, | ||
addresses: seq[BlockAddress], | ||
excluded: seq[BlockExcPeerCtx], | ||
peers: seq[BlockExcPeerCtx]): Future[void] {.async.} = | ||
for p in peers: | ||
if p notin excluded: | ||
let toAsk = addresses.filterIt(it notin p.peerHave) | ||
trace "Sending wantHave request", toAsk, peer = p.id | ||
await b.network.request.sendWantList( | ||
p.id, | ||
toAsk, | ||
wantType = WantType.WantHave) | ||
let toAsk = addresses.filterIt(it notin p.peerHave) | ||
trace "Sending wantHave request", toAsk, peer = p.id | ||
await b.network.request.sendWantList( | ||
p.id, | ||
toAsk, | ||
wantType = WantType.WantHave) | ||
codex_block_exchange_want_have_lists_sent.inc() | ||
|
||
proc sendWantBlock( | ||
b: BlockExcEngine, | ||
|
@@ -150,6 +149,7 @@ | |
blockPeer.id, | ||
addresses, | ||
wantType = WantType.WantBlock) # we want this remote to send us a block | ||
codex_block_exchange_want_block_lists_sent.inc() | ||
|
||
proc monitorBlockHandle( | ||
b: BlockExcEngine, | ||
|
@@ -175,33 +175,27 @@ | |
await b.network.switch.disconnect(peerId) | ||
b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid]) | ||
|
||
proc pickPseudoRandom(address: BlockAddress, peers: seq[BlockExcPeerCtx]): BlockExcPeerCtx = | ||
return peers[hash(address) mod peers.len] | ||
|
||
proc requestBlock*( | ||
b: BlockExcEngine, | ||
address: BlockAddress, | ||
): Future[?!Block] {.async.} = | ||
let blockFuture = b.pendingBlocks.getWantHandle(address, b.blockFetchTimeout) | ||
|
||
if not b.pendingBlocks.isInFlight(address): | ||
let peers = b.peers.selectCheapest(address) | ||
if peers.len == 0: | ||
b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid]) | ||
let peers = b.peers.getPeersForBlock(address) | ||
|
||
let maybePeer = | ||
if peers.len > 0: | ||
peers[hash(address) mod peers.len].some | ||
elif b.peers.len > 0: | ||
toSeq(b.peers)[hash(address) mod b.peers.len].some | ||
else: | ||
BlockExcPeerCtx.none | ||
|
||
if peer =? maybePeer: | ||
asyncSpawn b.monitorBlockHandle(blockFuture, address, peer.id) | ||
if peers.with.len == 0: | ||
b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid]) | ||
else: | ||
let selected = pickPseudoRandom(address, peers.with) | ||
asyncSpawn b.monitorBlockHandle(blockFuture, address, selected.id) | ||
b.pendingBlocks.setInFlight(address) | ||
# TODO: Send more block addresses if at all sensible. | ||
await b.sendWantBlock(@[address], peer) | ||
codex_block_exchange_want_block_lists_sent.inc() | ||
await b.sendWantHave(@[address], @[peer], toSeq(b.peers)) | ||
codex_block_exchange_want_have_lists_sent.inc() | ||
await b.sendWantBlock(@[address], selected) | ||
|
||
await b.sendWantHave(@[address], peers.without) | ||
|
||
# Don't let timeouts bubble up. We can't be too broad here or we break | ||
# cancellations. | ||
|
@@ -246,7 +240,7 @@ | |
) | ||
|
||
if wantCids.len > 0: | ||
trace "Peer has blocks in our wantList", peer, wantCount = wantCids.len | ||
trace "Peer has blocks in our wantList", peer, wants = wantCids | ||
await b.sendWantBlock(wantCids, peerCtx) | ||
|
||
# if none of the connected peers report our wants in their have list, | ||
|
@@ -276,7 +270,7 @@ | |
|
||
proc cancelBlocks(b: BlockExcEngine, addrs: seq[BlockAddress]) {.async.} = | ||
## Tells neighboring peers that we're no longer interested in a block. | ||
trace "Sending block request cancellations to peers", addrs = addrs.len | ||
trace "Sending block request cancellations to peers", addrs, peers = b.peers.mapIt($it.id) | ||
|
||
let failed = (await allFinished( | ||
b.peers.mapIt( | ||
|
@@ -342,13 +336,13 @@ | |
b: BlockExcEngine, | ||
peer: PeerId, | ||
blocksDelivery: seq[BlockDelivery]) {.async.} = | ||
trace "Received blocks from peer", peer, blocks = (blocksDelivery.mapIt($it.address)).join(",") | ||
trace "Received blocks from peer", peer, blocks = (blocksDelivery.mapIt(it.address)) | ||
|
||
var validatedBlocksDelivery: seq[BlockDelivery] | ||
for bd in blocksDelivery: | ||
logScope: | ||
peer = peer | ||
address = bd.address | ||
peer = peer | ||
address = bd.address | ||
|
||
if err =? b.validateBlockDelivery(bd).errorOption: | ||
warn "Block validation failed", msg = err.msg | ||
|
@@ -390,11 +384,13 @@ | |
wantList: WantList) {.async.} = | ||
let | ||
peerCtx = b.peers.get(peer) | ||
if isNil(peerCtx): | ||
|
||
if peerCtx.isNil: | ||
return | ||
|
||
var | ||
presence: seq[BlockPresence] | ||
schedulePeer = false | ||
|
||
for e in wantList.entries: | ||
let | ||
|
@@ -405,32 +401,35 @@ | |
address = e.address | ||
wantType = $e.wantType | ||
|
||
if idx < 0: # updating entry | ||
if idx < 0: # Adding new entry to peer wants | ||
let | ||
have = await e.address in b.localStore | ||
price = @( | ||
b.pricing.get(Pricing(price: 0.u256)) | ||
.price.toBytesBE) | ||
|
||
if e.wantType == WantType.WantHave: | ||
codex_block_exchange_want_have_lists_received.inc() | ||
if have: | ||
presence.add( | ||
BlockPresence( | ||
address: e.address, | ||
`type`: BlockPresenceType.Have, | ||
price: price)) | ||
else: | ||
if e.sendDontHave: | ||
presence.add( | ||
BlockPresence( | ||
address: e.address, | ||
`type`: BlockPresenceType.DontHave, | ||
price: price)) | ||
peerCtx.peerWants.add(e) | ||
|
||
if not have and e.sendDontHave: | ||
presence.add( | ||
BlockPresence( | ||
address: e.address, | ||
`type`: BlockPresenceType.DontHave, | ||
price: price)) | ||
elif have and e.wantType == WantType.WantHave: | ||
presence.add( | ||
BlockPresence( | ||
address: e.address, | ||
`type`: BlockPresenceType.Have, | ||
price: price)) | ||
codex_block_exchange_want_have_lists_received.inc() | ||
elif e.wantType == WantType.WantBlock: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is a good reason why I separated There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
For more context, this isn't actually true, we do track There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK, I see the problem with the scheduling of tasks for every wantList. Makes perfect sense and I'll fix that right now. Regarding the tracking of wantHaves: Responding immediately is good but not enough. We're expected to keep the want in the peerCtx. If we encounter the block later, we should still react to the want with a presence message. The tests in I would like to take a closer look at exactly how the stored wants will trigger messages when blocks are received. But I'm trying to keep the scope of this PR small. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK, I see. Then, what we need to do is to add these to the For a later PR, we should take a closer look at how these lists are kept, for example, the Some lists, like the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For more context, what needs to happen is that when adding a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes: Upgrading the peerWants to a capped, prioritized list is definitely needed. "what needs to happen is that when adding a DontHave to the presence seq, we also add it to peerCtx.peerWants list." "We only need to remember the DontHaves, because the remote should keep track of the Haves we send back."
I think bitswap adds the want to the peerCtx in both cases. For the record, e.sendDontHave has always been false up to this point except for some tests, and that's a good thing because the peerContext peerHave proc doesn't currently consider the have-flag. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yep, that's what I mean. |
||
peerCtx.peerWants.add(e) | ||
schedulePeer = true | ||
codex_block_exchange_want_block_lists_received.inc() | ||
else: | ||
else: # Updating existing entry in peer wants | ||
# peer doesn't want this block anymore | ||
if e.cancel: | ||
peerCtx.peerWants.del(idx) | ||
|
@@ -443,8 +442,9 @@ | |
trace "Sending presence to remote", items = presence.mapIt($it).join(",") | ||
await b.network.request.sendPresence(peer, presence) | ||
|
||
if not b.scheduleTask(peerCtx): | ||
warn "Unable to schedule task for peer", peer | ||
if schedulePeer: | ||
if not b.scheduleTask(peerCtx): | ||
warn "Unable to schedule task for peer", peer | ||
|
||
proc accountHandler*( | ||
engine: BlockExcEngine, | ||
|
@@ -555,7 +555,7 @@ | |
updateInFlight(failedAddresses, false) | ||
|
||
if blocksDelivery.len > 0: | ||
trace "Sending blocks to peer", peer = task.id, blocks = (blocksDelivery.mapIt($it.address)).join(",") | ||
trace "Sending blocks to peer", peer = task.id, blocks = (blocksDelivery.mapIt(it.address)) | ||
await b.network.request.sendBlocksDelivery( | ||
task.id, | ||
blocksDelivery | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ugh, why are we sending a
WantHave
to the peers that we know don't have the block we need?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Peers are only reporting their haves if asked. We have to send a wantHave before we know if they have the block.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, you're right.