diff --git a/CHANGELOG.md b/CHANGELOG.md index 40dd32ecc4d..d6dfcb33e77 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - [2362](https://github.com/FuelLabs/fuel-core/pull/2362): Added a new request_response protocol version `/fuel/req_res/0.0.2`. In comparison with `/fuel/req/0.0.1`, which returns an empty response when a request cannot be fulfilled, this version returns more meaningful error codes. Nodes still support the version `0.0.1` of the protocol to guarantee backward compatibility with fuel-core nodes. Empty responses received from nodes using the old protocol `/fuel/req/0.0.1` are automatically converted into an error `ProtocolV1EmptyResponse` with error code 0, which is also the only error code implemented. More specific error codes will be added in the future. - [2386](https://github.com/FuelLabs/fuel-core/pull/2386): Add a flag to define the maximum number of file descriptors that RocksDB can use. By default it's half of the OS limit. - [2376](https://github.com/FuelLabs/fuel-core/pull/2376): Add a way to fetch transactions in P2P without specifying a peer. +- [2361](https://github.com/FuelLabs/fuel-core/pull/2361): Add caches to the sync service to not reask for data it already fetched from the network. - [2327](https://github.com/FuelLabs/fuel-core/pull/2327): Add more services tests and more checks of the pool. Also add an high level documentation for users of the pool and contributors. - [2416](https://github.com/FuelLabs/fuel-core/issues/2416): Define the `GasPriceServiceV1` task. - [2033](https://github.com/FuelLabs/fuel-core/pull/2033): Remove `Option` in favor of `BlockHeightQuery` where applicable. diff --git a/benches/benches/import.rs b/benches/benches/import.rs index c8402b37e51..268c355ba11 100644 --- a/benches/benches/import.rs +++ b/benches/benches/import.rs @@ -17,7 +17,7 @@ use fuel_core_sync::state::State; use std::time::Duration; use tokio::runtime::Runtime; -async fn execute_import(import: PressureImport, shutdown: &mut StateWatcher) { +async fn execute_import(mut import: PressureImport, shutdown: &mut StateWatcher) { import.import(shutdown).await.unwrap(); } diff --git a/crates/fuel-core/src/service/adapters/sync.rs b/crates/fuel-core/src/service/adapters/sync.rs index cc4bb4dbbba..62adc4dd120 100644 --- a/crates/fuel-core/src/service/adapters/sync.rs +++ b/crates/fuel-core/src/service/adapters/sync.rs @@ -66,6 +66,25 @@ impl PeerToPeerPort for P2PAdapter { } async fn get_transactions( + &self, + block_ids: Range, + ) -> anyhow::Result>>> { + let result = if let Some(service) = &self.service { + service.get_transactions(block_ids).await + } else { + Err(anyhow::anyhow!("No P2P service available")) + }; + match result { + Ok((peer_id, transactions)) => { + let peer_id: PeerId = peer_id.into(); + let transactions = peer_id.bind(transactions); + Ok(transactions) + } + Err(err) => Err(err), + } + } + + async fn get_transactions_from_peer( &self, range: SourcePeer>, ) -> anyhow::Result>> { diff --git a/crates/services/sync/src/import.rs b/crates/services/sync/src/import.rs index a0aebc6dc8f..f3d93359183 100644 --- a/crates/services/sync/src/import.rs +++ b/crates/services/sync/src/import.rs @@ -2,6 +2,10 @@ //! This module contains the import task which is responsible for //! importing blocks from the network into the local blockchain. +use cache::{ + Cache, + CachedDataBatch, +}; use fuel_core_services::{ SharedMutex, StateWatcher, @@ -28,6 +32,7 @@ use futures::{ }; use std::{ future::Future, + num::NonZeroU32, ops::{ Range, RangeInclusive, @@ -54,6 +59,8 @@ use crate::{ state::State, }; +mod cache; + #[cfg(any(test, feature = "benchmarking"))] /// Accessories for testing the sync. Available only when compiling under test /// or benchmarking. @@ -98,6 +105,17 @@ pub struct Import { executor: Arc, /// Consensus port. consensus: Arc, + /// A cache of already validated header or blocks. + cache: Cache, +} + +/// The data that is fetched either in the network or in the cache for a range of headers or blocks. +#[derive(Debug, Clone)] +enum BlockHeaderData { + /// The headers (or full blocks) have been fetched and checked. + Cached(CachedDataBatch), + /// The headers has just been fetched from the network. + Fetched(Batch), } impl Import { @@ -118,6 +136,7 @@ impl Import { p2p, executor, consensus, + cache: Cache::new(), } } @@ -127,7 +146,7 @@ impl Import { } } -#[derive(Debug)] +#[derive(Debug, Clone, PartialEq, Eq)] struct Batch { peer: Option, range: Range, @@ -159,13 +178,13 @@ where { #[tracing::instrument(skip_all)] /// Execute imports until a shutdown is requested. - pub async fn import(&self, shutdown: &mut StateWatcher) -> anyhow::Result { + pub async fn import(&mut self, shutdown: &mut StateWatcher) -> anyhow::Result { self.import_inner(shutdown).await?; Ok(wait_for_notify_or_shutdown(&self.notify, shutdown).await) } - async fn import_inner(&self, shutdown: &StateWatcher) -> anyhow::Result<()> { + async fn import_inner(&mut self, shutdown: &StateWatcher) -> anyhow::Result<()> { // If there is a range to process, launch the stream. if let Some(range) = self.state.apply(|s| s.process_range()) { // Launch the stream to import the range. @@ -199,8 +218,13 @@ where params, p2p, consensus, + cache, .. } = &self; + let batch_size = u32::try_from(params.header_batch_size) + .expect("Header batch size must be less u32::MAX"); + let batch_size = + NonZeroU32::new(batch_size).expect("Header batch size must be non-zero"); let (batch_sender, batch_receiver) = mpsc::channel(1); @@ -208,11 +232,17 @@ where let params = *params; let p2p = p2p.clone(); let consensus = consensus.clone(); + let cache = cache.clone(); let block_stream_buffer_size = params.block_stream_buffer_size; let mut shutdown_signal = shutdown.clone(); async move { - let block_stream = - get_block_stream(range.clone(), params, p2p, consensus); + let block_stream = get_block_stream( + range.clone(), + batch_size, + p2p, + consensus, + cache.clone(), + ); let shutdown_future = { let mut s = shutdown_signal.clone(); @@ -279,7 +309,7 @@ where /// If an error occurs, the preceding blocks still be processed /// and the error will be returned. async fn launch_stream( - &self, + &mut self, range: RangeInclusive, shutdown: &StateWatcher, ) -> usize { @@ -294,6 +324,7 @@ where self.fetch_batches_task(range, shutdown); let result = tokio_stream::wrappers::ReceiverStream::new(batch_receiver) .then(|batch| { + let mut cache = self.cache.clone(); async move { let Batch { peer, @@ -304,12 +335,14 @@ where let mut done = vec![]; let mut shutdown = shutdown.clone(); for sealed_block in results { + let height = *sealed_block.entity.header().height(); let res = tokio::select! { biased; _ = shutdown.while_started() => { break; }, res = execute_and_commit(executor.as_ref(), state, sealed_block) => { + cache.remove_element(&height); res }, }; @@ -327,7 +360,7 @@ where }; } - let batch = Batch::new(peer.clone(), range, done); + let batch = Batch::new(peer.clone(), range.clone(), done); if !batch.is_err() { report_peer(p2p, peer, PeerReportReason::SuccessfulBlockImport); @@ -361,31 +394,63 @@ fn get_block_stream< C: ConsensusPort + Send + Sync + 'static, >( range: RangeInclusive, - params: Config, + header_batch_size: NonZeroU32, p2p: Arc

, consensus: Arc, + cache: Cache, ) -> impl Stream> { - let header_stream = get_header_batch_stream(range.clone(), params, p2p.clone()); - header_stream + cache + .get_chunks(range.clone(), header_batch_size) + .map({ + let p2p = p2p.clone(); + move |cached_data_batch| { + let p2p = p2p.clone(); + async move { + if let CachedDataBatch::None(range) = cached_data_batch { + BlockHeaderData::Fetched(get_headers_batch(range, &p2p).await) + } else { + BlockHeaderData::Cached(cached_data_batch) + } + } + } + }) .map({ let p2p = p2p.clone(); let consensus = consensus.clone(); + let cache = cache.clone(); move |header_batch| { let p2p = p2p.clone(); let consensus = consensus.clone(); + let mut cache = cache.clone(); async move { - let Batch { - peer, - range, - results, - } = header_batch.await; - let checked_headers = results - .into_iter() - .take_while(|header| { - check_sealed_header(header, peer.clone(), &p2p, &consensus) - }) - .collect::>(); - Batch::new(peer, range, checked_headers) + match header_batch.await { + BlockHeaderData::Cached(cached_data) => { + BlockHeaderData::Cached(cached_data) + } + BlockHeaderData::Fetched(fetched_batch) => { + let Batch { + peer, + range, + results, + } = fetched_batch; + let checked_headers = results + .into_iter() + .take_while(|header| { + check_sealed_header( + header, + peer.clone(), + &p2p, + &consensus, + ) + }) + .collect::>(); + let batch = Batch::new(peer, range.clone(), checked_headers); + if !batch.is_err() { + cache.insert_headers(batch.clone()); + } + BlockHeaderData::Fetched(batch) + } + } } } }) @@ -395,24 +460,40 @@ fn get_block_stream< move |headers| { let p2p = p2p.clone(); let consensus = consensus.clone(); + let mut cache = cache.clone(); async move { - let Batch { - peer, - range, - results, - } = headers.await; - if results.is_empty() { - SealedBlockBatch::new(peer, range, vec![]) - } else { - await_da_height( - results - .last() - .expect("We checked headers are not empty above"), - &consensus, - ) - .await; - let headers = SealedHeaderBatch::new(peer, range, results); - get_blocks(&p2p, headers).await + match headers.await { + BlockHeaderData::Cached(CachedDataBatch::Blocks(batch)) => batch, + BlockHeaderData::Cached(CachedDataBatch::Headers(batch)) + | BlockHeaderData::Fetched(batch) => { + let Batch { + peer, + range, + results, + } = batch; + if results.is_empty() { + SealedBlockBatch::new(peer, range, vec![]) + } else { + await_da_height( + results + .last() + .expect("We checked headers are not empty above"), + &consensus, + ) + .await; + let headers = + SealedHeaderBatch::new(peer, range.clone(), results); + let batch = get_blocks(&p2p, headers).await; + if !batch.is_err() { + cache.insert_blocks(batch.clone()); + } + batch + } + } + BlockHeaderData::Cached(CachedDataBatch::None(_)) => { + tracing::error!("Cached data batch should never be created outside of the caching algorithm."); + Batch::new(None, 0..1, vec![]) + } } } .instrument(tracing::debug_span!("consensus_and_transactions")) @@ -421,34 +502,6 @@ fn get_block_stream< }) } -fn get_header_batch_stream( - range: RangeInclusive, - params: Config, - p2p: Arc

, -) -> impl Stream> { - let Config { - header_batch_size, .. - } = params; - let ranges = range_chunks(range, header_batch_size); - futures::stream::iter(ranges).map(move |range| { - let p2p = p2p.clone(); - async move { get_headers_batch(range, &p2p).await } - }) -} - -fn range_chunks( - range: RangeInclusive, - chunk_size: usize, -) -> impl Iterator> { - let end = range.end().saturating_add(1); - let chunk_size_u32 = - u32::try_from(chunk_size).expect("The size of the chunk can't exceed `u32`"); - range.step_by(chunk_size).map(move |chunk_start| { - let block_end = (chunk_start.saturating_add(chunk_size_u32)).min(end); - chunk_start..block_end - }) -} - fn check_sealed_header< P: PeerToPeerPort + Send + Sync + 'static, C: ConsensusPort + Send + Sync + 'static, @@ -516,27 +569,47 @@ where } async fn get_transactions

