Improve efficiency of aggregated statistics
We have some statistics that are derived from the entire block
history, namely transaction count and payload size. Previously,
these were computed on the fly for each request, which required an
expensive full table scan and, in the case of payload size, a large
sum.

This change adds a new table `aggregate`, which stores these
cumulative values _at each block height_. This table is kept up to
date by a background task scanning the block stream. Now, looking up
the value of one of these statistics requires only reading a single row
of this table. Besides the massive performance improvement, this
approach has several further benefits:
* we can now easily look up the values of these statistics at any
  historical block height, or for any historical _range_ of blocks
* instead of returning inaccurate counts when data is missing, we
  will simply return that we don't have the counts yet for the
  requested block height, since the aggregate table won't be
  populated for that row
* requests that explicitly specify a range or upper bound are easily
  cacheable
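
To make this concrete, the two access patterns the table enables look
roughly like this in SQL (illustrative statements, not the exact queries
used by the storage layer):

```sql
-- Statistics as of block height :to: a single indexed point read.
SELECT num_transactions, payload_size
FROM aggregate
WHERE height = :to;

-- Statistics for the inclusive range [:from, :to]: the difference of two
-- point reads, since the stored values are running totals. (For :from = 0,
-- the totals at :to are used directly.)
SELECT hi.num_transactions - lo.num_transactions AS num_transactions,
       hi.payload_size - lo.payload_size AS payload_size
FROM aggregate hi
JOIN aggregate lo ON lo.height = :from - 1
WHERE hi.height = :to;
```
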
jbearer committed Nov 15, 2024
1 parent b41a630 commit 7c01c26
Showing 19 changed files with 712 additions and 115 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/build.yml
@@ -34,7 +34,7 @@ jobs:
- name: Format Check
run: cargo fmt -- --check

-# Run Clippy on all targets. The lint workflow doesn't run Clippy on tests, because the tests
# Run Clippy on all targets. The lint workflow doesn't run Clippy on testgs, because the tests
# don't compile with all combinations of features.
- name: Clippy
run: cargo clippy --workspace --all-features --all-targets -- -D warnings
@@ -47,6 +47,9 @@ jobs:
run: |
cargo nextest run --workspace --release --all-features
timeout-minutes: 60

- name: Doc Test
run: cargo test --release --all-features --doc

- name: Generate Documentation
run: |
20 changes: 16 additions & 4 deletions api/node.toml
@@ -34,21 +34,33 @@ Returns an integer.
"""

[route.count_transactions]
-PATH = ["transactions/count"]
PATH = ["transactions/count", "transactions/count/:to", "transactions/count/:from/:to"]
":from" = "Integer"
":to" = "Integer"
DOC = """
Get the number of transactions in the chain.
Warning: count may be low if data is currently being indexed (see `sync-status`).
If `:from` or `:to` is specified, they restrict the range of blocks considered.
`transactions/count/:to` will return the number of transactions in all blocks up to and including
block number `:to`, while `transactions/count/:from/:to` will count the transactions in all blocks
between `:from` (inclusive) and `:to` (inclusive).
Returns an integer.
"""

[route.payload_size]
-PATH = ["payloads/total-size"]
PATH = ["payloads/size", "payloads/size/:to", "payloads/size/:from/:to", "payloads/total-size"]
":from" = "Integer"
":to" = "Integer"
DOC = """
Get the size (in bytes) of all payload data in the chain.
Warning: size may be low if data is currently being indexed (see `sync-status`).
If `:from` or `:to` is specified, they restrict the range of blocks considered. `payloads/size/:to`
will return the cumulative size of all payloads in blocks up to and including block number `:to`,
while `payloads/size/:from/:to` will return the cumulative size in all blocks between `:from`
(inclusive) and `:to` (inclusive).
`payloads/total-size` is a deprecated alias for `payloads/size`.
Returns an integer.
"""
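
Concretely, the new path forms map onto requests like the following (the
`/node` mount point and the block numbers are illustrative):

```
GET /node/transactions/count        # all blocks up to the current height
GET /node/transactions/count/100    # blocks 0 through 100, inclusive
GET /node/transactions/count/50/100 # blocks 50 through 100, inclusive
GET /node/payloads/size/50/100      # payload bytes in blocks 50 through 100
```
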
5 changes: 5 additions & 0 deletions migrations/V200__create_aggregates_table.sql
@@ -0,0 +1,5 @@
CREATE TABLE aggregate (
height BIGINT PRIMARY KEY REFERENCES header (height) ON DELETE CASCADE,
num_transactions BIGINT NOT NULL,
payload_size BIGINT NOT NULL
);
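
The aggregator extends this table one row at a time: each block's row is its
predecessor's totals plus the new block's contribution. A hypothetical sketch
of the per-block update (the real statement lives in the storage layer;
`:height`, `:new_txns`, and `:new_bytes` are illustrative bind parameters):

```sql
INSERT INTO aggregate (height, num_transactions, payload_size)
VALUES (
    :height,
    -- COALESCE covers the first block, which has no predecessor row.
    COALESCE((SELECT num_transactions FROM aggregate WHERE height = :height - 1), 0) + :new_txns,
    COALESCE((SELECT payload_size FROM aggregate WHERE height = :height - 1), 0) + :new_bytes
);
```
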
40 changes: 32 additions & 8 deletions src/data_source.rs
@@ -680,11 +680,11 @@ pub mod node_tests {
storage::{NodeStorage, UpdateAvailabilityStorage},
update::Transaction,
},
-node::{BlockId, SyncStatus, TimeWindowQueryData, WindowStart},
node::{BlockId, NodeDataSource, SyncStatus, TimeWindowQueryData, WindowStart},
testing::{
consensus::{MockNetwork, TestableDataSource},
mocks::{mock_transaction, MockPayload, MockTypes},
-setup_test,
setup_test, sleep,
},
types::HeightIndexed,
Header, VidShare,
@@ -704,6 +704,7 @@ pub mod node_tests {
vid::{vid_scheme, VidSchemeType},
};
use jf_vid::VidScheme;
use std::time::Duration;

#[tokio::test(flavor = "multi_thread")]
pub async fn test_sync_status<D: TestableDataSource>()
@@ -867,11 +868,10 @@
assert_eq!(ds.count_transactions().await.unwrap(), 0);
assert_eq!(ds.payload_size().await.unwrap(), 0);

-// Insert some transactions. We insert the blocks out of order to check that the counters
-// account for missing blocks fetched later.
// Insert some transactions.
let mut total_transactions = 0;
let mut total_size = 0;
-for i in [0, 2, 1] {
'outer: for i in [0, 1, 2] {
// Using `i % 2` as the transaction data ensures we insert a duplicate transaction
// (since we insert more than 2 transactions total). The query service should still
// count these as separate transactions and should include both duplicates when
@@ -907,15 +907,39 @@
.await;
*leaf.leaf.block_header_mut() = header.clone();
let block = BlockQueryData::new(header, payload);
-ds.append(BlockInfo::new(leaf, Some(block), None, None))
ds.append(BlockInfo::new(leaf, Some(block.clone()), None, None))
.await
.unwrap();
assert_eq!(
NodeDataSource::<MockTypes>::block_height(&ds)
.await
.unwrap(),
(i + 1) as usize,
);

total_transactions += 1;
total_size += encoded.len();

-assert_eq!(ds.count_transactions().await.unwrap(), total_transactions);
-assert_eq!(ds.payload_size().await.unwrap(), total_size);
// Allow some time for the aggregator to update.
for retry in 0..5 {
let ds_transactions = ds.count_transactions().await.unwrap();
let ds_payload_size = ds.payload_size().await.unwrap();
if ds_transactions != total_transactions || ds_payload_size != total_size {
tracing::info!(
i,
retry,
total_transactions,
ds_transactions,
total_size,
ds_payload_size,
"waiting for statistics to update"
);
sleep(Duration::from_secs(1)).await;
} else {
continue 'outer;
}
}
panic!("counters did not update in time");
}
}

14 changes: 10 additions & 4 deletions src/data_source/extension.rs
@@ -234,11 +234,17 @@ where
async fn block_height(&self) -> QueryResult<usize> {
self.data_source.block_height().await
}
-async fn count_transactions(&self) -> QueryResult<usize> {
-self.data_source.count_transactions().await
async fn count_transactions_in_range(
&self,
range: impl RangeBounds<usize> + Send,
) -> QueryResult<usize> {
self.data_source.count_transactions_in_range(range).await
}
-async fn payload_size(&self) -> QueryResult<usize> {
-self.data_source.payload_size().await
async fn payload_size_in_range(
&self,
range: impl RangeBounds<usize> + Send,
) -> QueryResult<usize> {
self.data_source.payload_size_in_range(range).await
}
async fn vid_share<ID>(&self, id: ID) -> QueryResult<VidShare>
where
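
The `impl RangeBounds<usize>` parameter lets callers express whole-chain,
prefix, and windowed queries uniformly. A hypothetical call site, assuming
`ds` is any data source implementing `NodeDataSource` (the old
`count_transactions()` and `payload_size()` behavior corresponds to passing
`..`):

```rust
// Illustrative fragment, inside an async fn that can propagate QueryError.
let total_txns = ds.count_transactions_in_range(..).await?;      // entire chain
let prefix_txns = ds.count_transactions_in_range(..=100).await?; // blocks 0..=100
let window_bytes = ds.payload_size_in_range(50..=100).await?;    // blocks 50..=100
```
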
132 changes: 122 additions & 10 deletions src/data_source/fetching.rs
@@ -77,8 +77,8 @@ use super::{
notifier::Notifier,
storage::{
pruning::{PruneStorage, PrunedHeightStorage},
-AvailabilityStorage, ExplorerStorage, MerklizedStateHeightStorage, MerklizedStateStorage,
-NodeStorage, UpdateAvailabilityStorage,
AggregatesStorage, AvailabilityStorage, ExplorerStorage, MerklizedStateHeightStorage,
MerklizedStateStorage, NodeStorage, UpdateAggregatesStorage, UpdateAvailabilityStorage,
},
Transaction, VersionedDataSource,
};
@@ -153,6 +153,7 @@ pub struct Builder<Types, S, P> {
active_fetch_delay: Duration,
chunk_fetch_delay: Duration,
proactive_fetching: bool,
aggregator: bool,
_types: PhantomData<Types>,
}

@@ -187,6 +188,7 @@ impl<Types, S, P> Builder<Types, S, P> {
active_fetch_delay: Duration::from_millis(50),
chunk_fetch_delay: Duration::from_millis(100),
proactive_fetching: true,
aggregator: true,
_types: Default::default(),
}
}
@@ -320,6 +322,15 @@ impl<Types, S, P> Builder<Types, S, P> {
self.proactive_fetching = false;
self
}

/// Run without an aggregator.
///
/// This can reduce load on the CPU and the database, but it will cause aggregate statistics
/// (such as transaction counts) not to update.
pub fn disable_aggregator(mut self) -> Self {
self.aggregator = false;
self
}
}
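
// A sketch of opting out, for illustration only (the exact construction of
// the data source is abbreviated here; see the builder methods above):
//
//     let ds = FetchingDataSource::builder(storage, provider)
//         .disable_aggregator()
//         .build()
//         .await?;
//
// With the aggregator disabled, the statistics endpoints report missing data
// for heights the aggregate table does not cover, rather than stale counts.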

impl<Types, S, P> Builder<Types, S, P>
@@ -328,8 +339,9 @@ where
Payload<Types>: QueryablePayload<Types>,
Header<Types>: QueryableHeader<Types>,
S: PruneStorage + VersionedDataSource + HasMetrics + 'static,
-for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + PrunedHeightStorage + NodeStorage<Types>,
-for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
for<'a> S::ReadOnly<'a>:
AvailabilityStorage<Types> + PrunedHeightStorage + NodeStorage<Types> + AggregatesStorage,
for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types> + UpdateAggregatesStorage<Types>,
P: AvailabilityProvider<Types>,
{
/// Build a [`FetchingDataSource`] with these options.
@@ -367,6 +379,8 @@ where
fetcher: Arc<Fetcher<Types, S, P>>,
// The proactive scanner task. This is only saved here so that we can cancel it on drop.
scanner: Option<BackgroundTask>,
// The aggregator task, which derives aggregate statistics from a block stream.
aggregator: Option<BackgroundTask>,
pruner: Pruner<Types, S, P>,
}

@@ -419,8 +433,9 @@
Payload<Types>: QueryablePayload<Types>,
Header<Types>: QueryableHeader<Types>,
S: VersionedDataSource + PruneStorage + HasMetrics + 'static,
-for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
-for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types> + UpdateAggregatesStorage<Types>,
for<'a> S::ReadOnly<'a>:
AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage + AggregatesStorage,
P: AvailabilityProvider<Types>,
{
/// Build a [`FetchingDataSource`] with the given `storage` and `provider`.
@@ -429,6 +444,7 @@ where
}

async fn new(builder: Builder<Types, S, P>) -> anyhow::Result<Self> {
let aggregator = builder.aggregator;
let proactive_fetching = builder.proactive_fetching;
let minor_interval = builder.minor_scan_interval;
let major_interval = builder.major_scan_interval;
@@ -437,6 +453,7 @@ where
.proactive_range_chunk_size
.unwrap_or(builder.range_chunk_size);
let scanner_metrics = ScannerMetrics::new(builder.storage.metrics());
let aggregator_metrics = AggregatorMetrics::new(builder.storage.metrics());

let fetcher = Arc::new(Fetcher::new(builder).await?);
let scanner = if proactive_fetching {
@@ -454,11 +471,21 @@
None
};

let aggregator = if aggregator {
Some(BackgroundTask::spawn(
"aggregator",
fetcher.clone().aggregate(aggregator_metrics),
))
} else {
None
};

let pruner = Pruner::new(fetcher.clone()).await;
let ds = Self {
fetcher,
scanner,
pruner,
aggregator,
};

Ok(ds)
@@ -1166,6 +1193,70 @@
}
}

impl<Types, S, P> Fetcher<Types, S, P>
where
Types: NodeType,
Payload<Types>: QueryablePayload<Types>,
S: VersionedDataSource + 'static,
for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types> + UpdateAggregatesStorage<Types>,
for<'a> S::ReadOnly<'a>:
AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage + AggregatesStorage,
P: AvailabilityProvider<Types>,
{
#[tracing::instrument(skip_all)]
async fn aggregate(self: Arc<Self>, metrics: AggregatorMetrics) {
loop {
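// Load the height from which to resume aggregation, retrying until storage
// is readable.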
let start = loop {
let mut tx = match self.read().await {
Ok(tx) => tx,
Err(err) => {
tracing::error!("unable to start aggregator: {err:#}");
sleep(Duration::from_secs(5)).await;
continue;
}
};
match tx.aggregates_height().await {
Ok(height) => break height,
Err(err) => {
tracing::error!("unable to load aggregator height: {err:#}");
sleep(Duration::from_secs(5)).await;
continue;
}
};
};
tracing::info!(start, "starting aggregator");
metrics.height.set(start);

let mut blocks = self.clone().get_range::<_, BlockQueryData<Types>>(start..);
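// Consume the stream in height order, folding each block into the running
// totals.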
while let Some(block) = blocks.next().await {
let block = block.await;
let height = block.height();
tracing::debug!(height, "updating aggregate statistics for block");
loop {
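// Never skip a block: retry until the update commits, since each row of the
// aggregate table builds on the totals in the previous row.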
let res = async {
let mut tx = self.write().await.context("opening transaction")?;
tx.update_aggregates(&block).await?;
tx.commit().await.context("committing transaction")
}
.await;
match res {
Ok(()) => break,
Err(err) => {
tracing::warn!(
height,
"failed to update aggregates for block: {err:#}"
);
sleep(Duration::from_secs(1)).await;
}
}
}
metrics.height.set(height as usize);
}
tracing::warn!("aggregator block stream ended unexpectedly; will restart");
}
}
}

impl<Types, S, P> Fetcher<Types, S, P>
where
Types: NodeType,
@@ -1366,18 +1457,24 @@
tx.block_height().await
}

-async fn count_transactions(&self) -> QueryResult<usize> {
async fn count_transactions_in_range(
&self,
range: impl RangeBounds<usize> + Send,
) -> QueryResult<usize> {
let mut tx = self.read().await.map_err(|err| QueryError::Error {
message: err.to_string(),
})?;
-tx.count_transactions().await
tx.count_transactions_in_range(range).await
}

-async fn payload_size(&self) -> QueryResult<usize> {
async fn payload_size_in_range(
&self,
range: impl RangeBounds<usize> + Send,
) -> QueryResult<usize> {
let mut tx = self.read().await.map_err(|err| QueryError::Error {
message: err.to_string(),
})?;
-tx.payload_size().await
tx.payload_size_in_range(range).await
}

async fn vid_share<ID>(&self, id: ID) -> QueryResult<VidShare>
Expand Down Expand Up @@ -1769,3 +1866,18 @@ impl ScannerMetrics {
}
}
}

#[derive(Debug)]
struct AggregatorMetrics {
/// The block height for which aggregate statistics are currently available.
height: Box<dyn Gauge>,
}

impl AggregatorMetrics {
fn new(metrics: &PrometheusMetrics) -> Self {
let group = metrics.subgroup("aggregator".into());
Self {
height: group.create_gauge("height".into(), None),
}
}
}