Merge pull request #632 from EspressoSystems/jb/archival
Enable recovery of an archival node from a pruned node
jbearer authored Jun 7, 2024
2 parents 7b363e1 + 1429b23 commit 54261a8
Showing 2 changed files with 186 additions and 1 deletion.
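In short, the change adds a `Config::archive()` option: a node that previously ran with pruning can be restarted in archive mode, its saved pruned height is cleared on startup, and the fetcher then reconstructs the pruned data from peers. A rough usage sketch based on the builder calls exercised in the new test, where `db`, `provider`, and `retention` are placeholders for a real deployment's database config, fetch provider, and retention period:

// First run: pruning enabled.
let data_source = db
    .config()
    .pruner_cfg(PrunerCfg::new().with_target_retention(retention))?
    .connect(provider.clone())
    .await?;

// Later run against the same database: archive mode. The saved pruned height is
// cleared on startup, so previously pruned data is reconstructed by fetching from peers.
let data_source = db
    .config()
    .archive()
    .connect(provider)
    .await?;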
28 changes: 28 additions & 0 deletions src/data_source/storage/sql.rs
@@ -229,6 +229,7 @@ pub struct Config {
no_migrations: bool,
tls: bool,
pruner_cfg: Option<PrunerCfg>,
archive: bool,
}

impl Default for Config {
@@ -243,6 +244,7 @@ impl Default for Config {
no_migrations: false,
tls: false,
pruner_cfg: None,
archive: false,
}
}
}
@@ -349,11 +351,29 @@ impl Config {
self
}

/// Enable pruning with a given configuration.
///
/// If [`archive`](Self::archive) was previously specified, this will override it.
pub fn pruner_cfg(mut self, cfg: PrunerCfg) -> Result<Self, Error> {
cfg.validate()?;
self.pruner_cfg = Some(cfg);
self.archive = false;
Ok(self)
}

/// Disable pruning and reconstruct previously pruned data.
///
/// While running without pruning is the default behavior, the default will not try to
/// reconstruct data that was pruned in a previous run where pruning was enabled. This option
/// instructs the service to run without pruning _and_ reconstruct all previously pruned data by
/// fetching from peers.
///
/// If [`pruner_cfg`](Self::pruner_cfg) was previously specified, this will override it.
pub fn archive(mut self) -> Self {
self.pruner_cfg = None;
self.archive = true;
self
}
}

/// Storage for the APIs provided in this crate, backed by a remote PostgreSQL database.
@@ -453,6 +473,14 @@ impl SqlStorage {

let pruner = config.pruner_cfg.map(Pruner::new).unwrap_or_default();

if config.archive {
// If running in archive mode, clear the saved pruned height so the fetcher will
// reconstruct previously pruned data.
client
.batch_execute("DELETE FROM pruned_height WHERE id = 1")
.await?;
}

Ok(Self {
client: Arc::new(client),
tx_in_progress: false,
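Since `pruner_cfg` and `archive` each reset the other, whichever builder call comes last takes effect. A small sketch of the two orderings (values are placeholders):

// Pruning, then archive: archive wins; pruning is disabled and pruned data is refetched.
let archive_cfg = db.config().pruner_cfg(PrunerCfg::new())?.archive();

// Archive, then pruning: the pruner configuration wins and archive mode is turned off.
let pruning_cfg = db.config().archive().pruner_cfg(PrunerCfg::new())?;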
159 changes: 158 additions & 1 deletion src/fetching/provider/query_service.rs
@@ -162,10 +162,14 @@ mod test {
},
data_source::{
sql::{self, SqlDataSource},
storage::sql::testing::TmpDb,
storage::{
pruning::{PrunedHeightStorage, PrunerCfg},
sql::testing::TmpDb,
},
AvailabilityProvider, VersionedDataSource,
},
fetching::provider::{NoFetching, Provider as ProviderTrait, TestProvider},
node::{data_source::NodeDataSource, SyncStatus},
task::BackgroundTask,
testing::{
consensus::{MockDataSource, MockNetwork},
@@ -900,4 +904,157 @@ mod test {
.await;
assert_eq!(res, None);
}

#[async_std::test]
async fn test_archive_recovery() {
setup_test();

// Create the consensus network.
let mut network = MockNetwork::<MockDataSource>::init().await;

// Start a web server that the non-consensus node can use to fetch blocks.
let port = pick_unused_port().unwrap();
let mut app = App::<_, Error>::with_state(network.data_source());
app.register_module(
"availability",
define_api(&Default::default(), STATIC_VER_0_1).unwrap(),
)
.unwrap();
network.spawn(
"server",
app.serve(format!("0.0.0.0:{port}"), STATIC_VER_0_1),
);

// Start a data source which is not receiving events from consensus, only from a peer. The
// data source is at first configured to aggressively prune data.
let db = TmpDb::init().await;
let provider = Provider::new(QueryServiceProvider::new(
format!("http://localhost:{port}").parse().unwrap(),
STATIC_VER_0_1,
));
let mut data_source = db
.config()
.pruner_cfg(
PrunerCfg::new()
.with_target_retention(Duration::from_secs(0))
.with_interval(Duration::from_secs(1)),
)
.unwrap()
.connect(provider.clone())
.await
.unwrap();

// Start consensus.
network.start().await;

// Wait until a few blocks are produced.
let leaves = { network.data_source().read().await.subscribe_leaves(1).await };
let leaves = leaves.take(5).collect::<Vec<_>>().await;

// The disconnected data source has no data yet, so it hasn't done any pruning.
let pruned_height = {
data_source
.storage()
.await
.load_pruned_height()
.await
.unwrap()
};
// Either None or 0 is acceptable, depending on whether or not the pruner has run yet.
assert!(matches!(pruned_height, None | Some(0)), "{pruned_height:?}");

// Send the last leaf to the disconnected data source so it learns about the height and
// fetches the missing data.
let last_leaf = leaves.last().unwrap();
data_source.insert_leaf(last_leaf.clone()).await.unwrap();
data_source.commit().await.unwrap();

// Trigger a fetch of each leaf so the database gets populated.
for i in 1..=last_leaf.height() {
tracing::info!(i, "fetching leaf");
assert_eq!(
data_source.get_leaf(i as usize).await.await,
leaves[i as usize - 1]
);
}

// After a bit of time, the pruner has run and deleted all the missing data we just fetched.
loop {
let pruned_height = data_source
.storage()
.await
.load_pruned_height()
.await
.unwrap();
if pruned_height == Some(last_leaf.height()) {
break;
}
tracing::info!(
?pruned_height,
target_height = last_leaf.height(),
"waiting for pruner to run"
);
sleep(Duration::from_secs(1)).await;
}

// Now close the data source and restart it with archive recovery.
data_source = db
.config()
.archive()
.builder(provider.clone())
.await
.unwrap()
.with_minor_scan_interval(Duration::from_secs(1))
.with_major_scan_interval(1)
.build()
.await
.unwrap();

// Pruned height should be reset.
let pruned_height = {
data_source
.storage()
.await
.load_pruned_height()
.await
.unwrap()
};
assert_eq!(pruned_height, None);

// The node has pruned all of its data, including the latest block, so it has forgotten the
// block height. We need to give it another leaf with some height so it will be willing to
// fetch.
data_source.insert_leaf(last_leaf.clone()).await.unwrap();
data_source.commit().await.unwrap();

// Wait for the data to be restored. It should be restored by the next major scan.
loop {
let sync_status = data_source.sync_status().await.await.unwrap();

// VID shares are unique to a node and will never be fetched from a peer; this is
// acceptable since there is redundancy built into the VID scheme. Ignore missing VID
// shares in the `is_fully_synced` check.
if (SyncStatus {
missing_vid_shares: 0,
..sync_status
})
.is_fully_synced()
{
break;
}
tracing::info!(?sync_status, "waiting for node to sync");
}

// The node remains fully synced even after some time; no pruning.
sleep(Duration::from_secs(3)).await;
let sync_status = data_source.sync_status().await.await.unwrap();
assert!(
(SyncStatus {
missing_vid_shares: 0,
..sync_status
})
.is_fully_synced(),
"{sync_status:?}"
);
}
}
