Skip to content

Commit

Permalink
more work
Browse files Browse the repository at this point in the history
  • Loading branch information
ogabrielides committed Apr 23, 2024
1 parent 40564e2 commit 86c7204
Show file tree
Hide file tree
Showing 2 changed files with 180 additions and 26 deletions.
190 changes: 166 additions & 24 deletions grovedb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,32 +244,31 @@ pub struct GroveDb {
#[cfg(feature = "full")]
db: RocksDbStorage,

version: i32,
pending_chunks: BTreeMap<String, Vec<Op>>,
//current_tx: Option<Transaction<'db>>,
//restorer: Restorer<T>,
}

pub struct s_subtrees_metadata {
pub data: BTreeMap<String, (Vec<Vec<u8>>, CryptoHash, Option<CryptoHash>)>
version: i32
}

impl s_subtrees_metadata {
pub fn new() -> s_subtrees_metadata {
s_subtrees_metadata {
data: BTreeMap::new(),
}
}
pub struct state_sync_info<'db/*, S*/> {
restorer: Option<Restorer<PrefixedRocksDbImmediateStorageContext<'db>>>,
tx: Option<Transaction<'db>>,
pending_chunks :BTreeSet<Vec<u8>>,
processed_prefixes :BTreeSet<SubtreePrefix>,
current_prefix: Option<SubtreePrefix>,
version: i32,
}

impl fmt::Debug for s_subtrees_metadata {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
for (prefix, metadata) in self.data.iter() {
let metadata_path = &metadata.0;
let metadata_path_str = s_util_path_to_string(&metadata_path);
write!(f, " prefix:{:?} -> path:{:?}\n", prefix, metadata_path_str);
impl/*<S>*/ state_sync_info<'_/*, S*/> {
pub fn new() -> state_sync_info<'static/*, S*/> {
let pending_chunks = BTreeSet::new();
let processed_prefixes = BTreeSet::new();
state_sync_info {
restorer: None,
tx: None,
pending_chunks: pending_chunks,
processed_prefixes: processed_prefixes,
//current_subtree_opt: None,
current_prefix: None,
version: 1
}
Ok(())
}
}

