Skip to content

Commit

Permalink
Use get transactions without peer_id when use headers from cache
Browse files Browse the repository at this point in the history
  • Loading branch information
AurelienFT committed Oct 21, 2024
1 parent ab3221c commit 99135e3
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 87 deletions.
19 changes: 19 additions & 0 deletions crates/fuel-core/src/service/adapters/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,25 @@ impl PeerToPeerPort for P2PAdapter {
}

async fn get_transactions(
&self,
block_ids: Range<u32>,
) -> anyhow::Result<SourcePeer<Option<Vec<Transactions>>>> {
let result = if let Some(service) = &self.service {
service.get_transactions(block_ids).await
} else {
Err(anyhow::anyhow!("No P2P service available"))
};
match result {
Ok((peer_id, transactions)) => {
let peer_id: PeerId = peer_id.into();
let transactions = peer_id.bind(transactions);
Ok(transactions)
}
Err(err) => Err(err),
}
}

async fn get_transactions_from_peer(
&self,
range: SourcePeer<Range<u32>>,
) -> anyhow::Result<Option<Vec<Transactions>>> {
Expand Down
71 changes: 45 additions & 26 deletions crates/services/sync/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,27 +563,47 @@ where
}

async fn get_transactions<P>(
peer_id: PeerId,
range: Range<u32>,
peer_id: Option<PeerId>,
p2p: &Arc<P>,
) -> Option<Vec<Transactions>>
) -> Option<SourcePeer<Vec<Transactions>>>
where
P: PeerToPeerPort + Send + Sync + 'static,
{
let range = peer_id.clone().bind(range);
let res = p2p
.get_transactions(range)
.await
.trace_err("Failed to get transactions");
match res {
Ok(Some(transactions)) => Some(transactions),
_ => {
report_peer(
p2p,
Some(peer_id.clone()),
PeerReportReason::MissingTransactions,
);
None
match peer_id {
Some(peer_id) => {
let source_peer = peer_id.clone().bind(range.clone());
let Ok(Some(txs)) = p2p
.get_transactions_from_peer(source_peer)
.await
.trace_err("Failed to get transactions")
else {
report_peer(
p2p,
Some(peer_id.clone()),
PeerReportReason::MissingTransactions,
);
return None;
};
Some(SourcePeer { peer_id, data: txs })
}
None => {
let Ok(SourcePeer { peer_id, data }) = p2p
.get_transactions(range.clone())
.await
.trace_err("Failed to get transactions")
else {
return None;
};
let Some(txs) = data else {
report_peer(
p2p,
Some(peer_id.clone()),
PeerReportReason::MissingTransactions,
);
return None;
};
Some(SourcePeer { peer_id, data: txs })
}
}
}
Expand Down Expand Up @@ -646,20 +666,19 @@ where
{
let Batch {
results: headers,
peer,
range,
peer,
} = headers;

let Some(peer) = peer else {
return SealedBlockBatch::new(None, range, vec![])
};

let Some(transaction_data) = get_transactions(peer.clone(), range.clone(), p2p).await
let Some(SourcePeer {
peer_id,
data: transactions,
}) = get_transactions(range.clone(), peer.clone(), p2p).await
else {
return Batch::new(Some(peer), range, vec![])
return Batch::new(peer, range, vec![])
};

let iter = headers.into_iter().zip(transaction_data.into_iter());
let iter = headers.into_iter().zip(transactions.into_iter());
let mut blocks = vec![];
for (block_header, transactions) in iter {
let SealedBlockHeader {
Expand All @@ -676,13 +695,13 @@ where
} else {
report_peer(
p2p,
Some(peer.clone()),
Some(peer_id.clone()),
PeerReportReason::InvalidTransactions,
);
break
}
}
Batch::new(Some(peer), range, blocks)
Batch::new(Some(peer_id), range, blocks)
}

#[tracing::instrument(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl PeerToPeerPort for PressurePeerToPeer {
self.p2p.get_sealed_block_headers(block_height_range).await
}

async fn get_transactions(
async fn get_transactions_from_peer(
&self,
block_ids: SourcePeer<Range<u32>>,
) -> anyhow::Result<Option<Vec<Transactions>>> {
Expand All @@ -57,6 +57,19 @@ impl PeerToPeerPort for PressurePeerToPeer {
self.counts.apply(|c| c.inc_blocks());
}
self.counts.apply(|c| c.dec_transactions());
self.p2p.get_transactions_from_peer(block_ids).await
}

async fn get_transactions(
&self,
block_ids: Range<u32>,
) -> anyhow::Result<SourcePeer<Option<Vec<Transactions>>>> {
self.counts.apply(|c| c.inc_transactions());
tokio::time::sleep(self.durations[1]).await;
for _height in block_ids.clone() {
self.counts.apply(|c| c.inc_blocks());
}
self.counts.apply(|c| c.dec_transactions());
self.p2p.get_transactions(block_ids).await
}

Expand Down Expand Up @@ -84,13 +97,14 @@ impl PressurePeerToPeer {
Ok(headers)
})
});
mock.expect_get_transactions().returning(|block_ids| {
Box::pin(async move {
let data = block_ids.data;
let v = data.into_iter().map(|_| Transactions::default()).collect();
Ok(Some(v))
})
});
mock.expect_get_transactions_from_peer()
.returning(|block_ids| {
Box::pin(async move {
let data = block_ids.data;
let v = data.into_iter().map(|_| Transactions::default()).collect();
Ok(Some(v))
})
});
Self {
p2p: mock,
durations: delays,
Expand Down
Loading

0 comments on commit 99135e3

Please sign in to comment.