From 86c72047baf5034c0beed783e1aa6d3ce95025d1 Mon Sep 17 00:00:00 2001 From: Odysseas Gabrielides Date: Tue, 23 Apr 2024 21:10:22 +0300 Subject: [PATCH] more work --- grovedb/src/lib.rs | 190 +++++++++++++++++++++++++++---- tutorials/src/bin/replication.rs | 16 ++- 2 files changed, 180 insertions(+), 26 deletions(-) diff --git a/grovedb/src/lib.rs b/grovedb/src/lib.rs index 14d3ab9a..286c487d 100644 --- a/grovedb/src/lib.rs +++ b/grovedb/src/lib.rs @@ -244,32 +244,31 @@ pub struct GroveDb { #[cfg(feature = "full")] db: RocksDbStorage, - version: i32, - pending_chunks: BTreeMap>, - //current_tx: Option>, - //restorer: Restorer, -} - -pub struct s_subtrees_metadata { - pub data: BTreeMap>, CryptoHash, Option)> + version: i32 } -impl s_subtrees_metadata { - pub fn new() -> s_subtrees_metadata { - s_subtrees_metadata { - data: BTreeMap::new(), - } - } +pub struct state_sync_info<'db/*, S*/> { + restorer: Option>>, + tx: Option>, + pending_chunks :BTreeSet>, + processed_prefixes :BTreeSet, + current_prefix: Option, + version: i32, } -impl fmt::Debug for s_subtrees_metadata { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - for (prefix, metadata) in self.data.iter() { - let metadata_path = &metadata.0; - let metadata_path_str = s_util_path_to_string(&metadata_path); - write!(f, " prefix:{:?} -> path:{:?}\n", prefix, metadata_path_str); +impl/**/ state_sync_info<'_/*, S*/> { + pub fn new() -> state_sync_info<'static/*, S*/> { + let pending_chunks = BTreeSet::new(); + let processed_prefixes = BTreeSet::new(); + state_sync_info { + restorer: None, + tx: None, + pending_chunks: pending_chunks, + processed_prefixes: processed_prefixes, + //current_subtree_opt: None, + current_prefix: None, + version: 1 } - Ok(()) } } @@ -313,8 +312,7 @@ impl GroveDb { pub fn open>(path: P) -> Result { let db = RocksDbStorage::default_rocksdb_with_path(path)?; - let pending_chunks = BTreeMap::new(); - Ok(GroveDb { db, version: 1, pending_chunks }) + Ok(GroveDb { db, version: 1 }) } /// Uses raw iter to delete GroveDB key values pairs from rocksdb @@ -1177,7 +1175,6 @@ impl GroveDb { let mut current_subtree_opt :Option<(SubtreePrefix, Vec>, CryptoHash, CryptoHash, bool)> = None; - //current_subtree_opt = (SubtreePrefix::default()) { let tx = self.start_transaction(); let merk = self.open_merk_for_replication(SubtreePath::empty(), &tx).unwrap(); @@ -1317,6 +1314,151 @@ impl GroveDb { Ok(()) } + // pub fn w_start_snapshot_syncing<'db: 'a, 'a/*, S: StorageContext<'db>*/>( + // &'a self, + // state_sync_info: &'db mut state_sync_info<'a/*, S*/>, + // app_hash: CryptoHash, + // ) + + pub fn w_start_snapshot_syncing<'db: 'a, 'a/*, S: StorageContext<'db>*/>( + &'a self, + state_sync_info: &'db mut state_sync_info<'a/*, S*/>, + app_hash: CryptoHash, + ) -> Result>, Error>{ + let mut res = vec![]; + + match (&mut state_sync_info.restorer, &state_sync_info.tx, &state_sync_info.current_prefix) { + (None, None, None) => { + if state_sync_info.pending_chunks.is_empty() && state_sync_info.processed_prefixes.is_empty() { + let root_prefix = [0u8; 32]; + state_sync_info.tx = Some(self.start_transaction()); + if let Some(ref_tx) = state_sync_info.tx.as_ref() { + let merk = self.open_merk_for_replication(SubtreePath::empty(), ref_tx).unwrap(); + let restorer = Restorer::new(merk, app_hash, None); + state_sync_info.restorer = Some(restorer); + state_sync_info.current_prefix = Some(root_prefix); + state_sync_info.pending_chunks.insert(root_prefix.to_vec()); + + res.push(root_prefix.to_vec()); + } + else { + return Err(Error::InternalError( + "Unable to start a tx", + )); + } + } else { + return Err(Error::InternalError( + "Invalid internal state sync info", + )); + } + }, + _ => { + return Err(Error::InternalError( + "GroveDB has already started a snapshot syncing", + )); + } + } + + Ok(res) + } + + pub fn w_apply_chunk<'db: 'a, 'a/*, S: StorageContext<'db>*/>( + &'a self, + state_sync_info: &'db mut state_sync_info<'a/*, S*/>, + chunk: (Vec, Vec) + ) -> Result>, Error>{ + let mut res = vec![]; + + let (global_chunk_id, chunk_data) = chunk; + let (chunk_prefix, chunk_id) = w_util_split_global_chunk_id(&global_chunk_id)?; + + match (&mut state_sync_info.restorer, &state_sync_info.tx, &state_sync_info.current_prefix) { + (Some(restorer), Some(tx), Some(ref current_prefix)) => { + if (*current_prefix != chunk_prefix) { + return Err(Error::InternalError( + "Invalid incoming prefix", + )); + } + if (!state_sync_info.pending_chunks.contains(&global_chunk_id)) { + return Err(Error::InternalError( + "Incoming global_chunk_id not expected", + )); + } + state_sync_info.pending_chunks.remove(&global_chunk_id); + match restorer.process_chunk(chunk_id.to_string(), chunk_data) { + Ok(next_chunk_ids) => { + for next_chunk_id in next_chunk_ids { + let mut next_global_chunk_id = chunk_prefix.to_vec(); + next_global_chunk_id.extend(next_chunk_id.as_bytes().to_vec()); + state_sync_info.pending_chunks.insert(next_global_chunk_id.clone()); + res.push(next_global_chunk_id); + } + }, + _ => { + return Err(Error::InternalError( + "Unable to process incoming chunk", + )); + }, + }; + } + _ => { + return Err(Error::InternalError( + "GroveDB is not in syncing mode", + )); + } + } + + if (res.is_empty()) { + match (state_sync_info.restorer.take(), state_sync_info.tx.take(), state_sync_info.current_prefix.take()) { + (Some(restorer), Some(tx), Some(current_prefix)) => { + //make sure that pending_chunks is empty + if (!restorer.finalize().is_ok()) { + return Err(Error::InternalError( + "Unable to finalize merk", + )); + } + self.commit_transaction(tx); + state_sync_info.processed_prefixes.insert(current_prefix); + let subtrees_metadata = crate::w_subtree_metadata::new(); + for (prefix, prefix_metadata) in &subtrees_metadata.data { + if !state_sync_info.processed_prefixes.contains(prefix) { + let current_path = &prefix_metadata.0; + let s_actual_value_hash = &prefix_metadata.1; + let s_elem_value_hash = &prefix_metadata.2; + + let subtree_path: Vec<&[u8]> = current_path.iter().map(|vec| vec.as_slice()).collect(); + let path: &[&[u8]] = &subtree_path; + + state_sync_info.tx = Some(self.start_transaction()); + if let Some(ref_val) = state_sync_info.tx.as_ref() { + let merk = self.open_merk_for_replication(path.into(), ref_val).unwrap(); + let restorer = Restorer::new(merk, *s_elem_value_hash, Some(*s_actual_value_hash)); + state_sync_info.restorer = Some(restorer); + state_sync_info.current_prefix = Some(*prefix); + + let mut root_chunk_prefix = prefix.to_vec(); + state_sync_info.pending_chunks.insert(root_chunk_prefix.clone()); + res.push(root_chunk_prefix); + } + else { + return Err(Error::InternalError( + "Unable to start a tx", + )); + } + break; + } + } + }, + _ => { + return Err(Error::InternalError( + "Unable to finalize tree", + )); + } + } + } + + Ok(res) + } } pub fn s_util_path_to_string( diff --git a/tutorials/src/bin/replication.rs b/tutorials/src/bin/replication.rs index 0ca0c576..568f2a2a 100644 --- a/tutorials/src/bin/replication.rs +++ b/tutorials/src/bin/replication.rs @@ -1,7 +1,7 @@ use std::collections::VecDeque; use std::ops::Range; use std::path::Path; -use grovedb::{operations::insert::InsertOptions, Element, GroveDb, PathQuery, Query, Transaction}; +use grovedb::{operations::insert::InsertOptions, Element, GroveDb, PathQuery, Query, Transaction, state_sync_info}; use grovedb::reference_path::ReferencePathType; use rand::{distributions::Alphanumeric, Rng, thread_rng}; use rand::prelude::SliceRandom; @@ -89,7 +89,9 @@ fn main() { println!("root_hash_copy: {:?}", hex::encode(root_hash_copy)); println!("\n######### db_checkpoint_0 -> db_copy state sync"); - db_copy.w_sync_db_demo(&db_checkpoint_0).unwrap(); + sync_db_demo(&db_checkpoint_0, &db_copy).unwrap(); + //db_copy.w_sync_db_demo(&db_checkpoint_0).unwrap(); + return; println!("\n######### root_hashes:"); let root_hash_0 = db_0.root_hash(None).unwrap().unwrap(); @@ -215,3 +217,13 @@ fn query_db(db: &GroveDb, path: &[&[u8]], key: Vec) { } else { println!("Verification FAILED"); }; } +fn sync_db_demo( + source_db: &GroveDb, + target_db: &GroveDb, +) -> Result<(), grovedb::Error> { + let mut state_sync_inf = state_sync_info::new(); + let app_hash = source_db.root_hash(None).value.unwrap(); + //target_db.w_start_snapshot_syncing(&mut state_sync_inf, app_hash); + Ok(()) +} +