Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/feature/snapshot_generation' int…
Browse files Browse the repository at this point in the history
…o feature/stream-genesis-state
  • Loading branch information
segfault-magnet committed Dec 1, 2023
2 parents 409fa44 + 1781f3a commit b3f2098
Show file tree
Hide file tree
Showing 20 changed files with 705 additions and 524 deletions.
729 changes: 366 additions & 363 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions bin/fuel-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ const_format = { version = "0.2", optional = true }
dirs = "4.0"
dotenvy = { version = "0.15", optional = true }
fuel-core = { workspace = true }
fuel-core-storage = { workspace = true }
humantime = "2.1"
itertools = { version = "0.12" }
lazy_static = { workspace = true }
pyroscope = "0.5"
pyroscope_pprofrs = "0.2"
Expand All @@ -31,9 +33,9 @@ tikv-jemallocator = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = [
"ansi",
"env-filter",
"json",
"ansi",
"env-filter",
"json",
] }
url = { version = "2.2", optional = true }

Expand Down
7 changes: 3 additions & 4 deletions bin/fuel-core/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ fn init_environment() -> Option<PathBuf> {
None
}

pub async fn init_logging() -> anyhow::Result<()> {
pub fn init_logging() {
let filter = match env::var_os(LOG_FILTER) {
Some(_) => {
EnvFilter::try_from_default_env().expect("Invalid `RUST_LOG` provided")
Expand Down Expand Up @@ -95,11 +95,10 @@ pub async fn init_logging() -> anyhow::Result<()> {

tracing::subscriber::set_global_default(subscriber)
.expect("setting global default failed");
Ok(())
}

pub async fn run_cli() -> anyhow::Result<()> {
init_logging().await?;
init_logging();
if let Some(path) = init_environment() {
let path = path.display();
tracing::info!("Loading environment variables from {path}");
Expand All @@ -116,7 +115,7 @@ pub async fn run_cli() -> anyhow::Result<()> {
match opt {
Ok(opt) => match opt.command {
Fuel::Run(command) => run::exec(command).await,
Fuel::Snapshot(command) => snapshot::exec(command).await,
Fuel::Snapshot(command) => snapshot::exec(command),
},
Err(e) => {
// Prints the error and exits.
Expand Down
169 changes: 129 additions & 40 deletions bin/fuel-core/src/cli/snapshot.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,25 @@
use crate::cli::DEFAULT_DB_PATH;
use anyhow::Context;
use clap::{
Parser,
Subcommand,
ValueEnum,
};
use fuel_core::{
chain_config::{
ChainConfig,
ChainStateDb,
Encoder,
},
database::Database,
types::fuel_types::ContractId,
};
use fuel_core_storage::Result as StorageResult;
use itertools::Itertools;
use std::path::{
Path,
PathBuf,
};
use fuel_core::types::fuel_types::ContractId;
use std::path::PathBuf;

/// Print a snapshot of blockchain state to stdout.
#[derive(Debug, Clone, Parser)]
Expand All @@ -23,17 +38,27 @@ pub struct Command {
subcommand: SubCommands,
}

#[derive(ValueEnum, Debug, Clone, Copy)]
pub enum StateEncodingFormat {
Json,
Parquet,
}

#[derive(Debug, Clone, Subcommand)]
pub enum SubCommands {
/// Creates a snapshot of the entire database and produces a chain config.
#[command(arg_required_else_help = true)]
Everything {
/// Specify either an alias to a built-in configuration or filepath to a JSON file.
/// Specify a a path to the directory containing the chain config. Defaults used if no path
/// is provided.
#[clap(name = "CHAIN_CONFIG", long = "chain")]
chain_config: Option<String>,
chain_config: Option<PathBuf>,
/// Specify a path to an output directory for the chain config files.
#[clap(name = "OUTPUT_DIR", long = "output directory")]
#[clap(name = "OUTPUT_DIR", long = "output-directory")]
output_dir: PathBuf,
/// State encoding format
#[clap(name = "STATE_ENCODING_FORMAT", long = "state-encoding-format")]
state_encoding_format: StateEncodingFormat,
},
/// Creates a config for the contract.
#[command(arg_required_else_help = true)]
Expand All @@ -53,46 +78,110 @@ pub async fn exec(command: Command) -> anyhow::Result<()> {
}

#[cfg(any(feature = "rocksdb", feature = "rocksdb-production"))]
pub async fn exec(command: Command) -> anyhow::Result<()> {
use anyhow::Context;
use fuel_core::{
chain_config::{
ChainConfig,
StateConfig,
},
database::Database,
};
let path = command.database_path;
let data_source = fuel_core::state::rocks_db::RocksDb::default_open(&path, None)
.map_err(Into::<anyhow::Error>::into)
.context(format!(
"failed to open database at path {}",
path.display()
))?;
let db = Database::new(std::sync::Arc::new(data_source));
pub fn exec(command: Command) -> anyhow::Result<()> {
let db = open_db(&command.database_path)?;

match command.subcommand {
SubCommands::Everything {
chain_config,
output_dir,
} => {
let chain_conf = match chain_config.as_deref() {
None => ChainConfig::local_testnet(),
Some(path) => ChainConfig::load_from_directory(path)?,
};
let state_conf = StateConfig::generate_state_config(db)?;

std::fs::create_dir_all(&output_dir)?;
chain_conf.create_config_file(&output_dir)?;
state_conf.create_config_file(&output_dir)?;
}
SubCommands::Contract { contract_id } => {
let config = db.get_contract_config_by_id(contract_id)?;
let stdout = std::io::stdout().lock();

serde_json::to_writer_pretty(stdout, &config)
.context("failed to dump contract snapshot to JSON")?;
}
state_encoding_format,
} => full_snapshot(chain_config, &output_dir, state_encoding_format, &db),
SubCommands::Contract { contract_id } => contract_snapshot(&db, contract_id),
}
}

fn contract_snapshot(
db: impl ChainStateDb,
contract_id: ContractId,
) -> Result<(), anyhow::Error> {
let config = db.get_contract_config_by_id(contract_id)?;
let stdout = std::io::stdout().lock();
serde_json::to_writer_pretty(stdout, &config)
.context("failed to dump contract snapshot to JSON")?;
Ok(())
}

fn full_snapshot(
chain_config: Option<PathBuf>,
output_dir: &Path,
state_encoding_format: StateEncodingFormat,
db: impl ChainStateDb,
) -> Result<(), anyhow::Error> {
let encoder = initialize_encoder(output_dir, state_encoding_format)?;
write_chain_state(db, encoder)?;

let chain_config = load_chain_config(chain_config)?;
chain_config.create_config_file(output_dir)?;

Ok(())
}

fn write_chain_state(db: impl ChainStateDb, mut encoder: Encoder) -> anyhow::Result<()> {
fn write<T>(
data: impl Iterator<Item = StorageResult<T>>,
group_size: usize,
mut write: impl FnMut(Vec<T>) -> anyhow::Result<()>,
) -> anyhow::Result<()> {
data.chunks(group_size)
.into_iter()
.try_for_each(|chunk| write(chunk.try_collect()?))
}

let group_size = 1000;

let coins = db.iter_coin_configs();
write(coins, group_size, |chunk| encoder.write_coins(chunk))?;

let messages = db.iter_message_configs();
write(messages, group_size, |chunk| encoder.write_messages(chunk))?;

let contracts = db.iter_contract_configs();
write(contracts, group_size, |chunk| {
encoder.write_contracts(chunk)
})?;

let contract_states = db.iter_contract_state_configs();
write(contract_states, group_size, |chunk| {
encoder.write_contract_state(chunk)
})?;

let contract_balances = db.iter_contract_balance_configs();
write(contract_balances, group_size, |chunk| {
encoder.write_contract_balance(chunk)
})?;

encoder.close()?;

Ok(())
}

fn initialize_encoder(
output_dir: &Path,
state_encoding_format: StateEncodingFormat,
) -> Result<Encoder, anyhow::Error> {
std::fs::create_dir_all(output_dir)?;
let encoder = match state_encoding_format {
StateEncodingFormat::Json => Encoder::json(output_dir),
StateEncodingFormat::Parquet => Encoder::parquet(output_dir, 1)?,
};
Ok(encoder)
}

fn load_chain_config(
chain_config: Option<PathBuf>,
) -> Result<ChainConfig, anyhow::Error> {
let chain_config = match chain_config {
Some(dir) => ChainConfig::load_from_directory(dir)?,
None => ChainConfig::local_testnet(),
};

Ok(chain_config)
}

fn open_db(path: &Path) -> anyhow::Result<impl ChainStateDb> {
let data_source = fuel_core::state::rocks_db::RocksDb::default_open(path, None)
.map_err(Into::<anyhow::Error>::into)
.context(format!("failed to open database at path {path:?}",))?;
Ok(Database::new(std::sync::Arc::new(data_source)))
}
2 changes: 1 addition & 1 deletion crates/chain-config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ insta = { workspace = true }
pretty_assertions = "1.4.0"
rand = { workspace = true }
serde_json = { version = "1.0", features = ["raw_value"] }
tempfile.workspace = true
tempfile = { workspace = true }

[features]
default = ["std", "random"]
Expand Down
2 changes: 2 additions & 0 deletions crates/chain-config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,7 @@ pub use codec::*;
pub use coin::*;
pub use consensus::*;
pub use contract::*;
pub use contract_balance::*;
pub use contract_state::*;
pub use message::*;
pub use state::*;
19 changes: 7 additions & 12 deletions crates/chain-config/src/config/codec.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
mod decoder;
mod encoder;
pub(crate) mod parquet;
mod parquet;

pub use decoder::{
Decoder,
Expand All @@ -21,15 +21,10 @@ type GroupResult<T> = anyhow::Result<Group<T>>;
mod tests {
use std::ops::Range;

use ::parquet::basic::{
Compression,
GzipLevel,
};

use crate::{
config::{
contract_balance::ContractBalance,
contract_state::ContractState,
contract_state::ContractStateConfig,
},
CoinConfig,
ContractConfig,
Expand Down Expand Up @@ -57,13 +52,12 @@ mod tests {
init_decoder,
group_size,
starting_group_index..num_groups,
)
);
}
{
// Parquet
let temp_dir = tempfile::tempdir().unwrap();
let compression = Compression::GZIP(GzipLevel::try_new(1).unwrap());
let state_encoder = Encoder::parquet(temp_dir.path(), compression).unwrap();
let state_encoder = Encoder::parquet(temp_dir.path(), 1).unwrap();

let init_decoder = || Decoder::parquet(temp_dir.path());

Expand All @@ -72,7 +66,7 @@ mod tests {
init_decoder,
group_size,
starting_group_index..num_groups,
)
);
}
}

Expand Down Expand Up @@ -105,7 +99,8 @@ mod tests {
let coin_batches = write_batches!(CoinConfig, write_coins);
let message_batches = write_batches!(MessageConfig, write_messages);
let contract_batches = write_batches!(ContractConfig, write_contracts);
let contract_state_batches = write_batches!(ContractState, write_contract_state);
let contract_state_batches =
write_batches!(ContractStateConfig, write_contract_state);
let contract_balance_batches =
write_batches!(ContractBalance, write_contract_balance);
encoder.close().unwrap();
Expand Down
4 changes: 2 additions & 2 deletions crates/chain-config/src/config/codec/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use itertools::Itertools;
use crate::{
config::{
contract_balance::ContractBalance,
contract_state::ContractState,
contract_state::ContractStateConfig,
},
CoinConfig,
ContractConfig,
Expand Down Expand Up @@ -121,7 +121,7 @@ impl Decoder {
self.create_iterator(|state| &state.contracts, "contracts")
}

pub fn contract_state(&self) -> anyhow::Result<IntoIter<ContractState>> {
pub fn contract_state(&self) -> anyhow::Result<IntoIter<ContractStateConfig>> {
self.create_iterator(|state| &state.contract_state, "contract_state")
}

Expand Down
Loading

0 comments on commit b3f2098

Please sign in to comment.