Skip to content

Commit

Permalink
Added a caching layer for Offchain state
Browse files Browse the repository at this point in the history
  • Loading branch information
Gauthamastro committed Aug 2, 2023
1 parent 3a3915d commit 4359552
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 106 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 19 additions & 23 deletions pallets/ocex/src/settlement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,14 @@

//! Helper functions for updating the balance

use crate::validator::map_trie_error;
use crate::storage::OffchainState;
use log::{error, info};
use orderbook_primitives::types::Trade;
use parity_scale_codec::{alloc::string::ToString, Decode, Encode};
use polkadex_primitives::{ocex::TradingPairConfig, AccountId, AssetId};
use rust_decimal::{prelude::ToPrimitive, Decimal};
use sp_core::crypto::ByteArray;
use sp_runtime::traits::BlakeTwo256;
use sp_std::collections::btree_map::BTreeMap;
use sp_trie::LayoutV1;
use trie_db::{TrieDBMut, TrieMut};

/// Updates provided trie db with a new balance entry if it is does not contain item for specific
/// account or asset yet, or increments existing item balance.
Expand All @@ -40,25 +37,24 @@ use trie_db::{TrieDBMut, TrieMut};
/// * `asset`: Asset to look for
/// * `balance`: Amount on which balance should be added.
pub fn add_balance(
state: &mut TrieDBMut<LayoutV1<BlakeTwo256>>,
state: &mut OffchainState,
account: &AccountId,
asset: AssetId,
balance: Decimal,
) -> Result<(), &'static str> {
log::info!(target:"ocex", "adding {:?} asset {:?} from account {:?}", balance.to_f64().unwrap(), asset.to_string(), account.as_slice());
let mut balances: BTreeMap<AssetId, Decimal> =
match state.get(account.as_slice()).map_err(map_trie_error)? {
None => BTreeMap::new(),
Some(encoded) => BTreeMap::decode(&mut &encoded[..])
.map_err(|_| "Unable to decode balances for account")?,
};
log::info!(target:"ocex", "adding {:?} asset {:?} from account {:?}", balance.to_f64().unwrap(), asset.to_string(), account);
let mut balances: BTreeMap<AssetId, Decimal> = match state.get(&account.to_raw_vec())? {
None => BTreeMap::new(),
Some(encoded) => BTreeMap::decode(&mut &encoded[..])
.map_err(|_| "Unable to decode balances for account")?,
};

balances
.entry(asset)
.and_modify(|total| *total = total.saturating_add(balance))
.or_insert(balance);

state.insert(account.as_slice(), &balances.encode()).map_err(map_trie_error)?;
state.insert(account.to_raw_vec(), balances.encode());
Ok(())
}

Expand All @@ -74,27 +70,27 @@ pub fn add_balance(
/// * `asset`: Asset to look for
/// * `balance`: Amount on which balance should be reduced.
pub fn sub_balance(
state: &mut TrieDBMut<LayoutV1<BlakeTwo256>>,
state: &mut OffchainState,
account: &AccountId,
asset: AssetId,
balance: Decimal,
) -> Result<(), &'static str> {
log::info!(target:"ocex", "subtracting {:?} asset {:?} from account {:?}", balance.to_f64().unwrap(), asset.to_string(), account.as_slice());
let mut balances: BTreeMap<AssetId, Decimal> =
match state.get(account.as_slice()).map_err(map_trie_error)? {
None => return Err("Account not found in trie"),
Some(encoded) => BTreeMap::decode(&mut &encoded[..])
.map_err(|_| "Unable to decode balances for account")?,
};
log::info!(target:"ocex", "subtracting {:?} asset {:?} from account {:?}", balance.to_f64().unwrap(), asset.to_string(), account);
let mut balances: BTreeMap<AssetId, Decimal> = match state.get(&account.to_raw_vec())? {
None => return Err("Account not found in trie"),
Some(encoded) => BTreeMap::decode(&mut &encoded[..])
.map_err(|_| "Unable to decode balances for account")?,
};

let account_balance = balances.get_mut(&asset).ok_or("NotEnoughBalance")?;

if *account_balance < balance {
log::error!(target:"ocex","Asset found but balance low for asset: {:?}, of account: {:?}",asset, account);
return Err("NotEnoughBalance")
}
*account_balance = account_balance.saturating_sub(balance);

state.insert(account.as_slice(), &balances.encode()).map_err(map_trie_error)?;
state.insert(account.to_raw_vec(), balances.encode());

Ok(())
}
Expand All @@ -112,7 +108,7 @@ pub fn sub_balance(
///
/// A `Result<(), Error>` indicating whether the trade was successfully processed or not.
pub fn process_trade(
state: &mut TrieDBMut<LayoutV1<BlakeTwo256>>,
state: &mut OffchainState,
trade: &Trade,
config: TradingPairConfig,
) -> Result<(), &'static str> {
Expand Down
104 changes: 88 additions & 16 deletions pallets/ocex/src/storage.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::validator::map_trie_error;
use hash_db::{AsHashDB, HashDB, Prefix};
use sp_core::{Hasher, H256};
use sp_runtime::{offchain::storage::StorageValueRef, sp_std, traits::BlakeTwo256};
use sp_std::vec::Vec;
use sp_trie::{trie_types::TrieDBMutBuilderV1, LayoutV1};
use trie_db::{DBValue, TrieDBMut};
use trie_db::{DBValue, TrieDBMut, TrieMut};

pub struct State;

Expand All @@ -12,6 +13,54 @@ const NULL_NODE_DATA: [u8; 29] = *b"offchain-ocex::null_node_data";
const KEY_PREFIX: [u8; 15] = *b"offchain-ocex::";
const TRIE_ROOT: [u8; 24] = *b"offchain-ocex::trie_root";

pub struct OffchainState<'a> {
cache: sp_std::collections::btree_map::BTreeMap<Vec<u8>, Vec<u8>>,
trie: TrieDBMut<'a, LayoutV1<BlakeTwo256>>,
}