( - peer_id: PeerId, range: Range, + peer_id: Option, p2p: &Arc

, -) -> Option> +) -> Option>> where P: PeerToPeerPort + Send + Sync + 'static, { - let range = peer_id.clone().bind(range); - let res = p2p - .get_transactions(range) - .await - .trace_err("Failed to get transactions"); - match res { - Ok(Some(transactions)) => Some(transactions), - _ => { - report_peer( - p2p, - Some(peer_id.clone()), - PeerReportReason::MissingTransactions, - ); - None + match peer_id { + Some(peer_id) => { + let source_peer = peer_id.clone().bind(range.clone()); + let Ok(Some(txs)) = p2p + .get_transactions_from_peer(source_peer) + .await + .trace_err("Failed to get transactions") + else { + report_peer( + p2p, + Some(peer_id.clone()), + PeerReportReason::MissingTransactions, + ); + return None; + }; + Some(SourcePeer { peer_id, data: txs }) + } + None => { + let Ok(SourcePeer { peer_id, data }) = p2p + .get_transactions(range.clone()) + .await + .trace_err("Failed to get transactions") + else { + return None; + }; + let Some(txs) = data else { + report_peer( + p2p, + Some(peer_id.clone()), + PeerReportReason::MissingTransactions, + ); + return None; + }; + Some(SourcePeer { peer_id, data: txs }) } } } @@ -599,20 +672,19 @@ where { let Batch { results: headers, - peer, range, + peer, } = headers; - let Some(peer) = peer else { - return SealedBlockBatch::new(None, range, vec![]) - }; - - let Some(transaction_data) = get_transactions(peer.clone(), range.clone(), p2p).await + let Some(SourcePeer { + peer_id, + data: transactions, + }) = get_transactions(range.clone(), peer.clone(), p2p).await else { - return Batch::new(Some(peer), range, vec![]) + return Batch::new(peer, range, vec![]) }; - let iter = headers.into_iter().zip(transaction_data.into_iter()); + let iter = headers.into_iter().zip(transactions.into_iter()); let mut blocks = vec![]; for (block_header, transactions) in iter { let SealedBlockHeader { @@ -629,13 +701,13 @@ where } else { report_peer( p2p, - Some(peer.clone()), + Some(peer_id.clone()), PeerReportReason::InvalidTransactions, ); break } } - Batch::new(Some(peer), range, blocks) + Batch::new(Some(peer_id), range, blocks) } #[tracing::instrument( diff --git a/crates/services/sync/src/import/back_pressure_tests.rs b/crates/services/sync/src/import/back_pressure_tests.rs index 9610d552dba..c415032ea7a 100644 --- a/crates/services/sync/src/import/back_pressure_tests.rs +++ b/crates/services/sync/src/import/back_pressure_tests.rs @@ -106,13 +106,14 @@ async fn test_back_pressure(input: Input, state: State, params: Config) -> Count let consensus = Arc::new(PressureConsensus::new(counts.clone(), input.consensus)); let notify = Arc::new(Notify::new()); - let import = Import { + let mut import = Import { state, notify, params, p2p, executor, consensus, + cache: Cache::new(), }; import.notify.notify_one(); diff --git a/crates/services/sync/src/import/cache.rs b/crates/services/sync/src/import/cache.rs new file mode 100644 index 00000000000..031ee0bfcf8 --- /dev/null +++ b/crates/services/sync/src/import/cache.rs @@ -0,0 +1,502 @@ +use std::{ + collections::BTreeMap, + num::NonZeroU32, + ops::{ + Range, + RangeInclusive, + }, +}; + +use fuel_core_services::SharedMutex; +use fuel_core_types::blockchain::{ + SealedBlock, + SealedBlockHeader, +}; + +use super::Batch; + +/// The cache that stores the fetched headers and blocks. +#[derive(Clone, Debug)] +pub struct Cache(SharedMutex>); + +#[derive(Debug, Clone)] +#[allow(clippy::large_enum_variant)] +pub enum CachedData { + Header(SealedBlockHeader), + Block(SealedBlock), +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum CachedDataBatch { + Headers(Batch), + Blocks(Batch), + None(Range), +} + +impl CachedDataBatch { + pub fn is_range_empty(&self) -> bool { + match self { + CachedDataBatch::None(range) => range.is_empty(), + CachedDataBatch::Blocks(batch) => batch.range.is_empty(), + CachedDataBatch::Headers(batch) => batch.range.is_empty(), + } + } +} + +impl Cache { + pub fn new() -> Self { + Self(SharedMutex::new(BTreeMap::new())) + } + + pub fn insert_blocks(&mut self, batch: Batch) { + let mut lock = self.0.lock(); + for block in batch.results { + lock.insert(**block.entity.header().height(), CachedData::Block(block)); + } + } + + pub fn insert_headers(&mut self, batch: Batch) { + let mut lock = self.0.lock(); + for header in batch.results { + lock.insert(**header.entity.height(), CachedData::Header(header)); + } + } + + pub fn get_chunks( + &self, + range: RangeInclusive, + max_chunk_size: NonZeroU32, + ) -> futures::stream::Iter> { + let end = (*range.end()).saturating_add(1); + + let cache_iter = self.collect_cache_data(range.clone()); + let mut current_height = *range.start(); + let mut chunks = Vec::new(); + let mut current_chunk = CachedDataBatch::None(0..0); + + for (height, data) in cache_iter { + // We have a range missing in our cache. + // Push the existing chunk and push chunks of missing data + if height != current_height { + if !current_chunk.is_range_empty() { + chunks.push(current_chunk); + } + current_chunk = CachedDataBatch::None(0..0); + Self::push_missing_chunks( + &mut chunks, + current_height, + height, + max_chunk_size, + end, + ); + } + // Either accumulate in the same chunk or transition + // if the data is not the same as the current chunk + current_chunk = Self::handle_current_chunk( + current_chunk, + data, + height, + &mut chunks, + max_chunk_size, + ); + current_height = height.saturating_add(1); + } + + if !current_chunk.is_range_empty() { + chunks.push(current_chunk); + } + + Self::push_missing_chunks(&mut chunks, current_height, end, max_chunk_size, end); + futures::stream::iter(chunks) + } + + fn collect_cache_data(&self, range: RangeInclusive) -> Vec<(u32, CachedData)> { + let lock = self.0.lock(); + lock.range(range).map(|(k, v)| (*k, v.clone())).collect() + } + + fn handle_current_chunk( + current_chunk: CachedDataBatch, + data: CachedData, + height: u32, + chunks: &mut Vec, + max_chunk_size: NonZeroU32, + ) -> CachedDataBatch { + let max_chunk_size = max_chunk_size.get() as usize; + match (current_chunk, data) { + (CachedDataBatch::None(_), CachedData::Header(data)) => { + CachedDataBatch::Headers(Batch::new( + None, + height..height.saturating_add(1), + vec![data], + )) + } + (CachedDataBatch::None(_), CachedData::Block(data)) => { + CachedDataBatch::Blocks(Batch::new( + None, + height..height.saturating_add(1), + vec![data], + )) + } + (CachedDataBatch::Headers(mut batch), CachedData::Header(data)) => { + tracing::warn!("Header data range in cache is not continuous."); + debug_assert_eq!(batch.range.end, height); + debug_assert!(batch.range.len() <= max_chunk_size); + + if batch.range.len() == max_chunk_size { + chunks.push(CachedDataBatch::Headers(batch)); + + CachedDataBatch::Headers(Batch::new( + None, + height..height.saturating_add(1), + vec![data], + )) + } else { + batch.range = batch.range.start..batch.range.end.saturating_add(1); + batch.results.push(data); + CachedDataBatch::Headers(batch) + } + } + (CachedDataBatch::Blocks(mut batch), CachedData::Block(data)) => { + tracing::warn!("Block data range in cache is not continuous."); + debug_assert_eq!(batch.range.end, height); + debug_assert!(batch.range.len() <= max_chunk_size); + + if batch.range.len() == max_chunk_size { + chunks.push(CachedDataBatch::Blocks(batch)); + + CachedDataBatch::Blocks(Batch::new( + None, + height..height.saturating_add(1), + vec![data], + )) + } else { + batch.range = batch.range.start..batch.range.end.saturating_add(1); + batch.results.push(data); + CachedDataBatch::Blocks(batch) + } + } + (CachedDataBatch::Headers(headers_batch), CachedData::Block(block)) => { + debug_assert_eq!(headers_batch.range.end, height); + chunks.push(CachedDataBatch::Headers(headers_batch)); + CachedDataBatch::Blocks(Batch::new( + None, + height..height.saturating_add(1), + vec![block], + )) + } + (CachedDataBatch::Blocks(blocks_batch), CachedData::Header(header)) => { + debug_assert_eq!(blocks_batch.range.end, height); + chunks.push(CachedDataBatch::Blocks(blocks_batch)); + CachedDataBatch::Headers(Batch::new( + None, + height..height.saturating_add(1), + vec![header], + )) + } + } + } + + fn push_missing_chunks( + chunks: &mut Vec, + current_height: u32, + height: u32, + max_chunk_size: NonZeroU32, + end: u32, + ) { + let chunk_size = max_chunk_size.get(); + let missing_chunks = (current_height..height).step_by(chunk_size as usize).map( + move |chunk_start| { + let block_end = chunk_start.saturating_add(chunk_size).min(end); + CachedDataBatch::None(chunk_start..block_end) + }, + ); + chunks.extend(missing_chunks); + } + + pub fn remove_element(&mut self, height: &u32) { + let mut lock = self.0.lock(); + lock.remove(height); + } +} + +#[cfg(test)] +mod tests { + use crate::import::{ + cache::{ + Cache, + CachedDataBatch, + }, + Batch, + }; + use fuel_core_types::{ + blockchain::{ + block::Block, + consensus::Sealed, + header::BlockHeader, + }, + fuel_tx::Bytes32, + tai64::Tai64, + }; + use futures::StreamExt; + use std::{ + num::NonZeroU32, + ops::RangeInclusive, + }; + use test_case::test_case; + + fn create_header(height: u32) -> Sealed { + Sealed { + entity: BlockHeader::new_block(height.into(), Tai64::from_unix(0)), + consensus: Default::default(), + } + } + fn create_block(height: u32) -> Sealed { + Sealed { + entity: Block::new( + (&create_header(height).entity).into(), + Vec::new(), + &[], + Bytes32::default(), + ) + .unwrap(), + consensus: Default::default(), + } + } + + #[test_case(&[], &[], 3, 0..=10 => vec![ + CachedDataBatch::None(0..3), + CachedDataBatch::None(3..6), + CachedDataBatch::None(6..9), + CachedDataBatch::None(9..11), + ] ; "multiple empty chunks")] + #[test_case(&[ + create_header(0) + ], &[], 3, 0..=10 => vec![ + CachedDataBatch::Headers(Batch::new(None, 0..1, vec![create_header(0)])), + CachedDataBatch::None(1..4), + CachedDataBatch::None(4..7), + CachedDataBatch::None(7..10), + CachedDataBatch::None(10..11), + ]; "one header and empty ranges")] + #[test_case(&[ + create_header(0), + create_header(1), + create_header(2) + ], &[], 3, 0..=10 => vec![ + CachedDataBatch::Headers(Batch::new(None, 0..3, vec![ + create_header(0), + create_header(1), + create_header(2) + ])), + CachedDataBatch::None(3..6), + CachedDataBatch::None(6..9), + CachedDataBatch::None(9..11), + ]; "multiple headers and empty ranges")] + #[test_case(&[], &[ + create_block(0) + ], 3, 0..=10 => vec![ + CachedDataBatch::Blocks(Batch::new(None, 0..1, vec![create_block(0)])), + CachedDataBatch::None(1..4), + CachedDataBatch::None(4..7), + CachedDataBatch::None(7..10), + CachedDataBatch::None(10..11), + ]; "one block and empty ranges")] + #[test_case(&[ + create_header(0) + ], &[ + create_block(1) + ], 3, 0..=10 => vec![ + CachedDataBatch::Headers(Batch::new(None, 0..1, vec![create_header(0)])), + CachedDataBatch::Blocks(Batch::new(None, 1..2, vec![create_block(1)])), + CachedDataBatch::None(2..5), + CachedDataBatch::None(5..8), + CachedDataBatch::None(8..11), + ]; "one header, one block and empty ranges")] + #[test_case(&[ + create_header(0), + create_header(1) + ], &[ + create_block(2), + create_block(3) + ], 2, 0..=10 => vec![ + CachedDataBatch::Headers(Batch::new(None, 0..2, vec![ + create_header(0), + create_header(1) + ])), + CachedDataBatch::Blocks(Batch::new(None, 2..4, vec![ + create_block(2), + create_block(3) + ])), + CachedDataBatch::None(4..6), + CachedDataBatch::None(6..8), + CachedDataBatch::None(8..10), + CachedDataBatch::None(10..11), + ]; "multiple headers, multiple blocks and empty ranges")] + #[test_case(&[ + create_header(0), + create_header(1), + create_header(2), + create_header(3) + ], &[ + create_block(4), + create_block(5), + create_block(6), + create_block(7) + ], 2, 0..=10 => vec![ + CachedDataBatch::Headers(Batch::new(None, 0..2, vec![ + create_header(0), + create_header(1) + ])), + CachedDataBatch::Headers(Batch::new(None, 2..4, vec![ + create_header(2), + create_header(3) + ])), + CachedDataBatch::Blocks(Batch::new(None, 4..6, vec![ + create_block(4), + create_block(5) + ])), + CachedDataBatch::Blocks(Batch::new(None, 6..8, vec![ + create_block(6), + create_block(7) + ])), + CachedDataBatch::None(8..10), + CachedDataBatch::None(10..11), + ]; "multiple headers, multiple blocks and empty ranges with smaller chunk size")] + #[test_case(&[ + create_header(0), + create_header(1), + create_header(2), + create_header(3) + ], &[ + create_block(4), + create_block(5), + create_block(6), + create_block(7) + ], 2, 0..=7 => vec![ + CachedDataBatch::Headers(Batch::new(None, 0..2, vec![ + create_header(0), + create_header(1) + ])), + CachedDataBatch::Headers(Batch::new(None, 2..4, vec![ + create_header(2), + create_header(3) + ])), + CachedDataBatch::Blocks(Batch::new(None, 4..6, vec![ + create_block(4), + create_block(5) + ])), + CachedDataBatch::Blocks(Batch::new(None, 6..8, vec![ + create_block(6), + create_block(7) + ])), + ]; "multiple headers, multiple blocks with no empty ranges")] + #[test_case(&[ + create_header(0), + create_header(1), + create_header(2) + ], &[ + create_block(3), + create_block(4), + create_block(5) + ], 3, 0..=5 => vec![ + CachedDataBatch::Headers(Batch::new(None, 0..3, vec![ + create_header(0), + create_header(1), + create_header(2) + ])), + CachedDataBatch::Blocks(Batch::new(None, 3..6, vec![ + create_block(3), + create_block(4), + create_block(5) + ])), + ]; "multiple headers, multiple blocks with no empty ranges and larger chunk size")] + #[test_case(&[ + create_header(0), + create_header(1) + ], &[ + create_block(2), + create_block(3) + ], 2, 0..=3 => vec![ + CachedDataBatch::Headers(Batch::new(None, 0..2, vec![ + create_header(0), + create_header(1) + ])), + CachedDataBatch::Blocks(Batch::new(None, 2..4, vec![ + create_block(2), + create_block(3) + ])), + ]; "multiple headers, multiple blocks with no empty ranges and exact chunk size")] + #[test_case(&[ + create_header(0), + create_header(1), + create_header(2) + ], &[ + create_block(3), + create_block(4), + create_block(5) + ], 1, 0..=5 => vec![ + CachedDataBatch::Headers(Batch::new(None, 0..1, vec![ + create_header(0) + ])), + CachedDataBatch::Headers(Batch::new(None, 1..2, vec![ + create_header(1) + ])), + CachedDataBatch::Headers(Batch::new(None, 2..3, vec![ + create_header(2) + ])), + CachedDataBatch::Blocks(Batch::new(None, 3..4, vec![ + create_block(3) + ])), + CachedDataBatch::Blocks(Batch::new(None, 4..5, vec![ + create_block(4) + ])), + CachedDataBatch::Blocks(Batch::new(None, 5..6, vec![ + create_block(5) + ])), + ]; "multiple headers, multiple blocks with max chunk size of 1")] + #[test_case(&[ + create_header(0) + ], &[ + create_block(1) + ], 1, 0..=1 => vec![ + CachedDataBatch::Headers(Batch::new(None, 0..1, vec![ + create_header(0) + ])), + CachedDataBatch::Blocks(Batch::new(None, 1..2, vec![ + create_block(1) + ])), + ]; "one header, one block with max chunk size of 1")] + #[test_case(&[], &[ + create_block(5) + ], 1, 4..=6 => vec![ + CachedDataBatch::None(4..5), + CachedDataBatch::Blocks(Batch::new(None, 5..6, vec![ + create_block(5) + ])), + CachedDataBatch::None(6..7), + ]; "one block in empty range sandwich with max chunk size of 1")] + #[tokio::test] + async fn test_get_batch_scenarios( + headers: &[Sealed], + blocks: &[Sealed], + max_chunk_size: u32, + asked_range: RangeInclusive, + ) -> Vec { + let mut cache = Cache::new(); + cache.insert_headers(Batch::new( + None, + 0..headers.len().try_into().unwrap(), + headers.to_vec(), + )); + cache.insert_blocks(Batch::new( + None, + 0..blocks.len().try_into().unwrap(), + blocks.to_vec(), + )); + cache + .get_chunks(asked_range, NonZeroU32::try_from(max_chunk_size).unwrap()) + .collect() + .await + } +} diff --git a/crates/services/sync/src/import/test_helpers/pressure_peer_to_peer.rs b/crates/services/sync/src/import/test_helpers/pressure_peer_to_peer.rs index d01998e13d1..3d74cf73ffa 100644 --- a/crates/services/sync/src/import/test_helpers/pressure_peer_to_peer.rs +++ b/crates/services/sync/src/import/test_helpers/pressure_peer_to_peer.rs @@ -47,7 +47,7 @@ impl PeerToPeerPort for PressurePeerToPeer { self.p2p.get_sealed_block_headers(block_height_range).await } - async fn get_transactions( + async fn get_transactions_from_peer( &self, block_ids: SourcePeer>, ) -> anyhow::Result>> { @@ -57,6 +57,19 @@ impl PeerToPeerPort for PressurePeerToPeer { self.counts.apply(|c| c.inc_blocks()); } self.counts.apply(|c| c.dec_transactions()); + self.p2p.get_transactions_from_peer(block_ids).await + } + + async fn get_transactions( + &self, + block_ids: Range, + ) -> anyhow::Result>>> { + self.counts.apply(|c| c.inc_transactions()); + tokio::time::sleep(self.durations[1]).await; + for _height in block_ids.clone() { + self.counts.apply(|c| c.inc_blocks()); + } + self.counts.apply(|c| c.dec_transactions()); self.p2p.get_transactions(block_ids).await } @@ -73,20 +86,25 @@ impl PressurePeerToPeer { pub fn new(counts: SharedCounts, delays: [Duration; 2]) -> Self { let mut mock = MockPeerToPeerPort::default(); mock.expect_get_sealed_block_headers().returning(|range| { - let peer = random_peer(); - let headers = range - .clone() - .map(BlockHeight::from) - .map(empty_header) - .collect(); - let headers = peer.bind(Some(headers)); - Ok(headers) - }); - mock.expect_get_transactions().returning(|block_ids| { - let data = block_ids.data; - let v = data.into_iter().map(|_| Transactions::default()).collect(); - Ok(Some(v)) + Box::pin(async move { + let peer = random_peer(); + let headers = range + .clone() + .map(BlockHeight::from) + .map(empty_header) + .collect(); + let headers = peer.bind(Some(headers)); + Ok(headers) + }) }); + mock.expect_get_transactions_from_peer() + .returning(|block_ids| { + Box::pin(async move { + let data = block_ids.data; + let v = data.into_iter().map(|_| Transactions::default()).collect(); + Ok(Some(v)) + }) + }); Self { p2p: mock, durations: delays, diff --git a/crates/services/sync/src/import/tests.rs b/crates/services/sync/src/import/tests.rs index 0db7d8c5532..baa4146a626 100644 --- a/crates/services/sync/src/import/tests.rs +++ b/crates/services/sync/src/import/tests.rs @@ -14,7 +14,11 @@ use crate::{ }, }; use fuel_core_types::services::p2p::Transactions; -use std::time::Duration; +use mockall::Sequence; +use std::{ + ops::Deref, + time::Duration, +}; use super::*; @@ -38,17 +42,21 @@ async fn test_import_0_to_5() { p2p.expect_get_sealed_block_headers() .times(1) .returning(|range| { - let peer = random_peer(); - let headers = Some(range.map(empty_header).collect()); - let headers = peer.bind(headers); - Ok(headers) + Box::pin(async move { + let peer = random_peer(); + let headers = Some(range.map(empty_header).collect()); + let headers = peer.bind(headers); + Ok(headers) + }) }); - p2p.expect_get_transactions() + p2p.expect_get_transactions_from_peer() .times(1) .returning(|block_ids| { - let data = block_ids.data; - let v = data.into_iter().map(|_| Transactions::default()).collect(); - Ok(Some(v)) + Box::pin(async move { + let data = block_ids.data; + let v = data.into_iter().map(|_| Transactions::default()).collect(); + Ok(Some(v)) + }) }); let params = Config { @@ -84,17 +92,21 @@ async fn test_import_3_to_5() { p2p.expect_get_sealed_block_headers() .times(1) .returning(|range| { - let peer = random_peer(); - let headers = Some(range.map(empty_header).collect()); - let headers = peer.bind(headers); - Ok(headers) + Box::pin(async move { + let peer = random_peer(); + let headers = Some(range.map(empty_header).collect()); + let headers = peer.bind(headers); + Ok(headers) + }) }); - p2p.expect_get_transactions() + p2p.expect_get_transactions_from_peer() .times(1) .returning(|block_ids| { - let data = block_ids.data; - let v = data.into_iter().map(|_| Transactions::default()).collect(); - Ok(Some(v)) + Box::pin(async move { + let data = block_ids.data; + let v = data.into_iter().map(|_| Transactions::default()).collect(); + Ok(Some(v)) + }) }); let params = Config { @@ -147,20 +159,24 @@ async fn test_import_0_to_499() { p2p.expect_get_sealed_block_headers() .times(times) .returning(|range| { - let peer = random_peer(); - let headers = Some(range.map(empty_header).collect()); - let headers = peer.bind(headers); - Ok(headers) + Box::pin(async move { + let peer = random_peer(); + let headers = Some(range.map(empty_header).collect()); + let headers = peer.bind(headers); + Ok(headers) + }) }); // Happens once for each batch let times = div_ceil(n, header_batch_size); - p2p.expect_get_transactions() + p2p.expect_get_transactions_from_peer() .times(times) .returning(|block_ids| { - let data = block_ids.data; - let v = data.into_iter().map(|_| Transactions::default()).collect(); - Ok(Some(v)) + Box::pin(async move { + let data = block_ids.data; + let v = data.into_iter().map(|_| Transactions::default()).collect(); + Ok(Some(v)) + }) }); let params = Config { @@ -196,17 +212,21 @@ async fn import__signature_fails_on_header_5_only() { p2p.expect_get_sealed_block_headers() .times(1) .returning(|range| { - let peer = random_peer(); - let headers = Some(range.map(empty_header).collect()); - let headers = peer.bind(headers); - Ok(headers) + Box::pin(async move { + let peer = random_peer(); + let headers = Some(range.map(empty_header).collect()); + let headers = peer.bind(headers); + Ok(headers) + }) }); - p2p.expect_get_transactions() + p2p.expect_get_transactions_from_peer() .times(1) .returning(|block_ids| { - let data = block_ids.data; - let v = data.into_iter().map(|_| Transactions::default()).collect(); - Ok(Some(v)) + Box::pin(async move { + let data = block_ids.data; + let v = data.into_iter().map(|_| Transactions::default()).collect(); + Ok(Some(v)) + }) }); let state = State::new(3, 5).into(); @@ -227,6 +247,309 @@ async fn import__signature_fails_on_header_5_only() { assert_eq!((State::new(4, None), false), res); } +#[tokio::test] +async fn import__keep_data_asked_in_fail_ask_header_cases() { + // Test is going from block 4 (3 already committed) to 6 + let params = Config { + block_stream_buffer_size: 10, + header_batch_size: 1, + }; + + let mut consensus_port = MockConsensusPort::default(); + // No reask on verification on all of the blocks + consensus_port + .expect_check_sealed_header() + .times(3) + .returning(|_| Ok(true)); + // No reask on da height on all of the blocks + consensus_port + .expect_await_da_height() + .times(3) + .returning(|_| Ok(())); + + let mut p2p = MockPeerToPeerPort::default(); + let mut seq = Sequence::new(); + // Given + // Fail to get headers for block 4 + p2p.expect_get_sealed_block_headers() + .times(1) + .in_sequence(&mut seq) + .returning(|range| { + assert_eq!(range, 4..5); + Box::pin(async move { + tokio::time::sleep(Duration::from_millis(300)).await; + Err(anyhow::anyhow!("Some network error")) + }) + }); + // Success for 5 and 6 that is in parallel with 4 + p2p.expect_get_sealed_block_headers() + .times(2) + .in_sequence(&mut seq) + .returning(|range| { + Box::pin(async move { + let peer = random_peer(); + let headers = Some(range.map(empty_header).collect()); + let headers = peer.bind(headers); + Ok(headers) + }) + }); + // Then + // Reask only for block 4 + p2p.expect_get_sealed_block_headers() + .times(1) + .in_sequence(&mut seq) + .returning(|range| { + assert_eq!(range, 4..5); + Box::pin(async move { + let peer = random_peer(); + let headers = Some(range.map(empty_header).collect()); + let headers = peer.bind(headers); + Ok(headers) + }) + }); + // No reask on getting full block step for 4, 5 and 6 blocks + p2p.expect_get_transactions_from_peer() + .times(3) + .returning(|block_ids| { + Box::pin(async move { + let data = block_ids.data; + let v = data.into_iter().map(|_| Transactions::default()).collect(); + Ok(Some(v)) + }) + }); + p2p.expect_report_peer().returning(|_, _| Ok(())); + + let p2p = Arc::new(p2p); + let executor: Arc = Arc::new(DefaultMocks::times([3])); + let consensus = Arc::new(consensus_port); + let notify = Arc::new(Notify::new()); + let state: SharedMutex = State::new(3, 6).into(); + + let mut import = Import { + state: state.clone(), + notify: notify.clone(), + params, + p2p, + executor, + consensus, + cache: Cache::new(), + }; + let (_tx, shutdown) = tokio::sync::watch::channel(fuel_core_services::State::Started); + let mut watcher = shutdown.into(); + notify.notify_one(); + let res = import.import(&mut watcher).await.is_ok(); + assert!(!res); + assert_eq!(&State::new(3, None), state.lock().deref()); + // Reset the state for a next call + *state.lock() = State::new(3, 6); + + // When + // Should re-ask to P2P only block 4. + let res = import.import(&mut watcher).await.is_ok(); + assert!(res); + assert_eq!(&State::new(6, None), state.lock().deref()); +} + +#[tokio::test] +async fn import__keep_data_asked_in_fail_ask_transactions_cases() { + // Test is going from block 4 (3 already committed) to 6 + let params = Config { + block_stream_buffer_size: 10, + header_batch_size: 1, + }; + + let mut consensus_port = MockConsensusPort::default(); + // No reask on verification on all of the blocks + consensus_port + .expect_check_sealed_header() + .times(3) + .returning(|_| Ok(true)); + // One reask on the da_height after reask of the transactions for block 4 + consensus_port + .expect_await_da_height() + .times(4) + .returning(|_| Ok(())); + + let mut p2p = MockPeerToPeerPort::default(); + // Everything goes well on the headers part for all blocks + p2p.expect_get_sealed_block_headers() + .times(3) + .returning(|range| { + Box::pin(async move { + let peer = random_peer(); + let headers = Some(range.map(empty_header).collect()); + let headers = peer.bind(headers); + Ok(headers) + }) + }); + let mut seq = Sequence::new(); + // Given + // Fail to get transactions for block 4 + p2p.expect_get_transactions_from_peer() + .times(1) + .in_sequence(&mut seq) + .returning(|range| { + assert_eq!(range.data, 4..5); + Box::pin(async move { + tokio::time::sleep(Duration::from_millis(300)).await; + Err(anyhow::anyhow!("Some network error")) + }) + }); + + // Success for 5 and 6 that is in parallel with 4 + p2p.expect_get_transactions_from_peer() + .times(2) + .in_sequence(&mut seq) + .returning(|block_ids| { + Box::pin(async move { + let data = block_ids.data; + let v = data.into_iter().map(|_| Transactions::default()).collect(); + Ok(Some(v)) + }) + }); + // Then + // Reask only for block 4 + p2p.expect_get_transactions() + .times(1) + .in_sequence(&mut seq) + .returning(|block_ids| { + assert_eq!(block_ids, 4..5); + Box::pin(async move { + let data = block_ids; + let v = data.into_iter().map(|_| Transactions::default()).collect(); + Ok(SourcePeer { + peer_id: random_peer(), + data: Some(v), + }) + }) + }); + + p2p.expect_report_peer().returning(|_, _| Ok(())); + + let p2p = Arc::new(p2p); + let executor: Arc = Arc::new(DefaultMocks::times([3])); + let consensus = Arc::new(consensus_port); + let notify = Arc::new(Notify::new()); + let state: SharedMutex = State::new(3, 6).into(); + + let mut import = Import { + state: state.clone(), + notify: notify.clone(), + params, + p2p, + executor, + consensus, + cache: Cache::new(), + }; + let (_tx, shutdown) = tokio::sync::watch::channel(fuel_core_services::State::Started); + let mut watcher = shutdown.into(); + notify.notify_one(); + let res = import.import(&mut watcher).await.is_ok(); + assert!(!res); + assert_eq!(&State::new(3, None), state.lock().deref()); + // Reset the state for a next call + *state.lock() = State::new(3, 6); + // When + // Should re-ask to P2P only block 4. + let res = import.import(&mut watcher).await.is_ok(); + assert!(res); + assert_eq!(&State::new(6, None), state.lock().deref()); +} + +#[tokio::test] +async fn import__keep_data_asked_in_fail_execution() { + // Test is going from block 4 (3 already committed) to 6 + let params = Config { + block_stream_buffer_size: 10, + header_batch_size: 1, + }; + + let mut consensus_port = MockConsensusPort::default(); + // Data is re-ask for the block 4 because his execution failed + consensus_port + .expect_check_sealed_header() + .times(4) + .returning(|_| Ok(true)); + // Data is re-ask for the block 4 because his execution failed + consensus_port + .expect_await_da_height() + .times(4) + .returning(|_| Ok(())); + + let mut p2p = MockPeerToPeerPort::default(); + // Data is re-ask for the block 4 because his execution failed + p2p.expect_get_sealed_block_headers() + .times(4) + .returning(|range| { + Box::pin(async move { + let peer = random_peer(); + let headers = Some(range.map(empty_header).collect()); + let headers = peer.bind(headers); + Ok(headers) + }) + }); + + // Data is re-ask for the block 4 because his execution failed + p2p.expect_get_transactions_from_peer() + .times(4) + .returning(|block_ids| { + Box::pin(async move { + let data = block_ids.data; + let v = data.into_iter().map(|_| Transactions::default()).collect(); + Ok(Some(v)) + }) + }); + p2p.expect_report_peer().returning(|_, _| Ok(())); + + let p2p = Arc::new(p2p); + + // Given + let mut executor: MockBlockImporterPort = MockBlockImporterPort::new(); + let mut seq = Sequence::new(); + // Fails execute on the 4 one + executor + .expect_execute_and_commit() + .times(1) + .in_sequence(&mut seq) + .returning(|block| { + assert_eq!(block.entity.header().height(), &BlockHeight::new(4)); + anyhow::bail!("Bad execution") + }); + // Success execute the 3 after + executor + .expect_execute_and_commit() + .times(3) + .in_sequence(&mut seq) + .returning(|_| Ok(())); + let executor = Arc::new(executor); + let consensus = Arc::new(consensus_port); + let notify = Arc::new(Notify::new()); + let state: SharedMutex = State::new(3, 6).into(); + + let mut import = Import { + state: state.clone(), + notify: notify.clone(), + params, + p2p, + executor, + consensus, + cache: Cache::new(), + }; + let (_tx, shutdown) = tokio::sync::watch::channel(fuel_core_services::State::Started); + let mut watcher = shutdown.into(); + notify.notify_one(); + let res = import.import(&mut watcher).await.is_ok(); + assert!(!res); + assert_eq!(&State::new(3, None), state.lock().deref()); + // Reset the state for a next call + *state.lock() = State::new(3, 6); + // When + // Should re-ask to P2P only block 4. + let res = import.import(&mut watcher).await.is_ok(); + assert!(res); + assert_eq!(&State::new(6, None), state.lock().deref()); +} + #[tokio::test] async fn import__signature_fails_on_header_4_only() { // given @@ -244,17 +567,21 @@ async fn import__signature_fails_on_header_4_only() { p2p.expect_get_sealed_block_headers() .times(1) .returning(|range| { - let peer = random_peer(); - let headers = Some(range.map(empty_header).collect()); - let headers = peer.bind(headers); - Ok(headers) + Box::pin(async move { + let peer = random_peer(); + let headers = Some(range.map(empty_header).collect()); + let headers = peer.bind(headers); + Ok(headers) + }) }); - p2p.expect_get_transactions() + p2p.expect_get_transactions_from_peer() .times(0) .returning(|block_ids| { - let data = block_ids.data; - let v = data.into_iter().map(|_| Transactions::default()).collect(); - Ok(Some(v)) + Box::pin(async move { + let data = block_ids.data; + let v = data.into_iter().map(|_| Transactions::default()).collect(); + Ok(Some(v)) + }) }); let state = State::new(3, 5).into(); @@ -282,10 +609,12 @@ async fn import__header_not_found() { p2p.expect_get_sealed_block_headers() .times(1) .returning(|_| { - let peer = random_peer(); - let headers = Some(Vec::new()); - let headers = peer.bind(headers); - Ok(headers) + Box::pin(async move { + let peer = random_peer(); + let headers = Some(Vec::new()); + let headers = peer.bind(headers); + Ok(headers) + }) }); let state = State::new(3, 5).into(); @@ -313,10 +642,12 @@ async fn import__header_response_incomplete() { p2p.expect_get_sealed_block_headers() .times(1) .returning(|_| { - let peer = random_peer(); - let headers = None; - let headers = peer.bind(headers); - Ok(headers) + Box::pin(async move { + let peer = random_peer(); + let headers = None; + let headers = peer.bind(headers); + Ok(headers) + }) }); let state = State::new(3, 5).into(); @@ -344,18 +675,22 @@ async fn import__header_5_not_found() { p2p.expect_get_sealed_block_headers() .times(1) .returning(|_| { - let peer = random_peer(); - let headers = Some(vec![empty_header(4)]); - let headers = peer.bind(headers); - Ok(headers) + Box::pin(async move { + let peer = random_peer(); + let headers = Some(vec![empty_header(4)]); + let headers = peer.bind(headers); + Ok(headers) + }) }); - p2p.expect_get_transactions() + p2p.expect_get_transactions_from_peer() .times(1) .returning(|block_ids| { - let data = block_ids.data; - let v = data.into_iter().map(|_| Transactions::default()).collect(); - Ok(Some(v)) + Box::pin(async move { + let data = block_ids.data; + let v = data.into_iter().map(|_| Transactions::default()).collect(); + Ok(Some(v)) + }) }); let state = State::new(3, 5).into(); @@ -383,12 +718,14 @@ async fn import__header_4_not_found() { p2p.expect_get_sealed_block_headers() .times(1) .returning(|_| { - let peer = random_peer(); - let headers = Some(vec![empty_header(5)]); - let headers = peer.bind(headers); - Ok(headers) + Box::pin(async move { + let peer = random_peer(); + let headers = Some(vec![empty_header(5)]); + let headers = peer.bind(headers); + Ok(headers) + }) }); - p2p.expect_get_transactions().times(0); + p2p.expect_get_transactions_from_peer().times(0); let state = State::new(3, 5).into(); let mocks = Mocks { @@ -425,14 +762,16 @@ async fn import__transactions_not_found() { p2p.expect_get_sealed_block_headers() .times(1) .returning(|range| { - let peer = random_peer(); - let headers = Some(range.map(empty_header).collect()); - let headers = peer.bind(headers); - Ok(headers) + Box::pin(async move { + let peer = random_peer(); + let headers = Some(range.map(empty_header).collect()); + let headers = peer.bind(headers); + Ok(headers) + }) }); - p2p.expect_get_transactions() + p2p.expect_get_transactions_from_peer() .times(1) - .returning(|_| Ok(None)); + .returning(|_| Box::pin(async move { Ok(None) })); let state = State::new(3, 5).into(); let mocks = Mocks { @@ -469,23 +808,27 @@ async fn import__transactions_not_found_for_header_4() { p2p.expect_get_sealed_block_headers() .times(1) .returning(|range| { - let peer = random_peer(); - let headers = Some(range.map(empty_header).collect()); - let headers = peer.bind(headers); - Ok(headers) + Box::pin(async move { + let peer = random_peer(); + let headers = Some(range.map(empty_header).collect()); + let headers = peer.bind(headers); + Ok(headers) + }) }); let mut height = 3; - p2p.expect_get_transactions() + p2p.expect_get_transactions_from_peer() .times(1) .returning(move |block_ids| { - height += 1; - if height == 4 { - Ok(None) - } else { - let data = block_ids.data; - let v = data.into_iter().map(|_| Transactions::default()).collect(); - Ok(Some(v)) - } + Box::pin(async move { + height += 1; + if height == 4 { + Ok(None) + } else { + let data = block_ids.data; + let v = data.into_iter().map(|_| Transactions::default()).collect(); + Ok(Some(v)) + } + }) }); let state = State::new(3, 5).into(); @@ -523,15 +866,21 @@ async fn import__transactions_not_found_for_header_5() { p2p.expect_get_sealed_block_headers() .times(1) .returning(|range| { - let peer = random_peer(); - let headers = Some(range.map(empty_header).collect()); - let headers = peer.bind(headers); - Ok(headers) + Box::pin(async move { + let peer = random_peer(); + let headers = Some(range.map(empty_header).collect()); + let headers = peer.bind(headers); + Ok(headers) + }) + }); + p2p.expect_get_transactions_from_peer() + .times(1) + .returning(move |_| { + Box::pin(async move { + let v = vec![Transactions::default()]; + Ok(Some(v)) + }) }); - p2p.expect_get_transactions().times(1).returning(move |_| { - let v = vec![Transactions::default()]; - Ok(Some(v)) - }); let state = State::new(3, 5).into(); let mocks = Mocks { @@ -557,8 +906,10 @@ async fn import__p2p_error() { let mut p2p = MockPeerToPeerPort::default(); p2p.expect_get_sealed_block_headers() .times(1) - .returning(|_| Err(anyhow::anyhow!("Some network error"))); - p2p.expect_get_transactions().times(0); + .returning(|_| { + Box::pin(async move { Err(anyhow::anyhow!("Some network error")) }) + }); + p2p.expect_get_transactions_from_peer().times(0); let state = State::new(3, 5).into(); let mocks = Mocks { @@ -595,14 +946,18 @@ async fn import__p2p_error_on_4_transactions() { p2p.expect_get_sealed_block_headers() .times(1) .returning(|range| { - let peer = random_peer(); - let headers = Some(range.map(empty_header).collect()); - let headers = peer.bind(headers); - Ok(headers) + Box::pin(async move { + let peer = random_peer(); + let headers = Some(range.map(empty_header).collect()); + let headers = peer.bind(headers); + Ok(headers) + }) }); - p2p.expect_get_transactions() + p2p.expect_get_transactions_from_peer() .times(1) - .returning(|_| Err(anyhow::anyhow!("Some network error"))); + .returning(|_| { + Box::pin(async move { Err(anyhow::anyhow!("Some network error")) }) + }); let state = State::new(3, 5).into(); let mocks = Mocks { @@ -645,12 +1000,14 @@ async fn import__consensus_error_on_4() { p2p.expect_get_sealed_block_headers() .times(1) .returning(|range| { - let peer = random_peer(); - let headers = Some(range.map(empty_header).collect()); - let headers = peer.bind(headers); - Ok(headers) + Box::pin(async move { + let peer = random_peer(); + let headers = Some(range.map(empty_header).collect()); + let headers = peer.bind(headers); + Ok(headers) + }) }); - p2p.expect_get_transactions().times(0); + p2p.expect_get_transactions_from_peer().times(0); let state = State::new(3, 5).into(); let mocks = Mocks { @@ -693,17 +1050,21 @@ async fn import__consensus_error_on_5() { p2p.expect_get_sealed_block_headers() .times(1) .returning(|range| { - let peer = random_peer(); - let headers = Some(range.map(empty_header).collect()); - let headers = peer.bind(headers); - Ok(headers) + Box::pin(async move { + let peer = random_peer(); + let headers = Some(range.map(empty_header).collect()); + let headers = peer.bind(headers); + Ok(headers) + }) }); - p2p.expect_get_transactions() + p2p.expect_get_transactions_from_peer() .times(1) .returning(|block_ids| { - let data = block_ids.data; - let v = data.into_iter().map(|_| Transactions::default()).collect(); - Ok(Some(v)) + Box::pin(async move { + let data = block_ids.data; + let v = data.into_iter().map(|_| Transactions::default()).collect(); + Ok(Some(v)) + }) }); let state = State::new(3, 5).into(); @@ -741,17 +1102,21 @@ async fn import__execution_error_on_header_4() { p2p.expect_get_sealed_block_headers() .times(1) .returning(|range| { - let peer = random_peer(); - let headers = Some(range.map(empty_header).collect()); - let headers = peer.bind(headers); - Ok(headers) + Box::pin(async move { + let peer = random_peer(); + let headers = Some(range.map(empty_header).collect()); + let headers = peer.bind(headers); + Ok(headers) + }) }); - p2p.expect_get_transactions() + p2p.expect_get_transactions_from_peer() .times(1) .returning(|block_ids| { - let data = block_ids.data; - let v = data.into_iter().map(|_| Transactions::default()).collect(); - Ok(Some(v)) + Box::pin(async move { + let data = block_ids.data; + let v = data.into_iter().map(|_| Transactions::default()).collect(); + Ok(Some(v)) + }) }); let mut executor = MockBlockImporterPort::default(); @@ -801,17 +1166,21 @@ async fn import__execution_error_on_header_5() { p2p.expect_get_sealed_block_headers() .times(1) .returning(|range| { - let peer = random_peer(); - let headers = Some(range.map(empty_header).collect()); - let headers = peer.bind(headers); - Ok(headers) + Box::pin(async move { + let peer = random_peer(); + let headers = Some(range.map(empty_header).collect()); + let headers = peer.bind(headers); + Ok(headers) + }) }); - p2p.expect_get_transactions() + p2p.expect_get_transactions_from_peer() .times(1) .returning(|block_ids| { - let data = block_ids.data; - let v = data.into_iter().map(|_| Transactions::default()).collect(); - Ok(Some(v)) + Box::pin(async move { + let data = block_ids.data; + let v = data.into_iter().map(|_| Transactions::default()).collect(); + Ok(Some(v)) + }) }); let mut executor = MockBlockImporterPort::default(); @@ -892,18 +1261,23 @@ async fn import__can_work_in_two_loops() { p2p.expect_get_sealed_block_headers() .times(2) .returning(move |range| { - state.apply(|s| s.observe(6)); - let peer = random_peer(); - let headers = Some(range.map(empty_header).collect()); - let headers = peer.bind(headers); - Ok(headers) + let state = state.clone(); + Box::pin(async move { + state.apply(|s| s.observe(6)); + let peer = random_peer(); + let headers = Some(range.map(empty_header).collect()); + let headers = peer.bind(headers); + Ok(headers) + }) }); - p2p.expect_get_transactions() + p2p.expect_get_transactions_from_peer() .times(2) .returning(|block_ids| { - let data = block_ids.data; - let v = data.into_iter().map(|_| Transactions::default()).collect(); - Ok(Some(v)) + Box::pin(async move { + let data = block_ids.data; + let v = data.into_iter().map(|_| Transactions::default()).collect(); + Ok(Some(v)) + }) }); let c = DefaultMocks::times([2]); @@ -942,13 +1316,14 @@ async fn test_import_inner( let executor = Arc::new(executor); let consensus = Arc::new(consensus_port); - let import = Import { + let mut import = Import { state, notify, params, p2p, executor, consensus, + cache: Cache::new(), }; let (_tx, shutdown) = tokio::sync::watch::channel(fuel_core_services::State::Started); let mut watcher = shutdown.into(); @@ -1040,16 +1415,21 @@ async fn import__execution_error_on_header_4_when_awaits_for_1000000_blocks() { let mut p2p = MockPeerToPeerPort::default(); p2p.expect_get_sealed_block_headers().returning(|range| { - let peer = random_peer(); - let headers = Some(range.map(empty_header).collect()); - let headers = peer.bind(headers); - Ok(headers) - }); - p2p.expect_get_transactions().returning(|block_ids| { - let data = block_ids.data; - let v = data.into_iter().map(|_| Transactions::default()).collect(); - Ok(Some(v)) + Box::pin(async move { + let peer = random_peer(); + let headers = Some(range.map(empty_header).collect()); + let headers = peer.bind(headers); + Ok(headers) + }) }); + p2p.expect_get_transactions_from_peer() + .returning(|block_ids| { + Box::pin(async move { + let data = block_ids.data; + let v = data.into_iter().map(|_| Transactions::default()).collect(); + Ok(Some(v)) + }) + }); let mut executor = MockBlockImporterPort::default(); executor @@ -1163,13 +1543,14 @@ impl PeerReportTestBuilder { header_batch_size: 10, }; - let import = Import { + let mut import = Import { state, notify, params, p2p, executor, consensus, + cache: Cache::new(), }; let (_tx, shutdown) = tokio::sync::watch::channel(fuel_core_services::State::Started); @@ -1189,29 +1570,40 @@ impl PeerReportTestBuilder { if let Some(get_headers) = self.get_sealed_headers.clone() { p2p.expect_get_sealed_block_headers().returning(move |_| { let peer: PeerId = peer_id.clone().into(); - let headers = peer.bind(get_headers.clone()); - Ok(headers) + let get_headers = get_headers.clone(); + Box::pin(async move { + let headers = peer.bind(get_headers); + Ok(headers) + }) }); } else { p2p.expect_get_sealed_block_headers() .returning(move |range| { let peer: PeerId = peer_id.clone().into(); - let headers = Some(range.map(empty_header).collect()); - let headers = peer.bind(headers); - Ok(headers) + Box::pin(async move { + let headers = Some(range.map(empty_header).collect()); + let headers = peer.bind(headers); + Ok(headers) + }) }); } let transactions = self.get_transactions.clone(); if let Some(t) = transactions { - p2p.expect_get_transactions() - .returning(move |_| Ok(t.clone())); - } else { - p2p.expect_get_transactions().returning(|block_ids| { - let data = block_ids.data; - let v = data.into_iter().map(|_| Transactions::default()).collect(); - Ok(Some(v)) + p2p.expect_get_transactions_from_peer().returning(move |_| { + let t = t.clone(); + Box::pin(async move { Ok(t) }) }); + } else { + p2p.expect_get_transactions_from_peer() + .returning(|block_ids| { + Box::pin(async move { + let data = block_ids.data; + let v = + data.into_iter().map(|_| Transactions::default()).collect(); + Ok(Some(v)) + }) + }); } let mut seq = mockall::Sequence::new(); @@ -1326,18 +1718,22 @@ impl DefaultMocks for MockPeerToPeerPort { p2p.expect_get_sealed_block_headers() .times(1) .returning(|range| { - let peer = random_peer(); - let headers = Some(range.map(empty_header).collect()); - let headers = peer.bind(headers); - Ok(headers) + Box::pin(async move { + let peer = random_peer(); + let headers = Some(range.map(empty_header).collect()); + let headers = peer.bind(headers); + Ok(headers) + }) }); - p2p.expect_get_transactions() + p2p.expect_get_transactions_from_peer() .times(t.next().unwrap()) .returning(|block_ids| { - let data = block_ids.data; - let v = data.into_iter().map(|_| Transactions::default()).collect(); - Ok(Some(v)) + Box::pin(async move { + let data = block_ids.data; + let v = data.into_iter().map(|_| Transactions::default()).collect(); + Ok(Some(v)) + }) }); p2p } diff --git a/crates/services/sync/src/ports.rs b/crates/services/sync/src/ports.rs index 86f8280489d..483b4f10156 100644 --- a/crates/services/sync/src/ports.rs +++ b/crates/services/sync/src/ports.rs @@ -34,8 +34,8 @@ pub enum PeerReportReason { InvalidTransactions, } -#[cfg_attr(any(test, feature = "benchmarking"), mockall::automock)] #[async_trait::async_trait] +#[cfg_attr(any(test, feature = "benchmarking"), mockall::automock)] /// Port for communication with the network. pub trait PeerToPeerPort { /// Stream of newly observed block heights. @@ -47,9 +47,15 @@ pub trait PeerToPeerPort { block_height_range: Range, ) -> anyhow::Result>>>; - /// Request transactions from the network for the given block - /// and source peer. + /// Request transactions from the network for the given block range async fn get_transactions( + &self, + block_ids: Range, + ) -> anyhow::Result>>>; + + /// Request transactions from the network for the given block range + /// and source peer. + async fn get_transactions_from_peer( &self, block_ids: SourcePeer>, ) -> anyhow::Result>>; diff --git a/crates/services/sync/src/service/tests.rs b/crates/services/sync/src/service/tests.rs index 57440afae24..19748c4b9aa 100644 --- a/crates/services/sync/src/service/tests.rs +++ b/crates/services/sync/src/service/tests.rs @@ -39,16 +39,21 @@ async fn test_new_service() { .into_boxed() }); p2p.expect_get_sealed_block_headers().returning(|range| { - let peer = random_peer(); - let headers = Some(range.map(empty_header).collect::>()); - let headers = peer.bind(headers); - Ok(headers) - }); - p2p.expect_get_transactions().returning(|block_ids| { - let data = block_ids.data; - let v = data.into_iter().map(|_| Transactions::default()).collect(); - Ok(Some(v)) + Box::pin(async move { + let peer = random_peer(); + let headers = Some(range.map(empty_header).collect::>()); + let headers = peer.bind(headers); + Ok(headers) + }) }); + p2p.expect_get_transactions_from_peer() + .returning(|block_ids| { + Box::pin(async move { + let data = block_ids.data; + let v = data.into_iter().map(|_| Transactions::default()).collect(); + Ok(Some(v)) + }) + }); let mut importer = MockBlockImporterPort::default(); importer .expect_committed_height_stream()