From 3b244b71d41e8b78e49cd741d35520ebdc1ee2df Mon Sep 17 00:00:00 2001 From: Gautham Date: Tue, 1 Aug 2023 10:36:22 +0300 Subject: [PATCH 01/13] Comment out proxy check --- pallets/ocex/src/validator.rs | 14 ++++++++------ runtime/src/lib.rs | 2 +- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/pallets/ocex/src/validator.rs b/pallets/ocex/src/validator.rs index fadfa43f3..54f235ec1 100644 --- a/pallets/ocex/src/validator.rs +++ b/pallets/ocex/src/validator.rs @@ -1,5 +1,5 @@ use crate::{ - pallet::{Accounts, ValidatorSetId}, + pallet::ValidatorSetId, settlement::{add_balance, process_trade, sub_balance}, snapshot::StateInfo, storage::store_trie_root, @@ -242,12 +242,14 @@ impl Pallet { ) -> Result, &'static str> { log::info!(target:"ocex","Settling withdraw request..."); let amount = request.amount().map_err(|_| "decimal conversion error")?; - let account_info = >::get(&request.main).ok_or("Main account not found")?; + // FIXME: Don't remove these comments, will be reintroduced after fixing the race condition + // let account_info = >::get(&request.main).ok_or("Main account not found")?; + + // if !account_info.proxies.contains(&request.proxy) { + // // TODO: Check Race condition + // return Err("Proxy not found") + // } - if !account_info.proxies.contains(&request.proxy) { - // TODO: Check Race condition - return Err("Proxy not found") - } if !request.verify() { return Err("SignatureVerificationFailed") } diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 013f74606..dd1304782 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -117,7 +117,7 @@ pub const VERSION: RuntimeVersion = RuntimeVersion { // and set impl_version to 0. If only runtime // implementation changes and behavior does not, then leave spec_version as // is and increment impl_version. - spec_version: 294, + spec_version: 295, impl_version: 0, apis: RUNTIME_API_VERSIONS, transaction_version: 2, From f538545de7d23ef335b6c3f04d55ebcec3a5a222 Mon Sep 17 00:00:00 2001 From: Gautham Date: Tue, 1 Aug 2023 15:12:02 +0300 Subject: [PATCH 02/13] Fullnodes should store data and stid should be stored in state info --- pallets/ocex/src/validator.rs | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/pallets/ocex/src/validator.rs b/pallets/ocex/src/validator.rs index 54f235ec1..f195c374b 100644 --- a/pallets/ocex/src/validator.rs +++ b/pallets/ocex/src/validator.rs @@ -140,15 +140,16 @@ impl Pallet { log::info!(target:"ocex","Processing user actions for nonce: {:?}",next_nonce); let withdrawals = Self::process_batch(&mut state, &batch, &mut state_info)?; - if sp_io::offchain::is_validator() { - // Create state hash. - state_info.snapshot_id = batch.snapshot_id; // Store the processed nonce - Self::store_state_info(state_info.clone(), &mut state)?; - state.commit(); - let state_hash: H256 = *state.root(); - store_trie_root(state_hash); - log::info!(target:"ocex","updated trie root: {:?}", state.root()); + // Create state hash and store it + state_info.stid = batch.stid; + state_info.snapshot_id = batch.snapshot_id; // Store the processed nonce + Self::store_state_info(state_info.clone(), &mut state)?; + state.commit(); + let state_hash: H256 = *state.root(); + store_trie_root(state_hash); + log::info!(target:"ocex","updated trie root: {:?}", state.root()); + if sp_io::offchain::is_validator() { match available_keys.get(0) { None => return Err("No active keys found"), Some(key) => { @@ -348,7 +349,7 @@ impl Pallet { use parity_scale_codec::alloc::string::ToString; use sp_std::borrow::ToOwned; -fn get_user_action_batch(id: u64) -> Option> { +pub fn get_user_action_batch(id: u64) -> Option> { let body = serde_json::json!({ "id": id }).to_string(); let result = match send_request("user_actions_batch", &(AGGREGATOR.to_owned() + "/snapshots"), &body) { @@ -489,7 +490,7 @@ fn map_http_err(err: HttpError) -> &'static str { #[derive(Serialize, Deserialize)] pub struct JSONRPCResponse { jsonrpc: serde_json::Value, - result: Vec, + pub(crate) result: Vec, id: u64, } From 67b8f2055ead6027c054bcf206a2710365131431 Mon Sep 17 00:00:00 2001 From: Gautham Date: Tue, 1 Aug 2023 17:02:10 +0300 Subject: [PATCH 03/13] Increase spec version --- runtime/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index dd1304782..7917d1898 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -117,7 +117,7 @@ pub const VERSION: RuntimeVersion = RuntimeVersion { // and set impl_version to 0. If only runtime // implementation changes and behavior does not, then leave spec_version as // is and increment impl_version. - spec_version: 295, + spec_version: 296, impl_version: 0, apis: RUNTIME_API_VERSIONS, transaction_version: 2, From 2a19ffb646b99d8eec4db1ce7c20909741788d56 Mon Sep 17 00:00:00 2001 From: Gautham Date: Tue, 1 Aug 2023 18:41:11 +0300 Subject: [PATCH 04/13] re-introduce Sequential block import check --- pallets/ocex/src/validator.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pallets/ocex/src/validator.rs b/pallets/ocex/src/validator.rs index f195c374b..6421d2df0 100644 --- a/pallets/ocex/src/validator.rs +++ b/pallets/ocex/src/validator.rs @@ -198,7 +198,7 @@ impl Pallet { ) -> Result<(), &'static str> { log::debug!(target:"ocex","Importing block: {:?}",blk); - if blk <= state_info.last_block.saturated_into() { + if blk != state_info.last_block.saturating_add(1) { return Err("BlockOutofSequence") } From a65828ea7a5e84203129bc9799697dc7ad10634d Mon Sep 17 00:00:00 2001 From: Gautham Date: Tue, 1 Aug 2023 18:44:29 +0300 Subject: [PATCH 05/13] re-introduce Sequential block import check --- pallets/ocex/src/validator.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pallets/ocex/src/validator.rs b/pallets/ocex/src/validator.rs index 6421d2df0..3918a7d32 100644 --- a/pallets/ocex/src/validator.rs +++ b/pallets/ocex/src/validator.rs @@ -198,7 +198,7 @@ impl Pallet { ) -> Result<(), &'static str> { log::debug!(target:"ocex","Importing block: {:?}",blk); - if blk != state_info.last_block.saturating_add(1) { + if blk != state_info.last_block.saturating_add(1).into() { return Err("BlockOutofSequence") } From 834fae6b53412a9a614dc8ab4ad4b861355f3b7c Mon Sep 17 00:00:00 2001 From: Gautham Date: Tue, 1 Aug 2023 19:03:46 +0300 Subject: [PATCH 06/13] starting block --- pallets/ocex/src/snapshot.rs | 8 +++++++- pallets/ocex/src/validator.rs | 6 +++++- runtime/src/lib.rs | 2 +- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/pallets/ocex/src/snapshot.rs b/pallets/ocex/src/snapshot.rs index fb59ff6f5..38eb1b661 100644 --- a/pallets/ocex/src/snapshot.rs +++ b/pallets/ocex/src/snapshot.rs @@ -20,7 +20,7 @@ use parity_scale_codec::{Decode, Encode}; use polkadex_primitives::BlockNumber; // Accounts storage -#[derive(Encode, Decode, PartialEq, Debug, Clone, Default)] +#[derive(Encode, Decode, PartialEq, Debug, Clone)] pub struct StateInfo { /// Last block processed pub last_block: BlockNumber, @@ -31,3 +31,9 @@ pub struct StateInfo { /// Last processed snapshot id pub snapshot_id: u64, } + +impl Default for StateInfo { + fn default() -> Self { + Self { last_block: 4768083, worker_nonce: 0, stid: 0, snapshot_id: 0 } + } +} diff --git a/pallets/ocex/src/validator.rs b/pallets/ocex/src/validator.rs index 3918a7d32..460e72719 100644 --- a/pallets/ocex/src/validator.rs +++ b/pallets/ocex/src/validator.rs @@ -104,8 +104,12 @@ impl Pallet { log::info!(target:"ocex","last_processed_nonce: {:?}, next_nonce: {:?}",last_processed_nonce, next_nonce); if next_nonce.saturating_sub(last_processed_nonce) > 2 { - // We need to sync our offchain state + if state_info.last_block == 0 { + state_info.last_block = 4768083; // This is hard coded as the starting point + } + // We need to sync our off chain state for nonce in last_processed_nonce.saturating_add(1)..next_nonce { + log::info!(target:"ocex","Syncing batch: {:?}",nonce); // Load the next ObMessages let batch = match get_user_action_batch::(nonce) { None => { diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 7917d1898..eda61dc77 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -117,7 +117,7 @@ pub const VERSION: RuntimeVersion = RuntimeVersion { // and set impl_version to 0. If only runtime // implementation changes and behavior does not, then leave spec_version as // is and increment impl_version. - spec_version: 296, + spec_version: 297, impl_version: 0, apis: RUNTIME_API_VERSIONS, transaction_version: 2, From 3a3915d22e42635869de38891552721dcf3f5065 Mon Sep 17 00:00:00 2001 From: Gautham Date: Tue, 1 Aug 2023 22:18:52 +0300 Subject: [PATCH 07/13] Update worker status handling --- pallets/ocex/src/lib.rs | 13 +++++++++++-- pallets/ocex/src/validator.rs | 12 ++++++------ runtime/src/lib.rs | 2 +- 3 files changed, 18 insertions(+), 9 deletions(-) diff --git a/pallets/ocex/src/lib.rs b/pallets/ocex/src/lib.rs index c664f0022..993e785ca 100644 --- a/pallets/ocex/src/lib.rs +++ b/pallets/ocex/src/lib.rs @@ -376,8 +376,17 @@ pub mod pallet { fn offchain_worker(block_number: T::BlockNumber) { log::debug!(target:"ocex", "offchain worker started"); - if let Err(err) = Self::run_on_chain_validation(block_number) { - log::error!(target:"ocex","OCEX worker error: {}",err) + + match Self::run_on_chain_validation(block_number) { + Ok(exit_flag) => { + // If exit flag is false, then another worker is online + if !exit_flag { + return + } + }, + Err(err) => { + log::error!(target:"ocex","OCEX worker error: {}",err); + }, } // Set worker status to false let s_info = StorageValueRef::persistent(&WORKER_STATUS); diff --git a/pallets/ocex/src/validator.rs b/pallets/ocex/src/validator.rs index 460e72719..614acedcb 100644 --- a/pallets/ocex/src/validator.rs +++ b/pallets/ocex/src/validator.rs @@ -45,7 +45,7 @@ pub const AGGREGATOR: &str = "https://ob.aggregator.polkadex.trade"; impl Pallet { /// Runs the offchain worker computes the next batch of user actions and /// submits snapshot summary to aggregator endpoint - pub fn run_on_chain_validation(_block_num: T::BlockNumber) -> Result<(), &'static str> { + pub fn run_on_chain_validation(_block_num: T::BlockNumber) -> Result { let local_keys = T::AuthorityId::all(); let authorities = Self::validator_set().validators; let mut available_keys = authorities @@ -74,7 +74,7 @@ impl Pallet { Some(true) => { // Another worker is online, so exit log::info!(target:"ocex", "Another worker is online, so exit"); - return Ok(()) + return Ok(false) }, None => {}, Some(false) => {}, @@ -98,7 +98,7 @@ impl Pallet { log::debug!(target:"ocex","Submitting last processed snapshot: {:?}",next_nonce); // resubmit the summary to aggregator load_signed_summary_and_send::(next_nonce); - return Ok(()) + return Ok(true) } log::info!(target:"ocex","last_processed_nonce: {:?}, next_nonce: {:?}",last_processed_nonce, next_nonce); @@ -114,7 +114,7 @@ impl Pallet { let batch = match get_user_action_batch::(nonce) { None => { log::error!(target:"ocex","No user actions found for nonce: {:?}",nonce); - return Ok(()) + return Ok(true) }, Some(batch) => batch, }; @@ -136,7 +136,7 @@ impl Pallet { state.commit(); store_trie_root(*state.root()); log::debug!(target:"ocex","Stored state root: {:?}",state.root()); - return Ok(()) + return Ok(true) }, Some(batch) => batch, }; @@ -192,7 +192,7 @@ impl Pallet { } } - Ok(()) + Ok(true) } fn import_blk( diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index eda61dc77..af970f322 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -117,7 +117,7 @@ pub const VERSION: RuntimeVersion = RuntimeVersion { // and set impl_version to 0. If only runtime // implementation changes and behavior does not, then leave spec_version as // is and increment impl_version. - spec_version: 297, + spec_version: 298, impl_version: 0, apis: RUNTIME_API_VERSIONS, transaction_version: 2, From 4359552523bf14988a6df3a7702c30ebb86453f6 Mon Sep 17 00:00:00 2001 From: Gautham Date: Wed, 2 Aug 2023 11:35:41 +0300 Subject: [PATCH 08/13] Added a caching layer for Offchain state --- Cargo.lock | 5 +- pallets/ocex/src/settlement.rs | 42 ++++++------ pallets/ocex/src/storage.rs | 104 +++++++++++++++++++++++----- pallets/ocex/src/tests.rs | 120 +++++++++++++++++++++++---------- pallets/ocex/src/validator.rs | 50 ++++++-------- 5 files changed, 215 insertions(+), 106 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9ce2aa6dd..f88b80325 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9982,11 +9982,12 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.28.2" +version = "1.29.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94d7b1cfd2aa4011f2de74c2c4c63665e27a71006b0a192dcd2710272e73dfa2" +checksum = "532826ff75199d5833b9d2c5fe410f29235e25704ee5f0ef599fb51c21f4a4da" dependencies = [ "autocfg", + "backtrace", "bytes", "libc", "mio", diff --git a/pallets/ocex/src/settlement.rs b/pallets/ocex/src/settlement.rs index b550f0840..78f59bb8e 100644 --- a/pallets/ocex/src/settlement.rs +++ b/pallets/ocex/src/settlement.rs @@ -18,17 +18,14 @@ //! Helper functions for updating the balance -use crate::validator::map_trie_error; +use crate::storage::OffchainState; use log::{error, info}; use orderbook_primitives::types::Trade; use parity_scale_codec::{alloc::string::ToString, Decode, Encode}; use polkadex_primitives::{ocex::TradingPairConfig, AccountId, AssetId}; use rust_decimal::{prelude::ToPrimitive, Decimal}; use sp_core::crypto::ByteArray; -use sp_runtime::traits::BlakeTwo256; use sp_std::collections::btree_map::BTreeMap; -use sp_trie::LayoutV1; -use trie_db::{TrieDBMut, TrieMut}; /// Updates provided trie db with a new balance entry if it is does not contain item for specific /// account or asset yet, or increments existing item balance. @@ -40,25 +37,24 @@ use trie_db::{TrieDBMut, TrieMut}; /// * `asset`: Asset to look for /// * `balance`: Amount on which balance should be added. pub fn add_balance( - state: &mut TrieDBMut>, + state: &mut OffchainState, account: &AccountId, asset: AssetId, balance: Decimal, ) -> Result<(), &'static str> { - log::info!(target:"ocex", "adding {:?} asset {:?} from account {:?}", balance.to_f64().unwrap(), asset.to_string(), account.as_slice()); - let mut balances: BTreeMap = - match state.get(account.as_slice()).map_err(map_trie_error)? { - None => BTreeMap::new(), - Some(encoded) => BTreeMap::decode(&mut &encoded[..]) - .map_err(|_| "Unable to decode balances for account")?, - }; + log::info!(target:"ocex", "adding {:?} asset {:?} from account {:?}", balance.to_f64().unwrap(), asset.to_string(), account); + let mut balances: BTreeMap = match state.get(&account.to_raw_vec())? { + None => BTreeMap::new(), + Some(encoded) => BTreeMap::decode(&mut &encoded[..]) + .map_err(|_| "Unable to decode balances for account")?, + }; balances .entry(asset) .and_modify(|total| *total = total.saturating_add(balance)) .or_insert(balance); - state.insert(account.as_slice(), &balances.encode()).map_err(map_trie_error)?; + state.insert(account.to_raw_vec(), balances.encode()); Ok(()) } @@ -74,27 +70,27 @@ pub fn add_balance( /// * `asset`: Asset to look for /// * `balance`: Amount on which balance should be reduced. pub fn sub_balance( - state: &mut TrieDBMut>, + state: &mut OffchainState, account: &AccountId, asset: AssetId, balance: Decimal, ) -> Result<(), &'static str> { - log::info!(target:"ocex", "subtracting {:?} asset {:?} from account {:?}", balance.to_f64().unwrap(), asset.to_string(), account.as_slice()); - let mut balances: BTreeMap = - match state.get(account.as_slice()).map_err(map_trie_error)? { - None => return Err("Account not found in trie"), - Some(encoded) => BTreeMap::decode(&mut &encoded[..]) - .map_err(|_| "Unable to decode balances for account")?, - }; + log::info!(target:"ocex", "subtracting {:?} asset {:?} from account {:?}", balance.to_f64().unwrap(), asset.to_string(), account); + let mut balances: BTreeMap = match state.get(&account.to_raw_vec())? { + None => return Err("Account not found in trie"), + Some(encoded) => BTreeMap::decode(&mut &encoded[..]) + .map_err(|_| "Unable to decode balances for account")?, + }; let account_balance = balances.get_mut(&asset).ok_or("NotEnoughBalance")?; if *account_balance < balance { + log::error!(target:"ocex","Asset found but balance low for asset: {:?}, of account: {:?}",asset, account); return Err("NotEnoughBalance") } *account_balance = account_balance.saturating_sub(balance); - state.insert(account.as_slice(), &balances.encode()).map_err(map_trie_error)?; + state.insert(account.to_raw_vec(), balances.encode()); Ok(()) } @@ -112,7 +108,7 @@ pub fn sub_balance( /// /// A `Result<(), Error>` indicating whether the trade was successfully processed or not. pub fn process_trade( - state: &mut TrieDBMut>, + state: &mut OffchainState, trade: &Trade, config: TradingPairConfig, ) -> Result<(), &'static str> { diff --git a/pallets/ocex/src/storage.rs b/pallets/ocex/src/storage.rs index 4bb55ac2d..83499453b 100644 --- a/pallets/ocex/src/storage.rs +++ b/pallets/ocex/src/storage.rs @@ -1,9 +1,10 @@ +use crate::validator::map_trie_error; use hash_db::{AsHashDB, HashDB, Prefix}; use sp_core::{Hasher, H256}; use sp_runtime::{offchain::storage::StorageValueRef, sp_std, traits::BlakeTwo256}; use sp_std::vec::Vec; use sp_trie::{trie_types::TrieDBMutBuilderV1, LayoutV1}; -use trie_db::{DBValue, TrieDBMut}; +use trie_db::{DBValue, TrieDBMut, TrieMut}; pub struct State; @@ -12,6 +13,54 @@ const NULL_NODE_DATA: [u8; 29] = *b"offchain-ocex::null_node_data"; const KEY_PREFIX: [u8; 15] = *b"offchain-ocex::"; const TRIE_ROOT: [u8; 24] = *b"offchain-ocex::trie_root"; +pub struct OffchainState<'a> { + cache: sp_std::collections::btree_map::BTreeMap, Vec>, + trie: TrieDBMut<'a, LayoutV1>, +} + +impl<'a> OffchainState<'a> { + pub fn load(storage: &'a mut State, root: &'a mut H256) -> Self { + let trie = crate::storage::get_state_trie(storage, root); + Self { cache: Default::default(), trie } + } + + pub fn is_empty(&self) -> bool { + self.cache.is_empty() && self.trie.is_empty() + } + + pub fn get(&mut self, key: &Vec) -> Result>, &'static str> { + match self.cache.get(key) { + Some(value) => Ok(Some(value.clone())), + None => match self.trie.get(key) { + Err(err) => { + log::error!(target:"ocex","Trie returned an error while get operation"); + Err(map_trie_error(err)) + }, + Ok(option) => match option { + None => Ok(None), + Some(value) => { + self.cache.insert(key.clone(), value.clone()); + Ok(Some(value)) + }, + }, + }, + } + } + + pub fn insert(&mut self, key: Vec, value: Vec) { + self.cache.insert(key, value); + } + + pub fn commit(&mut self) -> Result { + for (key, value) in self.cache.iter() { + self.trie.insert(key, value).map_err(map_trie_error)?; + } + + self.trie.commit(); + Ok(*self.trie.root()) + } +} + impl State { fn hashed_null_node(&self) -> ::Out { let s_r = StorageValueRef::persistent(&HASHED_NULL_NODE); @@ -43,8 +92,9 @@ impl State { } } - fn db_get(&self, key: &::Out) -> Option<(DBValue, i32)> { - let derive_key = self.derive_storage_key(*key); + fn db_get(&self, key: &Vec) -> Option<(DBValue, i32)> { + log::trace!(target:"ocex","Getting key: {:?}", key); + let derive_key = self.derive_storage_key(key); let s_ref = StorageValueRef::persistent(derive_key.as_slice()); match s_ref.get::<(DBValue, i32)>() { Ok(d) => d, @@ -52,15 +102,17 @@ impl State { } } - fn db_insert(&self, key: ::Out, value: (DBValue, i32)) { - let derive_key = self.derive_storage_key(key); + fn db_insert(&self, key: Vec, value: (DBValue, i32)) { + let derive_key = self.derive_storage_key(&key); + log::trace!(target:"ocex","Inserting key: {:?}, derived: {:?}, value: {:?}", key, derive_key, value); let s_ref = StorageValueRef::persistent(derive_key.as_slice()); s_ref.set(&value); } - fn derive_storage_key(&self, key: ::Out) -> Vec { + fn derive_storage_key(&self, key: &[u8]) -> Vec { let mut derived = KEY_PREFIX.to_vec(); - derived.append(&mut key.0.to_vec()); + let mut cloned_key = key.to_owned(); + derived.append(&mut cloned_key); derived } } @@ -76,26 +128,31 @@ impl AsHashDB for State { } impl HashDB for State { - fn get(&self, key: &::Out, _: Prefix) -> Option { + fn get(&self, key: &::Out, prefix: Prefix) -> Option { + log::trace!(target:"ocex","HashDb get, key: {:?}, prefix: {:?}", key,prefix); if key == &self.hashed_null_node() { return Some(self.null_node_data()) } - match self.db_get(key) { + let key = prefixed_key(key, prefix); + match self.db_get(&key) { Some((ref d, rc)) if rc > 0 => Some(d.clone()), _ => None, } } - fn contains(&self, key: &::Out, _: Prefix) -> bool { + fn contains(&self, key: &::Out, prefix: Prefix) -> bool { + log::trace!(target:"ocex","HashDb contains, key: {:?}, prefix: {:?}", key,prefix); if key == &self.hashed_null_node() { return true } - matches!(self.db_get(key), Some((_, x)) if x > 0) + let key = prefixed_key(key, prefix); + matches!(self.db_get(&key), Some((_, x)) if x > 0) } fn insert(&mut self, prefix: Prefix, value: &[u8]) -> ::Out { + log::trace!(target:"ocex","HashDb insert, prefix: {:?}",prefix); if *value == self.null_node_data() { return self.hashed_null_node() } @@ -104,11 +161,13 @@ impl HashDB for State { key } - fn emplace(&mut self, key: ::Out, _: Prefix, value: DBValue) { + fn emplace(&mut self, key: ::Out, prefix: Prefix, value: DBValue) { + log::trace!(target:"ocex","HashDb emplace, key: {:?}, prefix: {:?}", key,prefix); if value == self.null_node_data() { return } + let key = prefixed_key(&key, prefix); match self.db_get(&key) { Some((mut old_value, mut rc)) => { if rc <= 0 { @@ -123,24 +182,37 @@ impl HashDB for State { } } - fn remove(&mut self, key: &::Out, _: Prefix) { + fn remove(&mut self, key: &::Out, prefix: Prefix) { + log::trace!(target:"ocex","HashDb remove, key: {:?}, prefix: {:?}", key,prefix); if key == &self.hashed_null_node() { return } - match self.db_get(key) { + let key = prefixed_key(key, prefix); + match self.db_get(&key) { Some((value, mut rc)) => { rc -= 1; - self.db_insert(*key, (value, rc)); + self.db_insert(key, (value, rc)); }, None => { let value = DBValue::default(); - self.db_insert(*key, (value, -1)); + self.db_insert(key, (value, -1)); }, } } } +/// Derive a database key from hash value of the node (key) and the node prefix. +pub fn prefixed_key(key: &::Out, prefix: Prefix) -> Vec { + let mut prefixed_key = Vec::with_capacity(key.as_ref().len() + prefix.0.len() + 1); + prefixed_key.extend_from_slice(prefix.0); + if let Some(last) = prefix.1 { + prefixed_key.push(last); + } + prefixed_key.extend_from_slice(key.as_ref()); + prefixed_key +} + pub(crate) fn load_trie_root() -> ::Out { let root_ref = StorageValueRef::persistent(&TRIE_ROOT); match root_ref.get::<::Out>() { diff --git a/pallets/ocex/src/tests.rs b/pallets/ocex/src/tests.rs index d5bf2c569..b8c618662 100644 --- a/pallets/ocex/src/tests.rs +++ b/pallets/ocex/src/tests.rs @@ -122,14 +122,14 @@ fn test_add_balance_new_account() { let amount = 1000000; let mut root = crate::storage::load_trie_root(); let mut trie_state = crate::storage::State; - let mut state = crate::storage::get_state_trie(&mut trie_state, &mut root); + let mut state = OffchainState::load(&mut trie_state, &mut root); let result = add_balance(&mut state, &account_id, asset_id, amount.into()); assert_eq!(result, Ok(())); - let encoded = state.get(account_id.as_slice()).unwrap().unwrap(); + let encoded = state.get(&account_id.to_raw_vec()).unwrap().unwrap(); let account_info: BTreeMap = BTreeMap::decode(&mut &encoded[..]).unwrap(); assert_eq!(account_info.get(&asset_id).unwrap(), &amount.into()); // test get_balance() - state.commit(); + state.commit().unwrap(); drop(state); store_trie_root(root); let from_fn = OCEX::get_balance(account_id.clone(), asset_id).unwrap(); @@ -166,10 +166,10 @@ fn test_add_balance_existing_account_with_balance() { let amount = 1000000; let mut root = crate::storage::load_trie_root(); let mut trie_state = crate::storage::State; - let mut state = crate::storage::get_state_trie(&mut trie_state, &mut root); + let mut state = OffchainState::load(&mut trie_state, &mut root); let result = add_balance(&mut state, &account_id, asset_id, amount.into()); assert_eq!(result, Ok(())); - let encoded = state.get(account_id.as_slice()).unwrap().unwrap(); + let encoded = state.get(&account_id.to_raw_vec()).unwrap().unwrap(); let account_info: BTreeMap = BTreeMap::decode(&mut &encoded[..]).unwrap(); assert_eq!(account_info.get(&asset_id).unwrap(), &amount.into()); @@ -177,11 +177,11 @@ fn test_add_balance_existing_account_with_balance() { let amount2 = 2000000; let result = add_balance(&mut state, &account_id, asset_id, amount2.into()); assert_eq!(result, Ok(())); - let encoded = state.get(account_id.as_slice()).unwrap().unwrap(); + let encoded = state.get(&account_id.to_raw_vec()).unwrap().unwrap(); let account_info: BTreeMap = BTreeMap::decode(&mut &encoded[..]).unwrap(); assert_eq!(account_info.get(&asset_id).unwrap(), &(amount + amount2).into()); // test get_balance() - state.commit(); + state.commit().unwrap(); drop(state); store_trie_root(root); let from_fn = OCEX::get_balance(account_id.clone(), asset_id).unwrap(); @@ -216,6 +216,52 @@ fn test_add_balance_existing_account_with_balance() { }); } +#[test] +fn test_two_assets() { + let mut ext = new_test_ext(); + ext.persist_offchain_overlay(); + register_offchain_ext(&mut ext); + ext.execute_with(|| { + let account_bytes = [1u8; 32]; + let pablo_main = AccountId::from(account_bytes); + + let account_bytes = [2u8; 32]; + let coinalpha = AccountId::from(account_bytes); + + let account_id = pablo_main.clone(); + let asset1 = AssetId::Asset(123); + let amount1 = Decimal::from_str("0.05").unwrap(); + + let asset2 = AssetId::Asset(456); + let amount2 = Decimal::from_str("0.1").unwrap(); + let mut root = crate::storage::load_trie_root(); + let mut trie_state = crate::storage::State; + let mut state = OffchainState::load(&mut trie_state, &mut root); + add_balance(&mut state, &account_id, asset1, amount1.into()).unwrap(); + add_balance(&mut state, &account_id, asset2, amount2.into()).unwrap(); + let asset123 = AssetId::Asset(123); + let amount123 = Decimal::from_str("25.0").unwrap(); + + let asset456 = AssetId::Asset(456); + let amount456 = Decimal::from_str("10.0").unwrap(); + // works + sub_balance(&mut state, &account_id, asset1, Decimal::from_str("0.01").unwrap().into()) + .unwrap(); + add_balance(&mut state, &coinalpha, asset123, amount123.into()).unwrap(); + add_balance(&mut state, &coinalpha, asset456, amount456.into()).unwrap(); + let root = state.commit().unwrap(); + store_trie_root(root); + drop(state); + let mut root = crate::storage::load_trie_root(); + let mut trie_state = crate::storage::State; + let mut state = OffchainState::load(&mut trie_state, &mut root); + sub_balance(&mut state, &account_id, asset1, Decimal::from_str("0.01").unwrap().into()) + .unwrap(); + sub_balance(&mut state, &account_id, asset1, Decimal::from_str("0.01").unwrap().into()) + .unwrap(); + }); +} + #[test] // check if balance can be subtracted from a new account fn test_sub_balance_new_account() { @@ -228,7 +274,7 @@ fn test_sub_balance_new_account() { let amount = 1000000; let mut root = crate::storage::load_trie_root(); let mut trie_state = crate::storage::State; - let mut state = crate::storage::get_state_trie(&mut trie_state, &mut root); + let mut state = OffchainState::load(&mut trie_state, &mut root); let result = sub_balance(&mut state, &account_id, asset_id, amount.into()); match result { Ok(_) => assert!(false), @@ -254,10 +300,10 @@ fn test_sub_balance_existing_account_with_balance() { let amount = 3000000; let mut root = crate::storage::load_trie_root(); let mut trie_state = crate::storage::State; - let mut state = crate::storage::get_state_trie(&mut trie_state, &mut root); + let mut state = OffchainState::load(&mut trie_state, &mut root); let result = add_balance(&mut state, &account_id, asset_id, amount.into()); assert_eq!(result, Ok(())); - let encoded = state.get(account_id.as_slice()).unwrap().unwrap(); + let encoded = state.get(&account_id.to_raw_vec()).unwrap().unwrap(); let account_info: BTreeMap = BTreeMap::decode(&mut &encoded[..]).unwrap(); assert_eq!(account_info.get(&asset_id).unwrap(), &amount.into()); @@ -265,7 +311,7 @@ fn test_sub_balance_existing_account_with_balance() { let amount2 = 2000000; let result = sub_balance(&mut state, &account_id, asset_id, amount2.into()); assert_eq!(result, Ok(())); - let encoded = state.get(account_id.as_slice()).unwrap().unwrap(); + let encoded = state.get(&account_id.to_raw_vec()).unwrap().unwrap(); let account_info: BTreeMap = BTreeMap::decode(&mut &encoded[..]).unwrap(); assert_eq!(account_info.get(&asset_id).unwrap(), &(amount - amount2).into()); @@ -273,12 +319,12 @@ fn test_sub_balance_existing_account_with_balance() { let amount3 = amount - amount2; let result = sub_balance(&mut state, &account_id, asset_id, amount3.into()); assert_eq!(result, Ok(())); - let encoded = state.get(account_id.as_slice()).unwrap().unwrap(); + let encoded = state.get(&account_id.to_raw_vec()).unwrap().unwrap(); let account_info: BTreeMap = BTreeMap::decode(&mut &encoded[..]).unwrap(); assert_eq!(amount - amount2 - amount3, 0); assert_eq!(account_info.get(&asset_id).unwrap(), &Decimal::from(0)); // test get_balance() - state.commit(); + state.commit().unwrap(); drop(state); store_trie_root(root); let from_fn = OCEX::get_balance(account_id.clone(), asset_id).unwrap(); @@ -306,24 +352,24 @@ fn test_trie_update() { ext.execute_with(|| { let mut root = crate::storage::load_trie_root(); let mut trie_state = crate::storage::State; - let mut state = crate::storage::get_state_trie(&mut trie_state, &mut root); + let mut state = OffchainState::load(&mut trie_state, &mut root); assert!(state.is_empty()); - state.insert(b"a", b"1").unwrap(); - state.insert(b"b", b"2").unwrap(); - state.insert(b"c", b"3").unwrap(); + state.insert(b"a".to_vec(), b"1".to_vec()); + state.insert(b"b".to_vec(), b"2".to_vec()); + state.insert(b"c".to_vec(), b"3".to_vec()); assert!(!state.is_empty()); - let root = state.root(); // This should flush everything to db. - crate::storage::store_trie_root(*root); + let root = state.commit().unwrap(); // This should flush everything to db. + crate::storage::store_trie_root(root); let mut root = crate::storage::load_trie_root(); let mut trie_state = crate::storage::State; - let mut state = crate::storage::get_state_trie(&mut trie_state, &mut root); + let mut state = OffchainState::load(&mut trie_state, &mut root); - assert_eq!(state.get(b"a").unwrap().unwrap(), b"1"); - assert_eq!(state.get(b"b").unwrap().unwrap(), b"2"); - assert_eq!(state.get(b"c").unwrap().unwrap(), b"3"); + assert_eq!(state.get(&b"a".to_vec()).unwrap().unwrap(), b"1"); + assert_eq!(state.get(&b"b".to_vec()).unwrap().unwrap(), b"2"); + assert_eq!(state.get(&b"c".to_vec()).unwrap().unwrap(), b"3"); - state.insert(b"d", b"4").unwrap(); // This will not be in DB, as neither root() or commit() is called + state.insert(b"d".to_vec(), b"4".to_vec()); // This will not be in DB, as neither root() or commit() is called let mut root = crate::storage::load_trie_root(); let mut trie_state = crate::storage::State; @@ -346,7 +392,7 @@ fn test_balance_update_depost_first_then_trade() { let amount = 20; let mut root = crate::storage::load_trie_root(); let mut trie_state = crate::storage::State; - let mut state = crate::storage::get_state_trie(&mut trie_state, &mut root); + let mut state = OffchainState::load(&mut trie_state, &mut root); let result = add_balance( &mut state, @@ -380,10 +426,10 @@ fn test_sub_more_than_available_balance_from_existing_account_with_balance() { let amount = 3000000; let mut root = crate::storage::load_trie_root(); let mut trie_state = crate::storage::State; - let mut state = crate::storage::get_state_trie(&mut trie_state, &mut root); + let mut state = OffchainState::load(&mut trie_state, &mut root); let result = add_balance(&mut state, &account_id, asset_id, amount.into()); assert_eq!(result, Ok(())); - let encoded = state.get(account_id.as_slice()).unwrap().unwrap(); + let encoded = state.get(&account_id.to_raw_vec()).unwrap().unwrap(); let account_info: BTreeMap = BTreeMap::decode(&mut &encoded[..]).unwrap(); assert_eq!(account_info.get(&asset_id).unwrap(), &amount.into()); @@ -406,7 +452,7 @@ fn test_trade_between_two_accounts_without_balance() { ext.execute_with(|| { let mut root = crate::storage::load_trie_root(); let mut trie_state = crate::storage::State; - let mut state = crate::storage::get_state_trie(&mut trie_state, &mut root); + let mut state = OffchainState::load(&mut trie_state, &mut root); let config = get_trading_pair_config(); let amount = Decimal::from_str("20").unwrap(); let price = Decimal::from_str("2").unwrap(); @@ -428,7 +474,7 @@ fn test_trade_between_two_accounts_with_balance() { ext.execute_with(|| { let mut root = crate::storage::load_trie_root(); let mut trie_state = crate::storage::State; - let mut state = crate::storage::get_state_trie(&mut trie_state, &mut root); + let mut state = OffchainState::load(&mut trie_state, &mut root); // add balance to alice let alice_account_id = get_alice_key_pair().public(); @@ -466,12 +512,12 @@ fn test_trade_between_two_accounts_with_balance() { assert_ok!(result); //check has 20 pdex now - let encoded = state.get(alice_account_id.as_slice()).unwrap().unwrap(); + let encoded = state.get(&alice_account_id.0.to_vec()).unwrap().unwrap(); let account_info: BTreeMap = BTreeMap::decode(&mut &encoded[..]).unwrap(); assert_eq!(account_info.get(&AssetId::Polkadex).unwrap(), &20.into()); //check if bob has 20 less pdex - let encoded = state.get(bob_account_id.as_slice()).unwrap().unwrap(); + let encoded = state.get(&bob_account_id.0.to_vec()).unwrap().unwrap(); let account_info: BTreeMap = BTreeMap::decode(&mut &encoded[..]).unwrap(); assert_eq!( account_info.get(&AssetId::Polkadex).unwrap(), @@ -479,7 +525,7 @@ fn test_trade_between_two_accounts_with_balance() { ); //check if bob has 40 more asset_1 - let encoded = state.get(bob_account_id.as_slice()).unwrap().unwrap(); + let encoded = state.get(&bob_account_id.0.to_vec()).unwrap().unwrap(); let account_info: BTreeMap = BTreeMap::decode(&mut &encoded[..]).unwrap(); assert_eq!( account_info.get(&AssetId::Asset(1)).unwrap(), @@ -487,7 +533,7 @@ fn test_trade_between_two_accounts_with_balance() { ); //check if alice has 40 less asset_1 - let encoded = state.get(alice_account_id.as_slice()).unwrap().unwrap(); + let encoded = state.get(&alice_account_id.0.to_vec()).unwrap().unwrap(); let account_info: BTreeMap = BTreeMap::decode(&mut &encoded[..]).unwrap(); assert_eq!( account_info.get(&AssetId::Asset(1)).unwrap(), @@ -505,7 +551,7 @@ fn test_trade_between_two_accounts_insuffient_bidder_balance() { ext.execute_with(|| { let mut root = crate::storage::load_trie_root(); let mut trie_state = crate::storage::State; - let mut state = crate::storage::get_state_trie(&mut trie_state, &mut root); + let mut state = OffchainState::load(&mut trie_state, &mut root); // add balance to alice let alice_account_id = get_alice_key_pair().public(); @@ -539,7 +585,7 @@ fn test_trade_between_two_accounts_insuffient_asker_balance() { ext.execute_with(|| { let mut root = crate::storage::load_trie_root(); let mut trie_state = crate::storage::State; - let mut state = crate::storage::get_state_trie(&mut trie_state, &mut root); + let mut state = OffchainState::load(&mut trie_state, &mut root); // add balance to alice let alice_account_id = get_alice_key_pair().public(); @@ -573,7 +619,7 @@ fn test_trade_between_two_accounts_invalid_signature() { ext.execute_with(|| { let mut root = crate::storage::load_trie_root(); let mut trie_state = crate::storage::State; - let mut state = crate::storage::get_state_trie(&mut trie_state, &mut root); + let mut state = OffchainState::load(&mut trie_state, &mut root); // add balance to alice let alice_account_id = get_alice_key_pair().public(); @@ -2255,6 +2301,7 @@ use orderbook_primitives::{ Fees, }; use sp_runtime::traits::{BlockNumberProvider, One}; + use trie_db::TrieMut; #[test] @@ -2303,6 +2350,7 @@ pub fn test_allowlist_with_limit_reaching_returns_error() { use crate::{ settlement::{add_balance, process_trade, sub_balance}, sr25519::AuthorityId, + storage::OffchainState, }; use polkadex_primitives::ingress::{HandleBalance, HandleBalanceLimit}; diff --git a/pallets/ocex/src/validator.rs b/pallets/ocex/src/validator.rs index 614acedcb..902e9c7f0 100644 --- a/pallets/ocex/src/validator.rs +++ b/pallets/ocex/src/validator.rs @@ -25,11 +25,10 @@ use sp_runtime::{ http::{Error, PendingRequest, Response}, storage::StorageValueRef, }, - traits::BlakeTwo256, SaturatedConversion, }; use sp_std::{boxed::Box, collections::btree_map::BTreeMap, vec::Vec}; -use sp_trie::{LayoutV1, TrieDBMut}; + use trie_db::{TrieError, TrieMut}; /// Key of the storage that stores the status of an offchain worker @@ -87,9 +86,9 @@ impl Pallet { let mut root = crate::storage::load_trie_root(); log::info!(target:"ocex","block: {:?}, state_root {:?}", _block_num, root); let mut storage = crate::storage::State; - let mut state = crate::storage::get_state_trie(&mut storage, &mut root); + let mut state = OffchainState::load(&mut storage, &mut root); - let mut state_info = Self::load_state_info(&state); + let mut state_info = Self::load_state_info(&mut state); let last_processed_nonce = state_info.snapshot_id; @@ -132,10 +131,10 @@ impl Pallet { // Store the last processed nonce // We need to -1 from next_nonce, as it is not yet processed state_info.snapshot_id = next_nonce.saturating_sub(1); - Self::store_state_info(state_info, &mut state)?; - state.commit(); - store_trie_root(*state.root()); - log::debug!(target:"ocex","Stored state root: {:?}",state.root()); + Self::store_state_info(state_info, &mut state); + let root = state.commit()?; + store_trie_root(root); + log::debug!(target:"ocex","Stored state root: {:?}",root); return Ok(true) }, Some(batch) => batch, @@ -147,11 +146,10 @@ impl Pallet { // Create state hash and store it state_info.stid = batch.stid; state_info.snapshot_id = batch.snapshot_id; // Store the processed nonce - Self::store_state_info(state_info.clone(), &mut state)?; - state.commit(); - let state_hash: H256 = *state.root(); + Self::store_state_info(state_info.clone(), &mut state); + let state_hash: H256 = state.commit()?; store_trie_root(state_hash); - log::info!(target:"ocex","updated trie root: {:?}", state.root()); + log::info!(target:"ocex","updated trie root: {:?}", state_hash); if sp_io::offchain::is_validator() { match available_keys.get(0) { @@ -197,7 +195,7 @@ impl Pallet { fn import_blk( blk: T::BlockNumber, - state: &mut TrieDBMut>, + state: &mut OffchainState, state_info: &mut StateInfo, ) -> Result<(), &'static str> { log::debug!(target:"ocex","Importing block: {:?}",blk); @@ -226,10 +224,7 @@ impl Pallet { Ok(()) } - fn trades( - trades: &Vec, - state: &mut TrieDBMut>, - ) -> Result<(), &'static str> { + fn trades(trades: &Vec, state: &mut OffchainState) -> Result<(), &'static str> { log::info!(target:"ocex","Settling trades..."); for trade in trades { let config = Self::trading_pairs(trade.maker.pair.base, trade.maker.pair.quote) @@ -242,7 +237,7 @@ impl Pallet { fn withdraw( request: &WithdrawalRequest, - state: &mut TrieDBMut>, + state: &mut OffchainState, stid: u64, ) -> Result, &'static str> { log::info!(target:"ocex","Settling withdraw request..."); @@ -271,7 +266,7 @@ impl Pallet { } fn process_batch( - state: &mut TrieDBMut>, + state: &mut OffchainState, batch: &UserActionBatch, state_info: &mut StateInfo, ) -> Result>, &'static str> { @@ -297,20 +292,16 @@ impl Pallet { Ok(withdrawals) } - fn load_state_info(state: &TrieDBMut>) -> StateInfo { - match state.get(&STATE_INFO) { + fn load_state_info(state: &mut OffchainState) -> StateInfo { + match state.get(&STATE_INFO.to_vec()) { Ok(Some(data)) => StateInfo::decode(&mut &data[..]).unwrap_or_default(), Ok(None) => StateInfo::default(), Err(_) => StateInfo::default(), } } - fn store_state_info( - state_info: StateInfo, - state: &mut TrieDBMut>, - ) -> Result<(), &'static str> { - let _ = state.insert(&STATE_INFO, &state_info.encode()).map_err(map_trie_error)?; - Ok(()) + fn store_state_info(state_info: StateInfo, state: &mut OffchainState) { + state.insert(STATE_INFO.to_vec(), state_info.encode()); } fn calculate_signer_index( @@ -345,11 +336,12 @@ impl Pallet { pub(crate) fn get_state_info() -> StateInfo { let mut root = crate::storage::load_trie_root(); let mut storage = crate::storage::State; - let state = crate::storage::get_state_trie(&mut storage, &mut root); - Self::load_state_info(&state) + let mut state = OffchainState::load(&mut storage, &mut root); + Self::load_state_info(&mut state) } } +use crate::storage::OffchainState; use parity_scale_codec::alloc::string::ToString; use sp_std::borrow::ToOwned; From d2a6f8b95195fb0ec8f74f854cc9004364767a2a Mon Sep 17 00:00:00 2001 From: Gautham Date: Wed, 2 Aug 2023 11:37:53 +0300 Subject: [PATCH 09/13] Increase spec version --- runtime/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index af970f322..34ac31648 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -117,7 +117,7 @@ pub const VERSION: RuntimeVersion = RuntimeVersion { // and set impl_version to 0. If only runtime // implementation changes and behavior does not, then leave spec_version as // is and increment impl_version. - spec_version: 298, + spec_version: 299, impl_version: 0, apis: RUNTIME_API_VERSIONS, transaction_version: 2, From bc4b4e327fac3eeda19aa4ee6db561701568cd5c Mon Sep 17 00:00:00 2001 From: Gautham Date: Wed, 2 Aug 2023 12:55:37 +0300 Subject: [PATCH 10/13] Reset DB/Trie on trie errors --- pallets/ocex/src/lib.rs | 2 +- pallets/ocex/src/storage.rs | 2 +- pallets/ocex/src/validator.rs | 20 +++++++++++++------- runtime/src/lib.rs | 2 +- 4 files changed, 16 insertions(+), 10 deletions(-) diff --git a/pallets/ocex/src/lib.rs b/pallets/ocex/src/lib.rs index 993e785ca..04378f187 100644 --- a/pallets/ocex/src/lib.rs +++ b/pallets/ocex/src/lib.rs @@ -1280,7 +1280,7 @@ pub mod pallet { account_ids.insert(main, proxies); } - let state_info = Self::get_state_info(); + let state_info = Self::get_state_info().map_err(|_err| DispatchError::Corruption)?; let last_processed_block_number = state_info.last_block; let worker_nonce = state_info.worker_nonce; let snapshot_id = state_info.snapshot_id; diff --git a/pallets/ocex/src/storage.rs b/pallets/ocex/src/storage.rs index 83499453b..352b6a1c6 100644 --- a/pallets/ocex/src/storage.rs +++ b/pallets/ocex/src/storage.rs @@ -2,7 +2,7 @@ use crate::validator::map_trie_error; use hash_db::{AsHashDB, HashDB, Prefix}; use sp_core::{Hasher, H256}; use sp_runtime::{offchain::storage::StorageValueRef, sp_std, traits::BlakeTwo256}; -use sp_std::vec::Vec; +use sp_std::{prelude::ToOwned, vec::Vec}; use sp_trie::{trie_types::TrieDBMutBuilderV1, LayoutV1}; use trie_db::{DBValue, TrieDBMut, TrieMut}; diff --git a/pallets/ocex/src/validator.rs b/pallets/ocex/src/validator.rs index 902e9c7f0..ed35914f2 100644 --- a/pallets/ocex/src/validator.rs +++ b/pallets/ocex/src/validator.rs @@ -88,7 +88,14 @@ impl Pallet { let mut storage = crate::storage::State; let mut state = OffchainState::load(&mut storage, &mut root); - let mut state_info = Self::load_state_info(&mut state); + let mut state_info = match Self::load_state_info(&mut state) { + Ok(info) => info, + Err(err) => { + log::error!(target:"ocex","Err loading state info from storage: {:?}",err); + store_trie_root(H256::zero()); + return Err(err) + }, + }; let last_processed_nonce = state_info.snapshot_id; @@ -292,11 +299,10 @@ impl Pallet { Ok(withdrawals) } - fn load_state_info(state: &mut OffchainState) -> StateInfo { - match state.get(&STATE_INFO.to_vec()) { - Ok(Some(data)) => StateInfo::decode(&mut &data[..]).unwrap_or_default(), - Ok(None) => StateInfo::default(), - Err(_) => StateInfo::default(), + fn load_state_info(state: &mut OffchainState) -> Result { + match state.get(&STATE_INFO.to_vec())? { + Some(data) => Ok(StateInfo::decode(&mut &data[..]).unwrap_or_default()), + None => Ok(StateInfo::default()), } } @@ -333,7 +339,7 @@ impl Pallet { Ok(balance) } - pub(crate) fn get_state_info() -> StateInfo { + pub(crate) fn get_state_info() -> Result { let mut root = crate::storage::load_trie_root(); let mut storage = crate::storage::State; let mut state = OffchainState::load(&mut storage, &mut root); diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 34ac31648..f89b34762 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -117,7 +117,7 @@ pub const VERSION: RuntimeVersion = RuntimeVersion { // and set impl_version to 0. If only runtime // implementation changes and behavior does not, then leave spec_version as // is and increment impl_version. - spec_version: 299, + spec_version: 300, impl_version: 0, apis: RUNTIME_API_VERSIONS, transaction_version: 2, From df6c289bb004d2279ddcdc464c33e2a39c7bc7d1 Mon Sep 17 00:00:00 2001 From: Gautham Date: Thu, 3 Aug 2023 20:55:06 +0300 Subject: [PATCH 11/13] Disable on_idle --- pallets/ocex/src/lib.rs | 80 +++++++++++++++++++++++++---------------- runtime/src/lib.rs | 2 +- 2 files changed, 50 insertions(+), 32 deletions(-) diff --git a/pallets/ocex/src/lib.rs b/pallets/ocex/src/lib.rs index 04378f187..bc5b2cb0e 100644 --- a/pallets/ocex/src/lib.rs +++ b/pallets/ocex/src/lib.rs @@ -328,37 +328,55 @@ pub mod pallet { impl Hooks> for Pallet { /// On idle, use the remaining weight to withdraw finalization /// Automated (but much delayed) `claim_withdraw()` extrinsic - fn on_idle(_n: BlockNumberFor, mut remaining_weight: Weight) -> Weight { - let snapshot_id = >::get(); - while remaining_weight.ref_time() > - ::WeightInfo::claim_withdraw(1).ref_time() - { - >::mutate(snapshot_id, |btree_map| { - // Get mutable reference to the withdrawals vector - if let Some(account) = btree_map.clone().keys().nth(1) { - let mut accounts_to_clean = vec![]; - if let Some(withdrawal_vector) = btree_map.get_mut(account) { - if let Some(withdrawal) = withdrawal_vector.pop() { - if !Self::on_idle_withdrawal_processor(withdrawal.clone()) { - withdrawal_vector.push(withdrawal.clone()); - Self::deposit_event(Event::WithdrawalFailed(withdrawal)); - } - } else { - // this user has no withdrawals left - remove from map - accounts_to_clean.push(account.clone()); - } - } - for user in accounts_to_clean { - btree_map.remove(&user); - } - } - // we drain weight ALWAYS - remaining_weight = remaining_weight - .saturating_sub(::WeightInfo::claim_withdraw(1)); - }); - } - remaining_weight - } + // fn on_idle(_n: BlockNumberFor, mut remaining_weight: Weight) -> Weight { + // let snapshot_id = >::get(); + // while remaining_weight.ref_time() > + // ::WeightInfo::claim_withdraw(1).ref_time() + // { + // >::mutate(snapshot_id, |btree_map| { + // // Get mutable reference to the withdrawals vector + // if let Some(account) = btree_map.clone().keys().nth(1) { + // let mut accounts_to_clean = vec![]; + // if let Some(withdrawal_vector) = btree_map.get_mut(account) { + // if let Some(withdrawal) = withdrawal_vector.pop() { + // if !Self::on_idle_withdrawal_processor(withdrawal.clone()) { + // withdrawal_vector.push(withdrawal.clone()); + // Self::deposit_event(Event::WithdrawalFailed(withdrawal)); + // }else{ + // // TODO: enable this only after testing + // // Update events on successful withdrawal + // // let processed_withdrawls = + // // // Deposit event about successful withdraw + // // Self::deposit_event(Event::WithdrawalClaimed { + // // main: account.clone(), + // // withdrawals: processed_withdrawals.clone(), + // // }); + // // >::mutate(|onchain_events| { + // // onchain_events.push( + // // polkadex_primitives::ocex::OnChainEvents::OrderBookWithdrawalClaimed( + // // snapshot_id, + // // account.clone(), + // // processed_withdrawals, + // // ), + // // ); + // // }); + // } + // } else { + // // this user has no withdrawals left - remove from map + // accounts_to_clean.push(account.clone()); + // } + // } + // for user in accounts_to_clean { + // btree_map.remove(&user); + // } + // } + // // we drain weight ALWAYS + // remaining_weight = remaining_weight + // .saturating_sub(::WeightInfo::claim_withdraw(1)); + // }); + // } + // remaining_weight + // } /// What to do at the end of each block. /// /// Clean OnCHainEvents diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index f89b34762..ae45522dd 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -117,7 +117,7 @@ pub const VERSION: RuntimeVersion = RuntimeVersion { // and set impl_version to 0. If only runtime // implementation changes and behavior does not, then leave spec_version as // is and increment impl_version. - spec_version: 300, + spec_version: 301, impl_version: 0, apis: RUNTIME_API_VERSIONS, transaction_version: 2, From 47126b09c1cd1a0c8d09fb6eb5e339706484f562 Mon Sep 17 00:00:00 2001 From: Gautham Date: Thu, 3 Aug 2023 21:37:11 +0300 Subject: [PATCH 12/13] Implement URL fallback for Thea --- pallets/thea/src/validation.rs | 94 ++++++++++++++++++++++++++-------- primitives/thea/src/types.rs | 1 + runtime/src/lib.rs | 2 +- 3 files changed, 76 insertions(+), 21 deletions(-) diff --git a/pallets/thea/src/validation.rs b/pallets/thea/src/validation.rs index 063dd0d17..d0bde64c2 100644 --- a/pallets/thea/src/validation.rs +++ b/pallets/thea/src/validation.rs @@ -121,15 +121,19 @@ fn submit_message_to_aggregator( log::error!(target:"thea","Error serializing approved message: {:?}",err); "Error serializing approved message" })?; - send_request("thea_aggregator_link", AGGREGRATOR_URL, body.as_str())?; + send_request("thea_aggregator_link", Destination::Aggregator, body.as_str())?; Ok(()) } fn get_latest_incoming_nonce_parachain() -> u64 { let storage_key = create_para_incoming_nonce_key(); - get_storage_at_latest_finalized_head::("para_incoming_nonce", PARACHAIN_URL, storage_key) - .unwrap_or_default() - .unwrap_or_default() + get_storage_at_latest_finalized_head::( + "para_incoming_nonce", + Destination::Parachain, + storage_key, + ) + .unwrap_or_default() + .unwrap_or_default() } fn get_payload_for_nonce( @@ -142,22 +146,36 @@ fn get_payload_for_nonce( Destination::Solochain => { // Get the outgoing message with nonce: `nonce` for network: `network` let key = create_solo_outgoing_message_key(nonce, network); - get_storage_at_latest_finalized_head::( + match get_storage_at_latest_finalized_head::( "solo_outgoing_message", - MAINNET_URL, + destination, key, - ) - .unwrap() + ) { + Ok(message) => message, + Err(err) => { + log::error!(target:"thea","Unable to get finalized solo head: {:?}",err); + None + }, + } }, Destination::Parachain => { // Get the outgoing message with nonce: `nonce` from network let key = create_para_outgoing_message_key(nonce); - get_storage_at_latest_finalized_head::( + match get_storage_at_latest_finalized_head::( "para_outgoing_message", - PARACHAIN_URL, + destination, key, - ) - .unwrap() + ) { + Ok(message) => message, + Err(err) => { + log::error!(target:"thea","Unable to get finalized solo head: {:?}",err); + None + }, + } + }, + _ => { + log::warn!(target:"thea","Invalid destination provided"); + None }, } } @@ -195,12 +213,12 @@ pub fn create_para_outgoing_message_key(nonce: u64) -> Vec { fn get_storage_at_latest_finalized_head( log_target: &str, - url: &str, + destination: Destination, storage_key: Vec, ) -> Result, &'static str> { log::debug!(target:"thea","getting storage for {}",log_target); // 1. Get finalized head ( Fh ) - let finalized_head = get_finalized_head(url)?; + let finalized_head = get_finalized_head(destination)?; let storage_key = "0x".to_owned() + &hex::encode(storage_key); @@ -213,7 +231,7 @@ fn get_storage_at_latest_finalized_head( }) .to_string(); - let storage_bytes = send_request(log_target, url, body.as_str())?; + let storage_bytes = send_request(log_target, destination, body.as_str())?; if storage_bytes.is_null() { log::debug!(target:"thea","Storage query returned null response"); @@ -228,7 +246,8 @@ fn get_storage_at_latest_finalized_head( Ok(Some(Decode::decode(&mut &storage_bytes[..]).map_err(|_| "Decode failure")?)) } use scale_info::prelude::string::String; -fn get_finalized_head(url: &str) -> Result { + +fn get_finalized_head(destination: Destination) -> Result { // This body will work for most substrate chains let body = serde_json::json!({ "id":1, @@ -237,16 +256,51 @@ fn get_finalized_head(url: &str) -> Result { "params": [] }); let mut result = - send_request("get_finalized_head", url, body.to_string().as_str())?.to_string(); + send_request("get_finalized_head", destination, body.to_string().as_str())?.to_string(); result = result.replace('\"', ""); log::debug!(target:"thea","Finalized head: {:?}",result); Ok(result) } +pub fn resolve_destination_url(destination: Destination, counter: i32) -> String { + if destination == Destination::Aggregator { + return AGGREGRATOR_URL.to_string() + } + let url = match (destination, counter) { + (Destination::Solochain, 0) => "http://localhost:9944", + (Destination::Solochain, 1) => MAINNET_URL, + (Destination::Parachain, 0) => "http://localhost:8844", + (Destination::Parachain, 1) => PARACHAIN_URL, + _ => AGGREGRATOR_URL, + }; + log::debug!(target:"thea","Resolving {:?}: {:?} to {:?}",destination,counter,url); + url.to_string() +} + pub fn send_request( log_target: &str, - url: &str, + destination: Destination, body: &str, +) -> Result { + for try_counter in 0..2 { + match create_and_send_request( + log_target, + body, + &resolve_destination_url(destination, try_counter), + ) { + Ok(value) => return Ok(value), + Err(err) => { + log::error!(target:"thea","Error querying {:?}: {:?}",log_target, err); + }, + } + } + return Err("request failed") +} + +fn create_and_send_request( + log_target: &str, + body: &str, + url: &str, ) -> Result { let deadline = sp_io::offchain::timestamp().add(Duration::from_millis(12_000)); @@ -270,7 +324,7 @@ pub fn send_request( if response.code != 200u16 { log::warn!(target:"thea","Unexpected status code for {}: {:?}",log_target,response.code); - return Err("request failed") + return Err("Unexpected status code") } let body = response.body().collect::>(); @@ -283,7 +337,7 @@ pub fn send_request( log::debug!(target:"thea","{} response: {:?}",log_target,body_str); let response: JSONRPCResponse = serde_json::from_str::(body_str) .map_err(|_| "Response failed deserialize")?; - Ok(response.result) + return Ok(response.result) } fn map_sp_runtime_http_err(err: sp_runtime::offchain::http::Error) -> &'static str { diff --git a/primitives/thea/src/types.rs b/primitives/thea/src/types.rs index b5d3f0316..5069a342c 100644 --- a/primitives/thea/src/types.rs +++ b/primitives/thea/src/types.rs @@ -67,6 +67,7 @@ pub struct Message { pub enum Destination { Solochain, Parachain, + Aggregator, } /// Defines structure of the deposit. diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index ae45522dd..7f76a0174 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -117,7 +117,7 @@ pub const VERSION: RuntimeVersion = RuntimeVersion { // and set impl_version to 0. If only runtime // implementation changes and behavior does not, then leave spec_version as // is and increment impl_version. - spec_version: 301, + spec_version: 302, impl_version: 0, apis: RUNTIME_API_VERSIONS, transaction_version: 2, From 2ffee941929d8ac3e6ecf138dddc63d767655bb6 Mon Sep 17 00:00:00 2001 From: Gautham Date: Fri, 4 Aug 2023 11:43:03 +0300 Subject: [PATCH 13/13] Add docs --- pallets/thea/src/validation.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pallets/thea/src/validation.rs b/pallets/thea/src/validation.rs index d0bde64c2..fca6d76fe 100644 --- a/pallets/thea/src/validation.rs +++ b/pallets/thea/src/validation.rs @@ -262,6 +262,10 @@ fn get_finalized_head(destination: Destination) -> Result Ok(result) } +/// Returns the url based on destination and counter +/// +/// Counter value 0 means return the local address +/// Counter value 1 means returns the fallback address. pub fn resolve_destination_url(destination: Destination, counter: i32) -> String { if destination == Destination::Aggregator { return AGGREGRATOR_URL.to_string()