Skip to content

Commit

Permalink
Merge pull request #735 from EspressoSystems/jb/aggregate-table
Browse files Browse the repository at this point in the history
Improve efficiency of aggregated statistics
  • Loading branch information
jbearer authored Nov 15, 2024
2 parents 050aae1 + a6896e5 commit 7d55e33
Show file tree
Hide file tree
Showing 19 changed files with 711 additions and 114 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ jobs:
run: |
cargo nextest run --workspace --release --all-features
timeout-minutes: 60

- name: Doc Test
run: cargo test --release --all-features --doc

- name: Generate Documentation
run: |
Expand Down
20 changes: 16 additions & 4 deletions api/node.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,33 @@ Returns an integer.
"""

[route.count_transactions]
PATH = ["transactions/count"]
PATH = ["transactions/count", "transactions/count/:to", "transactions/count/:from/:to"]
":from" = "Integer"
":to" = "Integer"
DOC = """
Get the number of transactions in the chain.
Warning: count may be low if data is currently being indexed (see `sync-status`).
If `:from` or `:to` is specified, they restrict the range of blocks considered.
`transactions/count/:to` will return the number of transactions in all blocks up to and including
block number `:to`, while `transactions/count/:from/:to` will count the transactions in all blocks
between `:from` (inclusive) and `:to` (inclusive).
Returns an integer.
"""

[route.payload_size]
PATH = ["payloads/total-size"]
PATH = ["payloads/size", "payloads/size/:to", "payloads/size/:from/:to", "payloads/total-size"]
":from" = "Integer"
":to" = "Integer"
DOC = """
Get the size (in bytes) of all payload data in the chain.
Warning: size may be low if data is currently being indexed (see `sync-status`).
If `:from` or `:to` is specified, they restrict the range of blocks considered. `payloads/size/:to`
will return the cumulative size of all payloads in blocks up to and including block number `:to`,
while `payloads/size/:from/:to` will return the cumulative size in all blocks between `:from`
(inclusive) and `:to` (inclusive).
`payloads/total-size` is a deprecated alias for `payloads/size`.
Returns an integer.
"""
Expand Down
5 changes: 5 additions & 0 deletions migrations/V200__create_aggregates_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE aggregate (
height BIGINT PRIMARY KEY REFERENCES header (height) ON DELETE CASCADE,
num_transactions BIGINT NOT NULL,
payload_size BIGINT NOT NULL
);
40 changes: 32 additions & 8 deletions src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -680,11 +680,11 @@ pub mod node_tests {
storage::{NodeStorage, UpdateAvailabilityStorage},
update::Transaction,
},
node::{BlockId, SyncStatus, TimeWindowQueryData, WindowStart},
node::{BlockId, NodeDataSource, SyncStatus, TimeWindowQueryData, WindowStart},
testing::{
consensus::{MockNetwork, TestableDataSource},
mocks::{mock_transaction, MockPayload, MockTypes},
setup_test,
setup_test, sleep,
},
types::HeightIndexed,
Header, VidShare,
Expand All @@ -704,6 +704,7 @@ pub mod node_tests {
vid::{vid_scheme, VidSchemeType},
};
use jf_vid::VidScheme;
use std::time::Duration;

#[tokio::test(flavor = "multi_thread")]
pub async fn test_sync_status<D: TestableDataSource>()
Expand Down Expand Up @@ -867,11 +868,10 @@ pub mod node_tests {
assert_eq!(ds.count_transactions().await.unwrap(), 0);
assert_eq!(ds.payload_size().await.unwrap(), 0);

// Insert some transactions. We insert the blocks out of order to check that the counters
// account for missing blocks fetched later.
// Insert some transactions.
let mut total_transactions = 0;
let mut total_size = 0;
for i in [0, 2, 1] {
'outer: for i in [0, 1, 2] {
// Using `i % 2` as the transaction data ensures we insert a duplicate transaction
// (since we insert more than 2 transactions total). The query service should still
// count these as separate transactions and should include both duplicates when
Expand Down Expand Up @@ -907,15 +907,39 @@ pub mod node_tests {
.await;
*leaf.leaf.block_header_mut() = header.clone();
let block = BlockQueryData::new(header, payload);
ds.append(BlockInfo::new(leaf, Some(block), None, None))
ds.append(BlockInfo::new(leaf, Some(block.clone()), None, None))
.await
.unwrap();
assert_eq!(
NodeDataSource::<MockTypes>::block_height(&ds)
.await
.unwrap(),
(i + 1) as usize,
);

total_transactions += 1;
total_size += encoded.len();

assert_eq!(ds.count_transactions().await.unwrap(), total_transactions);
assert_eq!(ds.payload_size().await.unwrap(), total_size);
// Allow some time for the aggregator to update.
for retry in 0..5 {
let ds_transactions = ds.count_transactions().await.unwrap();
let ds_payload_size = ds.payload_size().await.unwrap();
if ds_transactions != total_transactions || ds_payload_size != total_size {
tracing::info!(
i,
retry,
total_transactions,
ds_transactions,
total_size,
ds_payload_size,
"waiting for statistics to update"
);
sleep(Duration::from_secs(1)).await;
} else {
continue 'outer;
}
}
panic!("counters did not update in time");
}
}

Expand Down
14 changes: 10 additions & 4 deletions src/data_source/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,17 @@ where
async fn block_height(&self) -> QueryResult<usize> {
self.data_source.block_height().await
}
async fn count_transactions(&self) -> QueryResult<usize> {
self.data_source.count_transactions().await
async fn count_transactions_in_range(
&self,
range: impl RangeBounds<usize> + Send,
) -> QueryResult<usize> {
self.data_source.count_transactions_in_range(range).await
}
async fn payload_size(&self) -> QueryResult<usize> {
self.data_source.payload_size().await
async fn payload_size_in_range(
&self,
range: impl RangeBounds<usize> + Send,
) -> QueryResult<usize> {
self.data_source.payload_size_in_range(range).await
}
async fn vid_share<ID>(&self, id: ID) -> QueryResult<VidShare>
where
Expand Down
132 changes: 122 additions & 10 deletions src/data_source/fetching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ use super::{
notifier::Notifier,
storage::{
pruning::{PruneStorage, PrunedHeightStorage},
AvailabilityStorage, ExplorerStorage, MerklizedStateHeightStorage, MerklizedStateStorage,
NodeStorage, UpdateAvailabilityStorage,
AggregatesStorage, AvailabilityStorage, ExplorerStorage, MerklizedStateHeightStorage,
MerklizedStateStorage, NodeStorage, UpdateAggregatesStorage, UpdateAvailabilityStorage,
},
Transaction, VersionedDataSource,
};
Expand Down Expand Up @@ -153,6 +153,7 @@ pub struct Builder<Types, S, P> {
active_fetch_delay: Duration,
chunk_fetch_delay: Duration,
proactive_fetching: bool,
aggregator: bool,
_types: PhantomData<Types>,
}

Expand Down Expand Up @@ -187,6 +188,7 @@ impl<Types, S, P> Builder<Types, S, P> {
active_fetch_delay: Duration::from_millis(50),
chunk_fetch_delay: Duration::from_millis(100),
proactive_fetching: true,
aggregator: true,
_types: Default::default(),
}
}
Expand Down Expand Up @@ -320,6 +322,15 @@ impl<Types, S, P> Builder<Types, S, P> {
self.proactive_fetching = false;
self
}

/// Run without an aggregator.
///
/// This can reduce load on the CPU and the database, but it will cause aggregate statistics
/// (such as transaction counts) not to update.
pub fn disable_aggregator(mut self) -> Self {
self.aggregator = false;
self
}
}

impl<Types, S, P> Builder<Types, S, P>
Expand All @@ -328,8 +339,9 @@ where
Payload<Types>: QueryablePayload<Types>,
Header<Types>: QueryableHeader<Types>,
S: PruneStorage + VersionedDataSource + HasMetrics + 'static,
for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + PrunedHeightStorage + NodeStorage<Types>,
for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
for<'a> S::ReadOnly<'a>:
AvailabilityStorage<Types> + PrunedHeightStorage + NodeStorage<Types> + AggregatesStorage,
for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types> + UpdateAggregatesStorage<Types>,
P: AvailabilityProvider<Types>,
{
/// Build a [`FetchingDataSource`] with these options.
Expand Down Expand Up @@ -367,6 +379,8 @@ where
fetcher: Arc<Fetcher<Types, S, P>>,
// The proactive scanner task. This is only saved here so that we can cancel it on drop.
scanner: Option<BackgroundTask>,
// The aggregator task, which derives aggregate statistics from a block stream.
aggregator: Option<BackgroundTask>,
pruner: Pruner<Types, S, P>,
}

Expand Down Expand Up @@ -419,8 +433,9 @@ where
Payload<Types>: QueryablePayload<Types>,
Header<Types>: QueryableHeader<Types>,
S: VersionedDataSource + PruneStorage + HasMetrics + 'static,
for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types> + UpdateAggregatesStorage<Types>,
for<'a> S::ReadOnly<'a>:
AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage + AggregatesStorage,
P: AvailabilityProvider<Types>,
{
/// Build a [`FetchingDataSource`] with the given `storage` and `provider`.
Expand All @@ -429,6 +444,7 @@ where
}

async fn new(builder: Builder<Types, S, P>) -> anyhow::Result<Self> {
let aggregator = builder.aggregator;
let proactive_fetching = builder.proactive_fetching;
let minor_interval = builder.minor_scan_interval;
let major_interval = builder.major_scan_interval;
Expand All @@ -437,6 +453,7 @@ where
.proactive_range_chunk_size
.unwrap_or(builder.range_chunk_size);
let scanner_metrics = ScannerMetrics::new(builder.storage.metrics());
let aggregator_metrics = AggregatorMetrics::new(builder.storage.metrics());

let fetcher = Arc::new(Fetcher::new(builder).await?);
let scanner = if proactive_fetching {
Expand All @@ -454,11 +471,21 @@ where
None
};

let aggregator = if aggregator {
Some(BackgroundTask::spawn(
"aggregator",
fetcher.clone().aggregate(aggregator_metrics),
))
} else {
None
};

let pruner = Pruner::new(fetcher.clone()).await;
let ds = Self {
fetcher,
scanner,
pruner,
aggregator,
};

Ok(ds)
Expand Down Expand Up @@ -1166,6 +1193,70 @@ where
}
}

impl<Types, S, P> Fetcher<Types, S, P>
where
Types: NodeType,
Payload<Types>: QueryablePayload<Types>,
S: VersionedDataSource + 'static,
for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types> + UpdateAggregatesStorage<Types>,
for<'a> S::ReadOnly<'a>:
AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage + AggregatesStorage,
P: AvailabilityProvider<Types>,
{
#[tracing::instrument(skip_all)]
async fn aggregate(self: Arc<Self>, metrics: AggregatorMetrics) {
loop {
let start = loop {
let mut tx = match self.read().await {
Ok(tx) => tx,
Err(err) => {
tracing::error!("unable to start aggregator: {err:#}");
sleep(Duration::from_secs(5)).await;
continue;
}
};
match tx.aggregates_height().await {
Ok(height) => break height,
Err(err) => {
tracing::error!("unable to load aggregator height: {err:#}");
sleep(Duration::from_secs(5)).await;
continue;
}
};
};
tracing::info!(start, "starting aggregator");
metrics.height.set(start);

let mut blocks = self.clone().get_range::<_, BlockQueryData<Types>>(start..);
while let Some(block) = blocks.next().await {
let block = block.await;
let height = block.height();
tracing::debug!(height, "updating aggregate statistics for block");
loop {
let res = async {
let mut tx = self.write().await.context("opening transaction")?;
tx.update_aggregates(&block).await?;
tx.commit().await.context("committing transaction")
}
.await;
match res {
Ok(()) => break,
Err(err) => {
tracing::warn!(
height,
"failed to update aggregates for block: {err:#}"
);
sleep(Duration::from_secs(1)).await;
}
}
}
metrics.height.set(height as usize);
}
tracing::warn!("aggregator block stream ended unexpectedly; will restart");
}
}
}

impl<Types, S, P> Fetcher<Types, S, P>
where
Types: NodeType,
Expand Down Expand Up @@ -1366,18 +1457,24 @@ where
tx.block_height().await
}

async fn count_transactions(&self) -> QueryResult<usize> {
async fn count_transactions_in_range(
&self,
range: impl RangeBounds<usize> + Send,
) -> QueryResult<usize> {
let mut tx = self.read().await.map_err(|err| QueryError::Error {
message: err.to_string(),
})?;
tx.count_transactions().await
tx.count_transactions_in_range(range).await
}

async fn payload_size(&self) -> QueryResult<usize> {
async fn payload_size_in_range(
&self,
range: impl RangeBounds<usize> + Send,
) -> QueryResult<usize> {
let mut tx = self.read().await.map_err(|err| QueryError::Error {
message: err.to_string(),
})?;
tx.payload_size().await
tx.payload_size_in_range(range).await
}

async fn vid_share<ID>(&self, id: ID) -> QueryResult<VidShare>
Expand Down Expand Up @@ -1769,3 +1866,18 @@ impl ScannerMetrics {
}
}
}

#[derive(Debug)]
struct AggregatorMetrics {
/// The block height for which aggregate statistics are currently available.
height: Box<dyn Gauge>,
}

impl AggregatorMetrics {
fn new(metrics: &PrometheusMetrics) -> Self {
let group = metrics.subgroup("aggregator".into());
Self {
height: group.create_gauge("height".into(), None),
}
}
}
Loading

0 comments on commit 7d55e33

Please sign in to comment.