diff --git a/crates/fuel-core/src/service/adapters/sync.rs b/crates/fuel-core/src/service/adapters/sync.rs index cc4bb4dbbba..62adc4dd120 100644 --- a/crates/fuel-core/src/service/adapters/sync.rs +++ b/crates/fuel-core/src/service/adapters/sync.rs @@ -66,6 +66,25 @@ impl PeerToPeerPort for P2PAdapter { } async fn get_transactions( + &self, + block_ids: Range, + ) -> anyhow::Result>>> { + let result = if let Some(service) = &self.service { + service.get_transactions(block_ids).await + } else { + Err(anyhow::anyhow!("No P2P service available")) + }; + match result { + Ok((peer_id, transactions)) => { + let peer_id: PeerId = peer_id.into(); + let transactions = peer_id.bind(transactions); + Ok(transactions) + } + Err(err) => Err(err), + } + } + + async fn get_transactions_from_peer( &self, range: SourcePeer>, ) -> anyhow::Result>> { diff --git a/crates/services/sync/src/import.rs b/crates/services/sync/src/import.rs index 3e0b61aa35c..37f8579cfe2 100644 --- a/crates/services/sync/src/import.rs +++ b/crates/services/sync/src/import.rs @@ -563,27 +563,47 @@ where } async fn get_transactions

