From 2dd5d2c962f9a87f5be489cc16899196ec17e53d Mon Sep 17 00:00:00 2001 From: Jeb Bearer Date: Thu, 30 May 2024 15:32:52 -0400 Subject: [PATCH 1/9] Rate limit the proactive fetching task We now have rate limiting at the low-level fetcher, which is an excellent failsafe. However, by the time we get to that point, we have already consumed resources: memory for the list of waiters on the semaphore, memory for the task doing the fetch, memory for the fetch request itself, and memory for the list of callbacks on each item which is fetched for more than one purpose. In most cases this is fine, because we just don't spawn that many active fetches at once. However, for proactive fetching, we may request thousands of active fetches at the same time, and even though we are now limiting the network traffic they generate, we are not limiting the amount of memory they consume. This change adds a simple `await` instead of dropping/detaching the active fetch futures so that the proactive fetching task cannot proceed faster than the fetches can be completed. Note that we still get parallelism up to the chunk size. --- src/data_source/fetching.rs | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/data_source/fetching.rs b/src/data_source/fetching.rs index 4e45a8388..61933f307 100644 --- a/src/data_source/fetching.rs +++ b/src/data_source/fetching.rs @@ -1247,7 +1247,14 @@ where chunk_size, start..block_height, ); - while blocks.next().await.is_some() {} + while let Some(fut) = blocks.next().await { + // Wait for the block to be fetched. This slows down the scanner so that we + // don't waste memory generating more active fetch tasks then we can handle at a + // given time. Note that even with this await, all blocks within a chunk are + // fetched in paralle, so this does not block the next block in the chunk, only + // the next chunk until the current chunk completes. 
+ fut.await; + } // We have to trigger a separate fetch of the VID data, since this is fetched // independently of the block payload. let mut vid = self @@ -1256,7 +1263,11 @@ where chunk_size, start..block_height, ); - while vid.next().await.is_some() {} + while let Some(fut) = vid.next().await { + // As above, limit the speed at which we spawn new fetches to the speed at which + // we can process them. + fut.await; + } tracing::info!("completed proactive scan, will scan again in {minor_interval:?}"); sleep(minor_interval).await; From a719fb942366d1b4b65ef0b8a777732289a781a3 Mon Sep 17 00:00:00 2001 From: Jeb Bearer Date: Thu, 30 May 2024 15:40:31 -0400 Subject: [PATCH 2/9] REVERT BEFORE MERGING downgrade tide-disco for compatibility with sequencer hotfix branch --- Cargo.lock | 92 ++++++------------------------------------------------ Cargo.toml | 4 +-- 2 files changed, 11 insertions(+), 85 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fb70dbf5d..824ca443e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3159,7 +3159,7 @@ dependencies = [ "serde", "sha2 0.10.8", "snafu 0.8.2", - "surf-disco 0.6.0", + "surf-disco", "time 0.3.36", "tokio", "tracing", @@ -3180,7 +3180,7 @@ dependencies = [ "serde", "snafu 0.8.2", "tagged-base64", - "tide-disco 0.6.0", + "tide-disco", "toml", "vbs", ] @@ -3247,9 +3247,9 @@ dependencies = [ "serde", "serde-inline-default", "serde_json", - "surf-disco 0.6.0", + "surf-disco", "thiserror", - "tide-disco 0.6.0", + "tide-disco", "tokio", "toml", "tracing", @@ -3299,10 +3299,10 @@ dependencies = [ "serde_json", "snafu 0.8.2", "spin_sleep", - "surf-disco 0.7.0", + "surf-disco", "tagged-base64", "tempfile", - "tide-disco 0.7.0", + "tide-disco", "time 0.3.36", "tokio", "tokio-postgres", @@ -3353,7 +3353,7 @@ dependencies = [ "serde", "sha2 0.10.8", "snafu 0.8.2", - "surf-disco 0.6.0", + "surf-disco", "tagged-base64", "time 0.3.36", "tokio", @@ -3395,7 +3395,7 @@ dependencies = [ "sha3", "snafu 0.8.2", "tagged-base64", - "tide-disco 
0.6.0", + "tide-disco", "tokio", "tracing", "vbs", @@ -7745,26 +7745,7 @@ dependencies = [ "reqwest", "serde", "serde_json", - "tide-disco 0.6.0", - "tracing", - "vbs", -] - -[[package]] -name = "surf-disco" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e4318abbbbc4d640c6cb45d29cd3c70cf517c751fae9b68f2f76a6ff1950da7" -dependencies = [ - "async-std", - "async-tungstenite", - "derivative", - "futures", - "hex", - "reqwest", - "serde", - "serde_json", - "tide-disco 0.7.0", + "tide-disco", "tracing", "vbs", ] @@ -8076,61 +8057,6 @@ dependencies = [ "vbs", ] -[[package]] -name = "tide-disco" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44c9d803bc734a5cced767b36bb1caab87e9eb7eb144cffcab38160b084a8cef" -dependencies = [ - "anyhow", - "async-h1", - "async-lock 3.3.0", - "async-std", - "async-trait", - "clap", - "config", - "derivative", - "derive_more", - "dirs", - "edit-distance", - "futures", - "futures-util", - "http 1.1.0", - "include_dir", - "itertools 0.12.1", - "lazy_static", - "libc", - "markdown", - "maud", - "num-derive", - "num-traits", - "parking_lot", - "prometheus", - "reqwest", - "routefinder", - "semver 1.0.21", - "serde", - "serde_json", - "serde_with", - "shellexpand", - "signal-hook", - "signal-hook-async-std", - "snafu 0.8.2", - "strum", - "strum_macros", - "tagged-base64", - "tide", - "tide-websockets", - "toml", - "tracing", - "tracing-distributed", - "tracing-futures", - "tracing-log", - "tracing-subscriber 0.3.18", - "url", - "vbs", -] - [[package]] name = "tide-websockets" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index 473eb9208..c426b27f4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,9 +85,9 @@ prometheus = "0.13" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" snafu = "0.8" -surf-disco = "0.7" +surf-disco = "0.6" tagged-base64 = "0.4" -tide-disco = "0.7" +tide-disco = "0.6" time = "0.3" toml = "0.8" 
tracing = "0.1" From 061c155afb9c4e5b4b5470f6effed9c1cfaafe85 Mon Sep 17 00:00:00 2001 From: Jeb Bearer Date: Thu, 30 May 2024 15:44:33 -0400 Subject: [PATCH 3/9] Update comment --- src/data_source/fetching.rs | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/src/data_source/fetching.rs b/src/data_source/fetching.rs index 61933f307..344fade8c 100644 --- a/src/data_source/fetching.rs +++ b/src/data_source/fetching.rs @@ -1232,15 +1232,11 @@ where }; prev_height = block_height; - // Iterate over all blocks that we should have. Merely iterating over the `Fetch`es - // without awaiting them is enough to trigger active fetches of missing blocks, - // since we always trigger an active fetch when fetching by block number. Moreover, - // fetching the block is enough to trigger an active fetch of the corresponding leaf - // if it too is missing. - // - // The chunking behavior of `get_range` automatically ensures that, no matter how - // big the range is, we will release the read lock on storage every `chunk_size` - // items, so we don't starve out would-be writers. + // Iterate over all blocks that we should have. Fetching the block is enough to + // trigger an active fetch of the corresponding leaf if it too is missing. The + // chunking behavior of `get_range` automatically ensures that, no matter how big + // the range is, we will release the read lock on storage every `chunk_size` items, + // so we don't starve out would-be writers. let mut blocks = self .clone() .get_range_with_chunk_size::<_, BlockQueryData>( From 57265a51a84614d00ffd7ced6e838205b40a42fd Mon Sep 17 00:00:00 2001 From: Jeb Bearer Date: Thu, 30 May 2024 17:28:11 -0400 Subject: [PATCH 4/9] Add a delay after active fetches in proactive scanner. This can be used to limit the real time rate of catchup requests (as opposed to limiting the number of simultaneous requests, as we do with the semaphore in the fetcher). The default delay is 100ms, limiting active requests to 10/s. 
--- src/data_source/fetching.rs | 46 ++++++++++++++++++++++++++++--------- 1 file changed, 35 insertions(+), 11 deletions(-) diff --git a/src/data_source/fetching.rs b/src/data_source/fetching.rs index 344fade8c..5cfa4910a 100644 --- a/src/data_source/fetching.rs +++ b/src/data_source/fetching.rs @@ -143,6 +143,7 @@ pub struct Builder { major_scan_interval: usize, major_scan_offset: usize, proactive_range_chunk_size: Option, + active_fetch_delay: Duration, proactive_fetching: bool, _types: PhantomData, } @@ -168,6 +169,7 @@ impl Builder { // don't all pause for a major scan together. major_scan_offset: 0, proactive_range_chunk_size: None, + active_fetch_delay: Duration::from_millis(100), proactive_fetching: true, _types: Default::default(), } @@ -240,6 +242,16 @@ impl Builder { self } + /// Add a delay between active fetches in proactive scans. + /// + /// This can be used to limit the rate at which this query service makes requests to other query + /// services during proactive scans. This is useful if the query service has a lot of blocks to + /// catch up on, as without a delay, scanning can be extremely burdensome on the peer. + pub fn with_active_fetch_delay(mut self, active_fetch_delay: Duration) -> Self { + self.active_fetch_delay = active_fetch_delay; + self + } + /// Run without [proactive fetching](self#proactive-fetching). 
/// /// This can reduce load on the CPU and the database, but increases the probability that @@ -386,6 +398,7 @@ where let proactive_range_chunk_size = builder .proactive_range_chunk_size .unwrap_or(builder.range_chunk_size); + let active_fetch_delay = builder.active_fetch_delay; let fetcher = Arc::new(Fetcher::new(builder).await?); let scanner = if proactive_fetching { @@ -396,6 +409,7 @@ where major_interval, major_offset, proactive_range_chunk_size, + active_fetch_delay, ), )) } else { @@ -1196,6 +1210,7 @@ where major_interval: usize, major_offset: usize, chunk_size: usize, + active_fetch_delay: Duration, ) { let mut prev_height = 0; @@ -1243,13 +1258,19 @@ where chunk_size, start..block_height, ); - while let Some(fut) = blocks.next().await { - // Wait for the block to be fetched. This slows down the scanner so that we - // don't waste memory generating more active fetch tasks then we can handle at a - // given time. Note that even with this await, all blocks within a chunk are - // fetched in paralle, so this does not block the next block in the chunk, only - // the next chunk until the current chunk completes. - fut.await; + while let Some(fetch) = blocks.next().await { + if let Fetch::Pending(fut) = fetch { + // Wait for the block to be fetched. This slows down the scanner so that we + // don't waste memory generating more active fetch tasks then we can handle + // at a given time. Note that even with this await, all blocks within a + // chunk are fetched in paralle, so this does not block the next block in + // the chunk, only the next chunk until the current chunk completes. + fut.await; + + // Add a bit of artifical latency to easy pressure on the catchup provider, + // otherwise proactive scans can be highly intensive. + sleep(active_fetch_delay).await; + } } // We have to trigger a separate fetch of the VID data, since this is fetched // independently of the block payload. 
@@ -1259,10 +1280,13 @@ where chunk_size, start..block_height, ); - while let Some(fut) = vid.next().await { - // As above, limit the speed at which we spawn new fetches to the speed at which - // we can process them. - fut.await; + while let Some(fetch) = vid.next().await { + if let Fetch::Pending(fut) = fetch { + // As above, limit the speed at which we spawn new fetches to the speed at + // which we can process them. + fut.await; + sleep(active_fetch_delay).await; + } } tracing::info!("completed proactive scan, will scan again in {minor_interval:?}"); From 1cdd9b77a203b391871f1923264a1dd285771287 Mon Sep 17 00:00:00 2001 From: Abdul Basit Date: Sun, 2 Jun 2024 03:44:55 +0500 Subject: [PATCH 5/9] chunk_fetch_delay and move active_fetch_delay inside fetch function --- src/data_source/fetching.rs | 59 +++++++++++++++++++++---------------- 1 file changed, 33 insertions(+), 26 deletions(-) diff --git a/src/data_source/fetching.rs b/src/data_source/fetching.rs index 5cfa4910a..42d6a9a82 100644 --- a/src/data_source/fetching.rs +++ b/src/data_source/fetching.rs @@ -144,6 +144,7 @@ pub struct Builder { major_scan_offset: usize, proactive_range_chunk_size: Option, active_fetch_delay: Duration, + chunk_fetch_delay: Duration, proactive_fetching: bool, _types: PhantomData, } @@ -169,7 +170,8 @@ impl Builder { // don't all pause for a major scan together. major_scan_offset: 0, proactive_range_chunk_size: None, - active_fetch_delay: Duration::from_millis(100), + active_fetch_delay: Duration::from_millis(50), + chunk_fetch_delay: Duration::from_millis(100), proactive_fetching: true, _types: Default::default(), } @@ -252,6 +254,22 @@ impl Builder { self } + /// Adds a delay between chunk fetches during proactive scans. + /// + /// In a proactive scan, we retrieve a range of objects from a provider or local storage (e.g., a database). 
+ /// Without a delay between fetching these chunks, the process can become very CPU-intensive, especially + /// when chunks are retrieved from local storage. While there is already a delay for active fetches + /// (`active_fetch_delay`), situations may arise when subscribed to an old stream that fetches most of the data + /// from local storage. + /// + /// This additional delay helps to limit constant maximum CPU usage + /// and ensures that local storage remains accessible to all processes, + /// not just the proactive scanner. + pub fn with_chunk_fetch_delay(mut self, chunk_fetch_delay: Duration) -> Self { + self.chunk_fetch_delay = chunk_fetch_delay; + self + } + /// Run without [proactive fetching](self#proactive-fetching). /// /// This can reduce load on the CPU and the database, but increases the probability that @@ -398,7 +416,6 @@ where let proactive_range_chunk_size = builder .proactive_range_chunk_size .unwrap_or(builder.range_chunk_size); - let active_fetch_delay = builder.active_fetch_delay; let fetcher = Arc::new(Fetcher::new(builder).await?); let scanner = if proactive_fetching { @@ -409,7 +426,6 @@ where major_interval, major_offset, proactive_range_chunk_size, - active_fetch_delay, ), )) } else { @@ -840,6 +856,10 @@ where leaf_fetcher: Arc>, vid_common_fetcher: Arc>, range_chunk_size: usize, + // Duration to sleep after each active fetch, + active_fetch_delay: Duration, + // Duration to sleep after each chunk fetched + chunk_fetch_delay: Duration, } #[derive(Debug)] @@ -948,6 +968,8 @@ where leaf_fetcher: Arc::new(leaf_fetcher), vid_common_fetcher: Arc::new(vid_common_fetcher), range_chunk_size: builder.range_chunk_size, + active_fetch_delay: builder.active_fetch_delay, + chunk_fetch_delay: builder.chunk_fetch_delay, }) } } @@ -1015,9 +1037,14 @@ where R: RangeBounds + Send + 'static, T: RangedFetchable, { + let chunk_fetch_delay = self.chunk_fetch_delay; stream::iter(range_chunks(range, chunk_size)) .then(move |chunk| 
self.clone().get_chunk(chunk)) .flatten() + .then(move |obj| async move { + sleep(chunk_fetch_delay.clone()).await; + obj + }) .boxed() } @@ -1188,6 +1215,7 @@ where if req.might_exist(storage.height as usize, pruned_height) { T::active_fetch(self.clone(), storage, req).await; + sleep(self.active_fetch_delay).await; } else { tracing::debug!( "not fetching object {req:?} that cannot exist at height {}", @@ -1210,7 +1238,6 @@ where major_interval: usize, major_offset: usize, chunk_size: usize, - active_fetch_delay: Duration, ) { let mut prev_height = 0; @@ -1258,20 +1285,7 @@ where chunk_size, start..block_height, ); - while let Some(fetch) = blocks.next().await { - if let Fetch::Pending(fut) = fetch { - // Wait for the block to be fetched. This slows down the scanner so that we - // don't waste memory generating more active fetch tasks then we can handle - // at a given time. Note that even with this await, all blocks within a - // chunk are fetched in paralle, so this does not block the next block in - // the chunk, only the next chunk until the current chunk completes. - fut.await; - - // Add a bit of artifical latency to easy pressure on the catchup provider, - // otherwise proactive scans can be highly intensive. - sleep(active_fetch_delay).await; - } - } + while blocks.next().await.is_some() {} // We have to trigger a separate fetch of the VID data, since this is fetched // independently of the block payload. let mut vid = self @@ -1280,14 +1294,7 @@ where chunk_size, start..block_height, ); - while let Some(fetch) = vid.next().await { - if let Fetch::Pending(fut) = fetch { - // As above, limit the speed at which we spawn new fetches to the speed at - // which we can process them. 
- fut.await; - sleep(active_fetch_delay).await; - } - } + while vid.next().await.is_some() {} tracing::info!("completed proactive scan, will scan again in {minor_interval:?}"); sleep(minor_interval).await; From d91607b9f6699a6090cfc55a77b58d97effe3583 Mon Sep 17 00:00:00 2001 From: Abdul Basit Date: Sun, 2 Jun 2024 03:47:06 +0500 Subject: [PATCH 6/9] clippy --- src/data_source/fetching.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/data_source/fetching.rs b/src/data_source/fetching.rs index 42d6a9a82..f80d1d5d4 100644 --- a/src/data_source/fetching.rs +++ b/src/data_source/fetching.rs @@ -1042,7 +1042,7 @@ where .then(move |chunk| self.clone().get_chunk(chunk)) .flatten() .then(move |obj| async move { - sleep(chunk_fetch_delay.clone()).await; + sleep(chunk_fetch_delay).await; obj }) .boxed() From d9977b6099e6b29b79f44f2011644dd94602b015 Mon Sep 17 00:00:00 2001 From: Abdul Basit Date: Sun, 2 Jun 2024 03:55:40 +0500 Subject: [PATCH 7/9] add minor delay for test data source --- src/data_source/storage/no_storage.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/data_source/storage/no_storage.rs b/src/data_source/storage/no_storage.rs index fcfe1661d..53184a44a 100644 --- a/src/data_source/storage/no_storage.rs +++ b/src/data_source/storage/no_storage.rs @@ -250,7 +250,15 @@ pub mod testing { async fn connect(db: &Self::Storage) -> Self { match db { Storage::Sql(db) => { - Self::Sql(db.config().connect(Default::default()).await.unwrap()) + let cfg = db.config(); + let builder = cfg + .builder(Default::default()) + .await + .unwrap() + .with_active_fetch_delay(Duration::from_millis(1)) + .with_chunk_fetch_delay(Duration::from_millis(1)); + + Self::Sql(builder.build().await.unwrap()) } Storage::NoStorage { fetch_from_port } => { tracing::info!("creating NoStorage node, fetching missing data from port {fetch_from_port}"); @@ -272,6 +280,10 @@ pub mod testing { // don't have storage) and the test frequently 
goes back and looks up // old objects. .with_major_scan_interval(2) + // add minor delay for active fetch + .with_active_fetch_delay(Duration::from_millis(1)) + // add minor delay between chunks during proactive scan + .with_chunk_fetch_delay(Duration::from_millis(1)) .build() .await .unwrap(), From 0a96ec2ab0c733011ecec93d3fdcb3850e768381 Mon Sep 17 00:00:00 2001 From: Abdul Basit Date: Sun, 2 Jun 2024 07:00:29 +0500 Subject: [PATCH 8/9] sleep before flatten --- src/data_source/fetching.rs | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/src/data_source/fetching.rs b/src/data_source/fetching.rs index f80d1d5d4..8368913d8 100644 --- a/src/data_source/fetching.rs +++ b/src/data_source/fetching.rs @@ -1038,12 +1038,26 @@ where T: RangedFetchable, { let chunk_fetch_delay = self.chunk_fetch_delay; + let active_fetch_delay = self.active_fetch_delay; + stream::iter(range_chunks(range, chunk_size)) - .then(move |chunk| self.clone().get_chunk(chunk)) + .then(move |chunk| { + let self_clone = self.clone(); + async move { + { + let chunk = self_clone.get_chunk(chunk).await; + sleep(chunk_fetch_delay).await; + chunk + } + } + }) .flatten() - .then(move |obj| async move { - sleep(chunk_fetch_delay).await; - obj + .then(move |f| async move { + match f { + Fetch::Pending(_) => sleep(active_fetch_delay).await, + Fetch::Ready(_) => (), + }; + f }) .boxed() } @@ -1215,7 +1229,6 @@ where if req.might_exist(storage.height as usize, pruned_height) { T::active_fetch(self.clone(), storage, req).await; - sleep(self.active_fetch_delay).await; } else { tracing::debug!( "not fetching object {req:?} that cannot exist at height {}", From 2c501f072254cc57a04f3a1a6d570523c4434582 Mon Sep 17 00:00:00 2001 From: Abdul Basit Date: Sun, 2 Jun 2024 07:59:19 +0500 Subject: [PATCH 9/9] add comment --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/data_source/fetching.rs | 6 ++++++ 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock 
index 824ca443e..14ea67ac4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3258,7 +3258,7 @@ dependencies = [ [[package]] name = "hotshot-query-service" -version = "0.1.27" +version = "0.1.28" dependencies = [ "anyhow", "ark-serialize", diff --git a/Cargo.toml b/Cargo.toml index c426b27f4..44dbab70f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ [package] name = "hotshot-query-service" -version = "0.1.27" +version = "0.1.28" authors = ["Espresso Systems "] edition = "2021" license = "GPL-3.0-or-later" diff --git a/src/data_source/fetching.rs b/src/data_source/fetching.rs index 8368913d8..1df87a7f9 100644 --- a/src/data_source/fetching.rs +++ b/src/data_source/fetching.rs @@ -1046,6 +1046,10 @@ where async move { { let chunk = self_clone.get_chunk(chunk).await; + + // Introduce a delay (`chunk_fetch_delay`) between fetching chunks. + // This helps to limit constant high CPU usage when fetching a long range of data, + // especially for older streams that fetch most of the data from local storage. sleep(chunk_fetch_delay).await; chunk } @@ -1054,6 +1058,8 @@ where .flatten() .then(move |f| async move { match f { + // Introduce a delay (`active_fetch_delay`) for active fetches to reduce load on the catchup provider. + // The delay applies between pending fetches, not between chunks. Fetch::Pending(_) => sleep(active_fetch_delay).await, Fetch::Ready(_) => (), };