Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gj/fix race condition #858

Merged
merged 13 commits into from
Aug 4, 2023
13 changes: 11 additions & 2 deletions pallets/ocex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,17 @@ pub mod pallet {

fn offchain_worker(block_number: T::BlockNumber) {
log::debug!(target:"ocex", "offchain worker started");
if let Err(err) = Self::run_on_chain_validation(block_number) {
log::error!(target:"ocex","OCEX worker error: {}",err)

match Self::run_on_chain_validation(block_number) {
Ok(exit_flag) => {
// If exit flag is false, then another worker is online
if !exit_flag {
return
}
},
Err(err) => {
log::error!(target:"ocex","OCEX worker error: {}",err);
},
}
// Set worker status to false
let s_info = StorageValueRef::persistent(&WORKER_STATUS);
Expand Down
8 changes: 7 additions & 1 deletion pallets/ocex/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use parity_scale_codec::{Decode, Encode};
use polkadex_primitives::BlockNumber;

// Accounts storage
#[derive(Encode, Decode, PartialEq, Debug, Clone, Default)]
#[derive(Encode, Decode, PartialEq, Debug, Clone)]
pub struct StateInfo {
/// Last block processed
pub last_block: BlockNumber,
Expand All @@ -31,3 +31,9 @@ pub struct StateInfo {
/// Last processed snapshot id
pub snapshot_id: u64,
}

impl Default for StateInfo {
fn default() -> Self {
Self { last_block: 4768083, worker_nonce: 0, stid: 0, snapshot_id: 0 }
}
}
55 changes: 31 additions & 24 deletions pallets/ocex/src/validator.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
pallet::{Accounts, ValidatorSetId},
pallet::ValidatorSetId,
settlement::{add_balance, process_trade, sub_balance},
snapshot::StateInfo,
storage::store_trie_root,
Expand Down Expand Up @@ -45,7 +45,7 @@ pub const AGGREGATOR: &str = "https://ob.aggregator.polkadex.trade";
impl<T: Config> Pallet<T> {
/// Runs the offchain worker computes the next batch of user actions and
/// submits snapshot summary to aggregator endpoint
pub fn run_on_chain_validation(_block_num: T::BlockNumber) -> Result<(), &'static str> {
pub fn run_on_chain_validation(_block_num: T::BlockNumber) -> Result<bool, &'static str> {
let local_keys = T::AuthorityId::all();
let authorities = Self::validator_set().validators;
let mut available_keys = authorities
Expand Down Expand Up @@ -74,7 +74,7 @@ impl<T: Config> Pallet<T> {
Some(true) => {
// Another worker is online, so exit
log::info!(target:"ocex", "Another worker is online, so exit");
return Ok(())
return Ok(false)
},
None => {},
Some(false) => {},
Expand All @@ -98,19 +98,23 @@ impl<T: Config> Pallet<T> {
log::debug!(target:"ocex","Submitting last processed snapshot: {:?}",next_nonce);
// resubmit the summary to aggregator
load_signed_summary_and_send::<T>(next_nonce);
return Ok(())
return Ok(true)
}

log::info!(target:"ocex","last_processed_nonce: {:?}, next_nonce: {:?}",last_processed_nonce, next_nonce);

if next_nonce.saturating_sub(last_processed_nonce) > 2 {
// We need to sync our offchain state
if state_info.last_block == 0 {
state_info.last_block = 4768083; // This is hard coded as the starting point
}
// We need to sync our off chain state
for nonce in last_processed_nonce.saturating_add(1)..next_nonce {
log::info!(target:"ocex","Syncing batch: {:?}",nonce);
// Load the next ObMessages
let batch = match get_user_action_batch::<T>(nonce) {
None => {
log::error!(target:"ocex","No user actions found for nonce: {:?}",nonce);
return Ok(())
return Ok(true)
},
Some(batch) => batch,
};
Expand All @@ -132,23 +136,24 @@ impl<T: Config> Pallet<T> {
state.commit();
store_trie_root(*state.root());
log::debug!(target:"ocex","Stored state root: {:?}",state.root());
return Ok(())
return Ok(true)
},
Some(batch) => batch,
};

log::info!(target:"ocex","Processing user actions for nonce: {:?}",next_nonce);
let withdrawals = Self::process_batch(&mut state, &batch, &mut state_info)?;

if sp_io::offchain::is_validator() {
// Create state hash.
state_info.snapshot_id = batch.snapshot_id; // Store the processed nonce
Self::store_state_info(state_info.clone(), &mut state)?;
state.commit();
let state_hash: H256 = *state.root();
store_trie_root(state_hash);
log::info!(target:"ocex","updated trie root: {:?}", state.root());
// Create state hash and store it
state_info.stid = batch.stid;
state_info.snapshot_id = batch.snapshot_id; // Store the processed nonce
Self::store_state_info(state_info.clone(), &mut state)?;
state.commit();
let state_hash: H256 = *state.root();
store_trie_root(state_hash);
log::info!(target:"ocex","updated trie root: {:?}", state.root());

if sp_io::offchain::is_validator() {
match available_keys.get(0) {
None => return Err("No active keys found"),
Some(key) => {
Expand Down Expand Up @@ -187,7 +192,7 @@ impl<T: Config> Pallet<T> {
}
}

Ok(())
Ok(true)
}

fn import_blk(
Expand All @@ -197,7 +202,7 @@ impl<T: Config> Pallet<T> {
) -> Result<(), &'static str> {
log::debug!(target:"ocex","Importing block: {:?}",blk);

if blk <= state_info.last_block.saturated_into() {
if blk != state_info.last_block.saturating_add(1).into() {
return Err("BlockOutofSequence")
}

Expand Down Expand Up @@ -242,12 +247,14 @@ impl<T: Config> Pallet<T> {
) -> Result<Withdrawal<T::AccountId>, &'static str> {
log::info!(target:"ocex","Settling withdraw request...");
let amount = request.amount().map_err(|_| "decimal conversion error")?;
let account_info = <Accounts<T>>::get(&request.main).ok_or("Main account not found")?;
// FIXME: Don't remove these comments, will be reintroduced after fixing the race condition
// let account_info = <Accounts<T>>::get(&request.main).ok_or("Main account not found")?;

// if !account_info.proxies.contains(&request.proxy) {
// // TODO: Check Race condition
// return Err("Proxy not found")
// }

if !account_info.proxies.contains(&request.proxy) {
// TODO: Check Race condition
return Err("Proxy not found")
}
if !request.verify() {
return Err("SignatureVerificationFailed")
}
Expand Down Expand Up @@ -346,7 +353,7 @@ impl<T: Config> Pallet<T> {
use parity_scale_codec::alloc::string::ToString;
use sp_std::borrow::ToOwned;

fn get_user_action_batch<T: Config>(id: u64) -> Option<UserActionBatch<T::AccountId>> {
pub fn get_user_action_batch<T: Config>(id: u64) -> Option<UserActionBatch<T::AccountId>> {
let body = serde_json::json!({ "id": id }).to_string();
35359595 marked this conversation as resolved.
Show resolved Hide resolved
let result =
match send_request("user_actions_batch", &(AGGREGATOR.to_owned() + "/snapshots"), &body) {
Expand Down Expand Up @@ -487,7 +494,7 @@ fn map_http_err(err: HttpError) -> &'static str {
#[derive(Serialize, Deserialize)]
pub struct JSONRPCResponse {
jsonrpc: serde_json::Value,
result: Vec<u8>,
pub(crate) result: Vec<u8>,
id: u64,
}

Expand Down
2 changes: 1 addition & 1 deletion runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ pub const VERSION: RuntimeVersion = RuntimeVersion {
// and set impl_version to 0. If only runtime
// implementation changes and behavior does not, then leave spec_version as
// is and increment impl_version.
spec_version: 294,
spec_version: 298,
impl_version: 0,
apis: RUNTIME_API_VERSIONS,
transaction_version: 2,
Expand Down
Loading