( - peer_id: PeerId, range: Range, + peer_id: Option, p2p: &Arc

, -) -> Option> +) -> Option>> where P: PeerToPeerPort + Send + Sync + 'static, { - let range = peer_id.clone().bind(range); - let res = p2p - .get_transactions(range) - .await - .trace_err("Failed to get transactions"); - match res { - Ok(Some(transactions)) => Some(transactions), - _ => { - report_peer( - p2p, - Some(peer_id.clone()), - PeerReportReason::MissingTransactions, - ); - None + match peer_id { + Some(peer_id) => { + let source_peer = peer_id.clone().bind(range.clone()); + let Ok(Some(txs)) = p2p + .get_transactions_from_peer(source_peer) + .await + .trace_err("Failed to get transactions") + else { + report_peer( + p2p, + Some(peer_id.clone()), + PeerReportReason::MissingTransactions, + ); + return None; + }; + Some(SourcePeer { peer_id, data: txs }) + } + None => { + let Ok(SourcePeer { peer_id, data }) = p2p + .get_transactions(range.clone()) + .await + .trace_err("Failed to get transactions") + else { + return None; + }; + let Some(txs) = data else { + report_peer( + p2p, + Some(peer_id.clone()), + PeerReportReason::MissingTransactions, + ); + return None; + }; + Some(SourcePeer { peer_id, data: txs }) } } } @@ -646,20 +666,19 @@ where { let Batch { results: headers, - peer, range, + peer, } = headers; - let Some(peer) = peer else { - return SealedBlockBatch::new(None, range, vec![]) - }; - - let Some(transaction_data) = get_transactions(peer.clone(), range.clone(), p2p).await + let Some(SourcePeer { + peer_id, + data: transactions, + }) = get_transactions(range.clone(), peer.clone(), p2p).await else { - return Batch::new(Some(peer), range, vec![]) + return Batch::new(peer, range, vec![]) }; - let iter = headers.into_iter().zip(transaction_data.into_iter()); + let iter = headers.into_iter().zip(transactions.into_iter()); let mut blocks = vec![]; for (block_header, transactions) in iter { let SealedBlockHeader { @@ -676,13 +695,13 @@ where } else { report_peer( p2p, - Some(peer.clone()), + Some(peer_id.clone()), PeerReportReason::InvalidTransactions, ); break } } - Batch::new(Some(peer), range, blocks) + Batch::new(Some(peer_id), range, blocks) } #[tracing::instrument( diff --git a/crates/services/sync/src/import/test_helpers/pressure_peer_to_peer.rs b/crates/services/sync/src/import/test_helpers/pressure_peer_to_peer.rs index e77e94ff4ad..3d74cf73ffa 100644 --- a/crates/services/sync/src/import/test_helpers/pressure_peer_to_peer.rs +++ b/crates/services/sync/src/import/test_helpers/pressure_peer_to_peer.rs @@ -47,7 +47,7 @@ impl PeerToPeerPort for PressurePeerToPeer { self.p2p.get_sealed_block_headers(block_height_range).await } - async fn get_transactions( + async fn get_transactions_from_peer( &self, block_ids: SourcePeer>, ) -> anyhow::Result>> { @@ -57,6 +57,19 @@ impl PeerToPeerPort for PressurePeerToPeer { self.counts.apply(|c| c.inc_blocks()); } self.counts.apply(|c| c.dec_transactions()); + self.p2p.get_transactions_from_peer(block_ids).await + } + + async fn get_transactions( + &self, + block_ids: Range, + ) -> anyhow::Result>>> { + self.counts.apply(|c| c.inc_transactions()); + tokio::time::sleep(self.durations[1]).await; + for _height in block_ids.clone() { + self.counts.apply(|c| c.inc_blocks()); + } + self.counts.apply(|c| c.dec_transactions()); self.p2p.get_transactions(block_ids).await } @@ -84,13 +97,14 @@ impl PressurePeerToPeer { Ok(headers) }) }); - mock.expect_get_transactions().returning(|block_ids| { - Box::pin(async move { - let data = block_ids.data; - let v = data.into_iter().map(|_| Transactions::default()).collect(); - Ok(Some(v)) - }) - }); + mock.expect_get_transactions_from_peer() + .returning(|block_ids| { + Box::pin(async move { + let data = block_ids.data; + let v = data.into_iter().map(|_| Transactions::default()).collect(); + Ok(Some(v)) + }) + }); Self { p2p: mock, durations: delays, diff --git a/crates/services/sync/src/import/tests.rs b/crates/services/sync/src/import/tests.rs index 3b4fa7cf6b1..36c75d733f5 100644 --- a/crates/services/sync/src/import/tests.rs +++ b/crates/services/sync/src/import/tests.rs @@ -49,7 +49,7 @@ async fn test_import_0_to_5() { Ok(headers) }) }); - p2p.expect_get_transactions() + p2p.expect_get_transactions_from_peer() .times(1) .returning(|block_ids| { Box::pin(async move { @@ -99,7 +99,7 @@ async fn test_import_3_to_5() { Ok(headers) }) }); - p2p.expect_get_transactions() + p2p.expect_get_transactions_from_peer() .times(1) .returning(|block_ids| { Box::pin(async move { @@ -169,7 +169,7 @@ async fn test_import_0_to_499() { // Happens once for each batch let times = div_ceil(n, header_batch_size); - p2p.expect_get_transactions() + p2p.expect_get_transactions_from_peer() .times(times) .returning(|block_ids| { Box::pin(async move { @@ -219,7 +219,7 @@ async fn import__signature_fails_on_header_5_only() { Ok(headers) }) }); - p2p.expect_get_transactions() + p2p.expect_get_transactions_from_peer() .times(1) .returning(|block_ids| { Box::pin(async move { @@ -304,7 +304,7 @@ async fn import__keep_data_asked_in_fail_ask_header_cases() { }) }); // No reask - p2p.expect_get_transactions() + p2p.expect_get_transactions_from_peer() .times(3) .returning(|block_ids| { Box::pin(async move { @@ -377,7 +377,7 @@ async fn import__keep_data_asked_in_fail_ask_transactions_cases() { let mut seq = Sequence::new(); // Given // Fail to get transactions for block 4 - p2p.expect_get_transactions() + p2p.expect_get_transactions_from_peer() .times(1) .in_sequence(&mut seq) .returning(|_| { @@ -387,7 +387,7 @@ async fn import__keep_data_asked_in_fail_ask_transactions_cases() { }) }); - p2p.expect_get_transactions() + p2p.expect_get_transactions_from_peer() .times(2) .in_sequence(&mut seq) .returning(|block_ids| { @@ -404,9 +404,12 @@ async fn import__keep_data_asked_in_fail_ask_transactions_cases() { .in_sequence(&mut seq) .returning(|block_ids| { Box::pin(async move { - let data = block_ids.data; + let data = block_ids; let v = data.into_iter().map(|_| Transactions::default()).collect(); - Ok(Some(v)) + Ok(SourcePeer { + peer_id: random_peer(), + data: Some(v), + }) }) }); @@ -466,7 +469,7 @@ async fn import__signature_fails_on_header_4_only() { Ok(headers) }) }); - p2p.expect_get_transactions() + p2p.expect_get_transactions_from_peer() .times(0) .returning(|block_ids| { Box::pin(async move { @@ -575,7 +578,7 @@ async fn import__header_5_not_found() { }) }); - p2p.expect_get_transactions() + p2p.expect_get_transactions_from_peer() .times(1) .returning(|block_ids| { Box::pin(async move { @@ -617,7 +620,7 @@ async fn import__header_4_not_found() { Ok(headers) }) }); - p2p.expect_get_transactions().times(0); + p2p.expect_get_transactions_from_peer().times(0); let state = State::new(3, 5).into(); let mocks = Mocks { @@ -661,7 +664,7 @@ async fn import__transactions_not_found() { Ok(headers) }) }); - p2p.expect_get_transactions() + p2p.expect_get_transactions_from_peer() .times(1) .returning(|_| Box::pin(async move { Ok(None) })); @@ -708,7 +711,7 @@ async fn import__transactions_not_found_for_header_4() { }) }); let mut height = 3; - p2p.expect_get_transactions() + p2p.expect_get_transactions_from_peer() .times(1) .returning(move |block_ids| { Box::pin(async move { @@ -765,12 +768,14 @@ async fn import__transactions_not_found_for_header_5() { Ok(headers) }) }); - p2p.expect_get_transactions().times(1).returning(move |_| { - Box::pin(async move { - let v = vec![Transactions::default()]; - Ok(Some(v)) - }) - }); + p2p.expect_get_transactions_from_peer() + .times(1) + .returning(move |_| { + Box::pin(async move { + let v = vec![Transactions::default()]; + Ok(Some(v)) + }) + }); let state = State::new(3, 5).into(); let mocks = Mocks { @@ -799,7 +804,7 @@ async fn import__p2p_error() { .returning(|_| { Box::pin(async move { Err(anyhow::anyhow!("Some network error")) }) }); - p2p.expect_get_transactions().times(0); + p2p.expect_get_transactions_from_peer().times(0); let state = State::new(3, 5).into(); let mocks = Mocks { @@ -843,9 +848,11 @@ async fn import__p2p_error_on_4_transactions() { Ok(headers) }) }); - p2p.expect_get_transactions().times(1).returning(|_| { - Box::pin(async move { Err(anyhow::anyhow!("Some network error")) }) - }); + p2p.expect_get_transactions_from_peer() + .times(1) + .returning(|_| { + Box::pin(async move { Err(anyhow::anyhow!("Some network error")) }) + }); let state = State::new(3, 5).into(); let mocks = Mocks { @@ -895,7 +902,7 @@ async fn import__consensus_error_on_4() { Ok(headers) }) }); - p2p.expect_get_transactions().times(0); + p2p.expect_get_transactions_from_peer().times(0); let state = State::new(3, 5).into(); let mocks = Mocks { @@ -945,7 +952,7 @@ async fn import__consensus_error_on_5() { Ok(headers) }) }); - p2p.expect_get_transactions() + p2p.expect_get_transactions_from_peer() .times(1) .returning(|block_ids| { Box::pin(async move { @@ -997,7 +1004,7 @@ async fn import__execution_error_on_header_4() { Ok(headers) }) }); - p2p.expect_get_transactions() + p2p.expect_get_transactions_from_peer() .times(1) .returning(|block_ids| { Box::pin(async move { @@ -1061,7 +1068,7 @@ async fn import__execution_error_on_header_5() { Ok(headers) }) }); - p2p.expect_get_transactions() + p2p.expect_get_transactions_from_peer() .times(1) .returning(|block_ids| { Box::pin(async move { @@ -1158,7 +1165,7 @@ async fn import__can_work_in_two_loops() { Ok(headers) }) }); - p2p.expect_get_transactions() + p2p.expect_get_transactions_from_peer() .times(2) .returning(|block_ids| { Box::pin(async move { @@ -1310,13 +1317,14 @@ async fn import__execution_error_on_header_4_when_awaits_for_1000000_blocks() { Ok(headers) }) }); - p2p.expect_get_transactions().returning(|block_ids| { - Box::pin(async move { - let data = block_ids.data; - let v = data.into_iter().map(|_| Transactions::default()).collect(); - Ok(Some(v)) - }) - }); + p2p.expect_get_transactions_from_peer() + .returning(|block_ids| { + Box::pin(async move { + let data = block_ids.data; + let v = data.into_iter().map(|_| Transactions::default()).collect(); + Ok(Some(v)) + }) + }); let mut executor = MockBlockImporterPort::default(); executor @@ -1477,18 +1485,20 @@ impl PeerReportTestBuilder { let transactions = self.get_transactions.clone(); if let Some(t) = transactions { - p2p.expect_get_transactions().returning(move |_| { + p2p.expect_get_transactions_from_peer().returning(move |_| { let t = t.clone(); Box::pin(async move { Ok(t) }) }); } else { - p2p.expect_get_transactions().returning(|block_ids| { - Box::pin(async move { - let data = block_ids.data; - let v = data.into_iter().map(|_| Transactions::default()).collect(); - Ok(Some(v)) - }) - }); + p2p.expect_get_transactions_from_peer() + .returning(|block_ids| { + Box::pin(async move { + let data = block_ids.data; + let v = + data.into_iter().map(|_| Transactions::default()).collect(); + Ok(Some(v)) + }) + }); } let mut seq = mockall::Sequence::new(); @@ -1611,7 +1621,7 @@ impl DefaultMocks for MockPeerToPeerPort { }) }); - p2p.expect_get_transactions() + p2p.expect_get_transactions_from_peer() .times(t.next().unwrap()) .returning(|block_ids| { Box::pin(async move { diff --git a/crates/services/sync/src/ports.rs b/crates/services/sync/src/ports.rs index 2d6e3f76eb3..483b4f10156 100644 --- a/crates/services/sync/src/ports.rs +++ b/crates/services/sync/src/ports.rs @@ -48,8 +48,14 @@ pub trait PeerToPeerPort { ) -> anyhow::Result>>>; /// Request transactions from the network for the given block range - /// and source peer. async fn get_transactions( + &self, + block_ids: Range, + ) -> anyhow::Result>>>; + + /// Request transactions from the network for the given block range + /// and source peer. + async fn get_transactions_from_peer( &self, block_ids: SourcePeer>, ) -> anyhow::Result>>; diff --git a/crates/services/sync/src/service/tests.rs b/crates/services/sync/src/service/tests.rs index 91f3460a632..19748c4b9aa 100644 --- a/crates/services/sync/src/service/tests.rs +++ b/crates/services/sync/src/service/tests.rs @@ -46,13 +46,14 @@ async fn test_new_service() { Ok(headers) }) }); - p2p.expect_get_transactions().returning(|block_ids| { - Box::pin(async move { - let data = block_ids.data; - let v = data.into_iter().map(|_| Transactions::default()).collect(); - Ok(Some(v)) - }) - }); + p2p.expect_get_transactions_from_peer() + .returning(|block_ids| { + Box::pin(async move { + let data = block_ids.data; + let v = data.into_iter().map(|_| Transactions::default()).collect(); + Ok(Some(v)) + }) + }); let mut importer = MockBlockImporterPort::default(); importer .expect_committed_height_stream()