impl<'a> OffchainState<'a> {
pub fn load(storage: &'a mut State, root: &'a mut H256) -> Self {
let trie = crate::storage::get_state_trie(storage, root);
Self { cache: Default::default(), trie }
}

pub fn is_empty(&self) -> bool {
self.cache.is_empty() && self.trie.is_empty()
}

pub fn get(&mut self, key: &Vec<u8>) -> Result<Option<Vec<u8>>, &'static str> {
match self.cache.get(key) {
Some(value) => Ok(Some(value.clone())),
None => match self.trie.get(key) {
Err(err) => {
log::error!(target:"ocex","Trie returned an error while get operation");
Err(map_trie_error(err))
},
Ok(option) => match option {
None => Ok(None),
Some(value) => {
self.cache.insert(key.clone(), value.clone());
Ok(Some(value))
},
},
},
}
}

pub fn insert(&mut self, key: Vec<u8>, value: Vec<u8>) {
self.cache.insert(key, value);
}

pub fn commit(&mut self) -> Result<H256, &'static str> {
for (key, value) in self.cache.iter() {
self.trie.insert(key, value).map_err(map_trie_error)?;
}

self.trie.commit();
Ok(*self.trie.root())
}
}

impl State {
fn hashed_null_node(&self) -> <BlakeTwo256 as Hasher>::Out {
let s_r = StorageValueRef::persistent(&HASHED_NULL_NODE);
Expand Down Expand Up @@ -43,24 +92,27 @@ impl State {
}
}

fn db_get(&self, key: &<BlakeTwo256 as Hasher>::Out) -> Option<(DBValue, i32)> {
let derive_key = self.derive_storage_key(*key);
fn db_get(&self, key: &Vec<u8>) -> Option<(DBValue, i32)> {
log::trace!(target:"ocex","Getting key: {:?}", key);
let derive_key = self.derive_storage_key(key);
let s_ref = StorageValueRef::persistent(derive_key.as_slice());
match s_ref.get::<(DBValue, i32)>() {
Ok(d) => d,
Err(_) => None,
}
}

fn db_insert(&self, key: <BlakeTwo256 as Hasher>::Out, value: (DBValue, i32)) {
let derive_key = self.derive_storage_key(key);
fn db_insert(&self, key: Vec<u8>, value: (DBValue, i32)) {
let derive_key = self.derive_storage_key(&key);
log::trace!(target:"ocex","Inserting key: {:?}, derived: {:?}, value: {:?}", key, derive_key, value);
let s_ref = StorageValueRef::persistent(derive_key.as_slice());
s_ref.set(&value);
}

fn derive_storage_key(&self, key: <BlakeTwo256 as Hasher>::Out) -> Vec<u8> {
fn derive_storage_key(&self, key: &[u8]) -> Vec<u8> {
let mut derived = KEY_PREFIX.to_vec();
derived.append(&mut key.0.to_vec());
let mut cloned_key = key.to_owned();
derived.append(&mut cloned_key);
derived
}
}
Expand All @@ -76,26 +128,31 @@ impl AsHashDB<BlakeTwo256, DBValue> for State {
}

impl HashDB<BlakeTwo256, DBValue> for State {
fn get(&self, key: &<BlakeTwo256 as Hasher>::Out, _: Prefix) -> Option<DBValue> {
fn get(&self, key: &<BlakeTwo256 as Hasher>::Out, prefix: Prefix) -> Option<DBValue> {
log::trace!(target:"ocex","HashDb get, key: {:?}, prefix: {:?}", key,prefix);
if key == &self.hashed_null_node() {
return Some(self.null_node_data())
}

match self.db_get(key) {
let key = prefixed_key(key, prefix);
match self.db_get(&key) {
Some((ref d, rc)) if rc > 0 => Some(d.clone()),
_ => None,
}
}

fn contains(&self, key: &<BlakeTwo256 as Hasher>::Out, _: Prefix) -> bool {
fn contains(&self, key: &<BlakeTwo256 as Hasher>::Out, prefix: Prefix) -> bool {
log::trace!(target:"ocex","HashDb contains, key: {:?}, prefix: {:?}", key,prefix);
if key == &self.hashed_null_node() {
return true
}

matches!(self.db_get(key), Some((_, x)) if x > 0)
let key = prefixed_key(key, prefix);
matches!(self.db_get(&key), Some((_, x)) if x > 0)
}

fn insert(&mut self, prefix: Prefix, value: &[u8]) -> <BlakeTwo256 as Hasher>::Out {
log::trace!(target:"ocex","HashDb insert, prefix: {:?}",prefix);
if *value == self.null_node_data() {
return self.hashed_null_node()
}
Expand All @@ -104,11 +161,13 @@ impl HashDB<BlakeTwo256, DBValue> for State {
key
}

fn emplace(&mut self, key: <BlakeTwo256 as Hasher>::Out, _: Prefix, value: DBValue) {
fn emplace(&mut self, key: <BlakeTwo256 as Hasher>::Out, prefix: Prefix, value: DBValue) {
log::trace!(target:"ocex","HashDb emplace, key: {:?}, prefix: {:?}", key,prefix);
if value == self.null_node_data() {
return
}

let key = prefixed_key(&key, prefix);
match self.db_get(&key) {
Some((mut old_value, mut rc)) => {
if rc <= 0 {
Expand All @@ -123,24 +182,37 @@ impl HashDB<BlakeTwo256, DBValue> for State {
}
}

fn remove(&mut self, key: &<BlakeTwo256 as Hasher>::Out, _: Prefix) {
fn remove(&mut self, key: &<BlakeTwo256 as Hasher>::Out, prefix: Prefix) {
log::trace!(target:"ocex","HashDb remove, key: {:?}, prefix: {:?}", key,prefix);
if key == &self.hashed_null_node() {
return
}

match self.db_get(key) {
let key = prefixed_key(key, prefix);
match self.db_get(&key) {
Some((value, mut rc)) => {
rc -= 1;
self.db_insert(*key, (value, rc));
self.db_insert(key, (value, rc));
},
None => {
let value = DBValue::default();
self.db_insert(*key, (value, -1));
self.db_insert(key, (value, -1));
},
}
}
}

/// Derive a database key from hash value of the node (key) and the node prefix.
pub fn prefixed_key(key: &<BlakeTwo256 as Hasher>::Out, prefix: Prefix) -> Vec<u8> {
let mut prefixed_key = Vec::with_capacity(key.as_ref().len() + prefix.0.len() + 1);
prefixed_key.extend_from_slice(prefix.0);
if let Some(last) = prefix.1 {
prefixed_key.push(last);
}
prefixed_key.extend_from_slice(key.as_ref());
prefixed_key
}

pub(crate) fn load_trie_root() -> <BlakeTwo256 as Hasher>::Out {
let root_ref = StorageValueRef::persistent(&TRIE_ROOT);
match root_ref.get::<<BlakeTwo256 as Hasher>::Out>() {
Expand Down
Loading

0 comments on commit 4359552

Please sign in to comment.