Expand Down Expand Up @@ -313,8 +312,7 @@ impl GroveDb {
pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {

let db = RocksDbStorage::default_rocksdb_with_path(path)?;
let pending_chunks = BTreeMap::new();
Ok(GroveDb { db, version: 1, pending_chunks })
Ok(GroveDb { db, version: 1 })
}

/// Uses raw iter to delete GroveDB key values pairs from rocksdb
Expand Down Expand Up @@ -1177,7 +1175,6 @@ impl GroveDb {
let mut current_subtree_opt :Option<(SubtreePrefix, Vec<Vec<u8>>, CryptoHash, CryptoHash, bool)> = None;


//current_subtree_opt = (SubtreePrefix::default())
{
let tx = self.start_transaction();
let merk = self.open_merk_for_replication(SubtreePath::empty(), &tx).unwrap();
Expand Down Expand Up @@ -1317,6 +1314,151 @@ impl GroveDb {
Ok(())
}

// pub fn w_start_snapshot_syncing<'db: 'a, 'a/*, S: StorageContext<'db>*/>(
// &'a self,
// state_sync_info: &'db mut state_sync_info<'a/*, S*/>,
// app_hash: CryptoHash,
// )

pub fn w_start_snapshot_syncing<'db: 'a, 'a/*, S: StorageContext<'db>*/>(
&'a self,
state_sync_info: &'db mut state_sync_info<'a/*, S*/>,
app_hash: CryptoHash,
) -> Result<Vec<Vec<u8>>, Error>{
let mut res = vec![];

match (&mut state_sync_info.restorer, &state_sync_info.tx, &state_sync_info.current_prefix) {
(None, None, None) => {
if state_sync_info.pending_chunks.is_empty() && state_sync_info.processed_prefixes.is_empty() {
let root_prefix = [0u8; 32];
state_sync_info.tx = Some(self.start_transaction());
if let Some(ref_tx) = state_sync_info.tx.as_ref() {
let merk = self.open_merk_for_replication(SubtreePath::empty(), ref_tx).unwrap();
let restorer = Restorer::new(merk, app_hash, None);
state_sync_info.restorer = Some(restorer);
state_sync_info.current_prefix = Some(root_prefix);
state_sync_info.pending_chunks.insert(root_prefix.to_vec());

res.push(root_prefix.to_vec());
}
else {
return Err(Error::InternalError(
"Unable to start a tx",
));
}
} else {
return Err(Error::InternalError(
"Invalid internal state sync info",
));
}
},
_ => {
return Err(Error::InternalError(
"GroveDB has already started a snapshot syncing",
));
}
}

Ok(res)
}

pub fn w_apply_chunk<'db: 'a, 'a/*, S: StorageContext<'db>*/>(
&'a self,
state_sync_info: &'db mut state_sync_info<'a/*, S*/>,
chunk: (Vec<u8>, Vec<Op>)
) -> Result<Vec<Vec<u8>>, Error>{
let mut res = vec![];

let (global_chunk_id, chunk_data) = chunk;
let (chunk_prefix, chunk_id) = w_util_split_global_chunk_id(&global_chunk_id)?;

match (&mut state_sync_info.restorer, &state_sync_info.tx, &state_sync_info.current_prefix) {
(Some(restorer), Some(tx), Some(ref current_prefix)) => {
if (*current_prefix != chunk_prefix) {
return Err(Error::InternalError(
"Invalid incoming prefix",
));
}
if (!state_sync_info.pending_chunks.contains(&global_chunk_id)) {
return Err(Error::InternalError(
"Incoming global_chunk_id not expected",
));
}
state_sync_info.pending_chunks.remove(&global_chunk_id);
match restorer.process_chunk(chunk_id.to_string(), chunk_data) {
Ok(next_chunk_ids) => {
for next_chunk_id in next_chunk_ids {
let mut next_global_chunk_id = chunk_prefix.to_vec();
next_global_chunk_id.extend(next_chunk_id.as_bytes().to_vec());
state_sync_info.pending_chunks.insert(next_global_chunk_id.clone());
res.push(next_global_chunk_id);
}
},
_ => {
return Err(Error::InternalError(
"Unable to process incoming chunk",
));
},
};
}
_ => {
return Err(Error::InternalError(
"GroveDB is not in syncing mode",
));
}
}

if (res.is_empty()) {
match (state_sync_info.restorer.take(), state_sync_info.tx.take(), state_sync_info.current_prefix.take()) {
(Some(restorer), Some(tx), Some(current_prefix)) => {
//make sure that pending_chunks is empty
if (!restorer.finalize().is_ok()) {
return Err(Error::InternalError(
"Unable to finalize merk",
));
}
self.commit_transaction(tx);
state_sync_info.processed_prefixes.insert(current_prefix);
let subtrees_metadata = crate::w_subtree_metadata::new();
for (prefix, prefix_metadata) in &subtrees_metadata.data {
if !state_sync_info.processed_prefixes.contains(prefix) {
let current_path = &prefix_metadata.0;
let s_actual_value_hash = &prefix_metadata.1;
let s_elem_value_hash = &prefix_metadata.2;

let subtree_path: Vec<&[u8]> = current_path.iter().map(|vec| vec.as_slice()).collect();
let path: &[&[u8]] = &subtree_path;

state_sync_info.tx = Some(self.start_transaction());
if let Some(ref_val) = state_sync_info.tx.as_ref() {
let merk = self.open_merk_for_replication(path.into(), ref_val).unwrap();
let restorer = Restorer::new(merk, *s_elem_value_hash, Some(*s_actual_value_hash));
state_sync_info.restorer = Some(restorer);
state_sync_info.current_prefix = Some(*prefix);

let mut root_chunk_prefix = prefix.to_vec();
state_sync_info.pending_chunks.insert(root_chunk_prefix.clone());
res.push(root_chunk_prefix);
}
else {
return Err(Error::InternalError(
"Unable to start a tx",
));
}
break;
}
}
},
_ => {
return Err(Error::InternalError(
"Unable to finalize tree",
));
}
}
}

Ok(res)
}
}

pub fn s_util_path_to_string(
Expand Down
16 changes: 14 additions & 2 deletions tutorials/src/bin/replication.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::VecDeque;
use std::ops::Range;
use std::path::Path;
use grovedb::{operations::insert::InsertOptions, Element, GroveDb, PathQuery, Query, Transaction};
use grovedb::{operations::insert::InsertOptions, Element, GroveDb, PathQuery, Query, Transaction, state_sync_info};
use grovedb::reference_path::ReferencePathType;
use rand::{distributions::Alphanumeric, Rng, thread_rng};
use rand::prelude::SliceRandom;
Expand Down Expand Up @@ -89,7 +89,9 @@ fn main() {
println!("root_hash_copy: {:?}", hex::encode(root_hash_copy));

println!("\n######### db_checkpoint_0 -> db_copy state sync");
db_copy.w_sync_db_demo(&db_checkpoint_0).unwrap();
sync_db_demo(&db_checkpoint_0, &db_copy).unwrap();
//db_copy.w_sync_db_demo(&db_checkpoint_0).unwrap();
return;

println!("\n######### root_hashes:");
let root_hash_0 = db_0.root_hash(None).unwrap().unwrap();
Expand Down Expand Up @@ -215,3 +217,13 @@ fn query_db(db: &GroveDb, path: &[&[u8]], key: Vec<u8>) {
} else { println!("Verification FAILED"); };
}

fn sync_db_demo(
source_db: &GroveDb,
target_db: &GroveDb,
) -> Result<(), grovedb::Error> {
let mut state_sync_inf = state_sync_info::new();
let app_hash = source_db.root_hash(None).value.unwrap();
//target_db.w_start_snapshot_syncing(&mut state_sync_inf, app_hash);
Ok(())
}

0 comments on commit 86c7204

Please sign in to comment.