diff --git a/ethexe/cli/src/service.rs b/ethexe/cli/src/service.rs index e48824c5b2a..21b7b4edfba 100644 --- a/ethexe/cli/src/service.rs +++ b/ethexe/cli/src/service.rs @@ -361,14 +361,9 @@ impl Service { outcomes .into_iter() .map(|outcome| match outcome { - LocalOutcome::CodeApproved(code_id) => Commitment::Code(CodeCommitment { - id: code_id, - valid: true, - }), - LocalOutcome::CodeRejected(code_id) => Commitment::Code(CodeCommitment { - id: code_id, - valid: false, - }), + LocalOutcome::CodeValidated { id, valid } => { + Commitment::Code(CodeCommitment { id, valid }) + } _ => unreachable!("Only code outcomes are expected here"), }) .collect() diff --git a/ethexe/cli/src/tests.rs b/ethexe/cli/src/tests.rs index 09bbbee1353..50f45093a1f 100644 --- a/ethexe/cli/src/tests.rs +++ b/ethexe/cli/src/tests.rs @@ -37,7 +37,7 @@ use ethexe_signer::Signer; use ethexe_validator::Validator; use futures::StreamExt; use gear_core::ids::prelude::*; -use gprimitives::{ActorId, CodeId, MessageId, H256}; +use gprimitives::{ActorId, CodeId, MessageId, H160, H256}; use std::{sync::Arc, time::Duration}; use tokio::{ sync::{ @@ -148,6 +148,7 @@ struct TestEnv { validator_private_key: ethexe_signer::PrivateKey, validator_public_key: ethexe_signer::PublicKey, router_address: ethexe_signer::Address, + sender_address: ActorId, block_time: Duration, running_service_handle: Option>>, } @@ -226,6 +227,7 @@ impl TestEnv { validator_private_key, validator_public_key, router_address, + sender_address: ActorId::from(H160::from(sender_address.0)), block_time, running_service_handle: None, }; @@ -327,6 +329,7 @@ impl Drop for TestEnv { } } } + #[tokio::test(flavor = "multi_thread")] #[ntest::timeout(60_000)] async fn ping() { @@ -414,9 +417,12 @@ async fn ping() { if address == program_id { match event { MirrorEvent::MessageQueueingRequested { - id, payload, value, .. + id, + source, + payload, + value, } => { - // TODO (breathx): assert source. + assert_eq!(source, env.sender_address); assert_eq!(payload, b"PING"); assert_eq!(value, 0); init_message_id = id; @@ -583,9 +589,12 @@ async fn ping_reorg() { if address == program_id { match event { MirrorEvent::MessageQueueingRequested { - id, payload, value, .. + id, + source, + payload, + value, } => { - // TODO (breathx): assert source. + assert_eq!(source, env.sender_address); assert_eq!(payload, b"PING"); assert_eq!(value, 0); init_message_id = id; @@ -829,9 +838,13 @@ async fn ping_deep_sync() { if address == program_id { match event { MirrorEvent::MessageQueueingRequested { - id, payload, value, .. + id, + source, + payload, + value, + .. } => { - // TODO (breathx): assert source. + assert_eq!(source, env.sender_address); assert_eq!(payload, b"PING"); assert_eq!(value, 0); init_message_id = id; diff --git a/ethexe/contracts/script/Deployment.s.sol b/ethexe/contracts/script/Deployment.s.sol index 849c34ec5b7..c373dfdb189 100644 --- a/ethexe/contracts/script/Deployment.s.sol +++ b/ethexe/contracts/script/Deployment.s.sol @@ -46,7 +46,6 @@ contract RouterScript is Script { mirror = new Mirror(); mirrorProxy = new MirrorProxy(address(router)); - // TODO (breathx): remove this approve. wrappedVara.approve(address(router), type(uint256).max); vm.stopBroadcast(); diff --git a/ethexe/contracts/src/IMirror.sol b/ethexe/contracts/src/IMirror.sol index 37c63e34e9d..cb384620d6d 100644 --- a/ethexe/contracts/src/IMirror.sol +++ b/ethexe/contracts/src/IMirror.sol @@ -61,7 +61,6 @@ interface IMirror { event Reply(bytes payload, uint128 value, bytes32 replyTo, bytes4 indexed replyCode); // TODO (breathx): should we deposit it? should we notify about successful reply sending? - // TODO (breathx): `value` could be removed from event. /** * @dev Emitted when a user succeed in claiming value request and receives balance. * diff --git a/ethexe/db/src/database.rs b/ethexe/db/src/database.rs index f2848a97b65..577df36c58d 100644 --- a/ethexe/db/src/database.rs +++ b/ethexe/db/src/database.rs @@ -378,14 +378,23 @@ impl Database { // TODO: consider to change decode panics to Results. impl Storage for Database { fn read_state(&self, hash: H256) -> Option { + if hash.is_zero() { + return Some(ProgramState::zero()); + } + let data = self.cas.read(&hash)?; - Some( - ProgramState::decode(&mut &data[..]) - .expect("Failed to decode data into `ProgramState`"), - ) + + let state = ProgramState::decode(&mut &data[..]) + .expect("Failed to decode data into `ProgramState`"); + + Some(state) } fn write_state(&self, state: ProgramState) -> H256 { + if state.is_zero() { + return H256::zero(); + } + self.cas.write(&state.encode()) } diff --git a/ethexe/observer/src/observer.rs b/ethexe/observer/src/observer.rs index 288470fb676..dd24a2b7cef 100644 --- a/ethexe/observer/src/observer.rs +++ b/ethexe/observer/src/observer.rs @@ -185,7 +185,6 @@ pub(crate) async fn read_block_events( provider: &ObserverProvider, router_address: AlloyAddress, ) -> Result> { - // TODO (breathx): discuss should we check validity of wvara address for router on some block. let router_query = RouterQuery::from_provider(router_address, Arc::new(provider.clone())); let wvara_address = router_query.wvara_address().await?; @@ -265,7 +264,6 @@ pub(crate) async fn read_block_events_batch( provider: &ObserverProvider, router_address: AlloyAddress, ) -> Result>> { - // TODO (breathx): discuss should we check validity of wvara address for router on some block. let router_query = RouterQuery::from_provider(router_address, Arc::new(provider.clone())); let wvara_address = router_query.wvara_address().await?; diff --git a/ethexe/processor/src/host/threads.rs b/ethexe/processor/src/host/threads.rs index c9bf25ff777..ddfb6e3ec95 100644 --- a/ethexe/processor/src/host/threads.rs +++ b/ethexe/processor/src/host/threads.rs @@ -55,7 +55,7 @@ impl ThreadParams { pub fn pages(&mut self) -> &BTreeMap { self.pages.get_or_insert_with(|| { let ProgramState { - state: Program::Active(ActiveProgram { pages_hash, .. }), + program: Program::Active(ActiveProgram { pages_hash, .. }), .. } = self.db.read_state(self.state_hash).expect(UNKNOWN_STATE) else { diff --git a/ethexe/processor/src/lib.rs b/ethexe/processor/src/lib.rs index 2f7ebc20134..141555e7993 100644 --- a/ethexe/processor/src/lib.rs +++ b/ethexe/processor/src/lib.rs @@ -19,20 +19,17 @@ //! Program's execution service for eGPU. use anyhow::Result; -use core_processor::common::JournalNote; use ethexe_common::{ mirror::Event as MirrorEvent, router::{Event as RouterEvent, StateTransition}, + wvara::Event as WVaraEvent, BlockEvent, }; use ethexe_db::{BlockMetaStorage, CodesStorage, Database}; -use ethexe_runtime_common::state::{ - self, ActiveProgram, Dispatch, MaybeHash, ProgramState, Storage, -}; +use ethexe_runtime_common::state::{Dispatch, HashAndLen, MaybeHash, Storage}; use gear_core::{ - ids::{prelude::CodeIdExt, ActorId, MessageId, ProgramId}, + ids::{prelude::CodeIdExt, ProgramId}, message::{DispatchKind, Payload}, - program::MemoryInfix, }; use gprimitives::{CodeId, H256}; use host::InstanceCreator; @@ -45,28 +42,19 @@ mod run; #[cfg(test)] mod tests; -pub struct UserMessage { - id: MessageId, - kind: DispatchKind, - source: ActorId, - payload: Vec, - value: u128, -} - pub struct Processor { db: Database, creator: InstanceCreator, } -// TODO (breathx): rename outcomes accordingly to events. /// Local changes that can be committed to the network or local signer. #[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] pub enum LocalOutcome { - /// Produced when code with specific id is recorded and available in database. - CodeApproved(CodeId), - - // TODO: add docs - CodeRejected(CodeId), + /// Produced when code with specific id is recorded and validated. + CodeValidated { + id: CodeId, + valid: bool, + }, Transition(StateTransition), } @@ -121,8 +109,7 @@ impl Processor { Ok(true) } - // TODO: deal with params on smart contract side. - pub fn handle_new_program(&mut self, program_id: ProgramId, code_id: CodeId) -> Result { + pub fn handle_new_program(&mut self, program_id: ProgramId, code_id: CodeId) -> Result<()> { if self.db.original_code(code_id).is_none() { anyhow::bail!("code existence should be checked on smart contract side"); } @@ -133,26 +120,7 @@ impl Processor { self.db.set_program_code_id(program_id, code_id); - // TODO (breathx): state here is non-zero (?!). - - let active_program = ActiveProgram { - allocations_hash: MaybeHash::Empty, - pages_hash: MaybeHash::Empty, - memory_infix: MemoryInfix::new(0), - initialized: false, - }; - - // TODO: on program creation send message to it. - let program_state = ProgramState { - state: state::Program::Active(active_program), - queue_hash: MaybeHash::Empty, - waitlist_hash: MaybeHash::Empty, - balance: 0, - executable_balance: 10_000_000_000_000, // TODO: remove this minting - }; - - // TODO: not write zero state, but just register it (or support default on get) - Ok(self.db.write_state(program_state)) + Ok(()) } pub fn handle_executable_balance_top_up( @@ -165,89 +133,67 @@ impl Processor { .read_state(state_hash) .ok_or_else(|| anyhow::anyhow!("program should exist"))?; - // TODO (breathx): mutate exec balance after #4067. - state.balance += value; + state.executable_balance += value; Ok(self.db.write_state(state)) } - pub fn handle_user_message( + pub fn handle_payload(&mut self, payload: Vec) -> Result { + let payload = Payload::try_from(payload) + .map_err(|_| anyhow::anyhow!("payload should be checked on eth side"))?; + + let hash = payload + .inner() + .is_empty() + .then_some(MaybeHash::Empty) + .unwrap_or_else(|| self.db.write_payload(payload).into()); + + Ok(hash) + } + + pub fn handle_message_queueing( &mut self, - program_hash: H256, - messages: Vec, + state_hash: H256, + dispatch: Dispatch, ) -> Result { - if messages.is_empty() { - return Ok(program_hash); - } + self.handle_messages_queueing(state_hash, vec![dispatch]) + } - let mut dispatches = Vec::with_capacity(messages.len()); - - for message in messages { - let payload = Payload::try_from(message.payload) - .map_err(|_| anyhow::anyhow!("payload should be checked on eth side"))?; - - let payload_hash = payload - .inner() - .is_empty() - .then_some(MaybeHash::Empty) - .unwrap_or_else(|| self.db.write_payload(payload).into()); - - let dispatch = Dispatch { - id: message.id, - kind: message.kind, - source: message.source, - payload_hash, - value: message.value, - // TODO: handle replies. - details: None, - context: None, - }; - - dispatches.push(dispatch); + pub fn handle_messages_queueing( + &mut self, + state_hash: H256, + dispatches: Vec, + ) -> Result { + if dispatches.is_empty() { + return Ok(state_hash); } - // TODO: on zero hash return default avoiding db. - let mut program_state = self + let mut state = self .db - .read_state(program_hash) + .read_state(state_hash) .ok_or_else(|| anyhow::anyhow!("program should exist"))?; - let mut queue = if let MaybeHash::Hash(queue_hash_and_len) = program_state.queue_hash { - self.db - .read_queue(queue_hash_and_len.hash) - .ok_or_else(|| anyhow::anyhow!("queue should exist if hash present"))? - } else { - VecDeque::with_capacity(dispatches.len()) - }; - - queue.extend(dispatches); - - let queue_hash = self.db.write_queue(queue); + anyhow::ensure!(state.program.is_active(), "program should be active"); - program_state.queue_hash = MaybeHash::Hash(queue_hash.into()); + let queue = if let MaybeHash::Hash(HashAndLen { + hash: queue_hash, .. + }) = state.queue_hash + { + let mut queue = self + .db + .read_queue(queue_hash) + .ok_or_else(|| anyhow::anyhow!("queue should exist if hash present"))?; - Ok(self.db.write_state(program_state)) - } - - pub fn run_on_host( - &mut self, - program_id: ProgramId, - program_state: H256, - ) -> Result> { - let original_code_id = self.db.program_code_id(program_id).unwrap(); + queue.extend(dispatches); - let maybe_instrumented_code = self - .db - .instrumented_code(ethexe_runtime::VERSION, original_code_id); + queue + } else { + VecDeque::from(dispatches) + }; - let mut executor = self.creator.instantiate()?; + state.queue_hash = self.db.write_queue(queue).into(); - executor.run( - program_id, - original_code_id, - program_state, - maybe_instrumented_code, - ) + Ok(self.db.write_state(state)) } // TODO: replace LocalOutcome with Transition struct. @@ -272,11 +218,9 @@ impl Processor { ) -> Result> { log::debug!("Processing upload code {code_id:?}"); - if code_id != CodeId::generate(code) || self.handle_new_code(code)?.is_none() { - Ok(vec![LocalOutcome::CodeRejected(code_id)]) - } else { - Ok(vec![LocalOutcome::CodeApproved(code_id)]) - } + let valid = !(code_id != CodeId::generate(code) || self.handle_new_code(code)?.is_none()); + + Ok(vec![LocalOutcome::CodeValidated { id: code_id, valid }]) } pub fn process_block_events( @@ -285,89 +229,142 @@ impl Processor { // TODO (breathx): accept not ref? events: &[BlockEvent], ) -> Result> { - log::debug!("Processing events for {block_hash:?}: {events:?}"); + log::debug!("Processing events for {block_hash:?}: {events:#?}"); - let mut outcomes = vec![]; - - let initial_program_states = self + let mut states = self .db .block_start_program_states(block_hash) .unwrap_or_default(); - let mut programs = initial_program_states.clone(); - for event in events { match event { - BlockEvent::Router(event) => match event.clone() { - RouterEvent::ProgramCreated { actor_id, code_id } => { - // TODO (breathx): set this zero like start of the block data. - let state_hash = self.handle_new_program(actor_id, code_id)?; - - programs.insert(actor_id, state_hash); - } - _ => { - log::debug!( - "Handling for router event {event:?} is not yet implemented; noop" - ); - continue; - } - }, + BlockEvent::Router(event) => { + self.process_router_event(&mut states, event.clone())?; + } BlockEvent::Mirror { address, event } => { - // TODO (breathx): handle if not (program from another router / incorrect event order ). - let state_hash = *programs.get(address).expect("should exist"); - - let state_hash = match event.clone() { - MirrorEvent::ExecutableBalanceTopUpRequested { value } => { - self.handle_executable_balance_top_up(state_hash, value)? - } - MirrorEvent::MessageQueueingRequested { - id, - source, - payload, - value, - } => { - // TODO (breathx): replace with state_hash.is_zero(); - let kind = if !initial_program_states.contains_key(address) { - DispatchKind::Init - } else { - DispatchKind::Handle - }; - - self.handle_user_message( - state_hash, - vec![UserMessage { - id, - kind, - source, - payload, - // TODO (breathx): mutate exec balance after #4067. - value, - }], - )? - } - _ => { - log::debug!( - "Handling for mirror event {event:?} is not yet implemented; noop" - ); - continue; - } - }; - - programs.insert(*address, state_hash); + self.process_mirror_event(&mut states, *address, event.clone())?; } BlockEvent::WVara(event) => { - log::debug!("Handling for wvara event {event:?} is not yet implemented; noop"); - continue; + self.process_wvara_event(&mut states, event.clone())?; } } } - let current_outcomes = self.run(block_hash, &mut programs)?; - - outcomes.extend(current_outcomes); + let outcomes = self.run(block_hash, &mut states)?; - self.db.set_block_end_program_states(block_hash, programs); + self.db.set_block_end_program_states(block_hash, states); Ok(outcomes) } + + fn process_router_event( + &mut self, + states: &mut BTreeMap, + event: RouterEvent, + ) -> Result<()> { + match event { + RouterEvent::ProgramCreated { actor_id, code_id } => { + self.handle_new_program(actor_id, code_id)?; + + states.insert(actor_id, H256::zero()); + } + RouterEvent::CodeValidationRequested { .. } + | RouterEvent::BaseWeightChanged { .. } + | RouterEvent::StorageSlotChanged + | RouterEvent::ValidatorsSetChanged + | RouterEvent::ValuePerWeightChanged { .. } => { + log::debug!("Handler not yet implemented: {event:?}"); + return Ok(()); + } + RouterEvent::BlockCommitted { .. } | RouterEvent::CodeGotValidated { .. } => { + log::debug!("Informational events are noop for processing: {event:?}"); + return Ok(()); + } + }; + + Ok(()) + } + + fn process_mirror_event( + &mut self, + states: &mut BTreeMap, + actor_id: ProgramId, + event: MirrorEvent, + ) -> Result<()> { + let Some(&state_hash) = states.get(&actor_id) else { + log::debug!("Received event from unrecognized mirror ({actor_id}): {event:?}"); + + return Ok(()); + }; + + let new_state_hash = match event { + MirrorEvent::ExecutableBalanceTopUpRequested { value } => { + self.handle_executable_balance_top_up(state_hash, value)? + } + MirrorEvent::MessageQueueingRequested { + id, + source, + payload, + value, + } => { + let payload_hash = self.handle_payload(payload)?; + + let state = self + .db + .read_state(state_hash) + .ok_or_else(|| anyhow::anyhow!("program should exist"))?; + + let kind = if state.requires_init_message() { + DispatchKind::Init + } else { + DispatchKind::Handle + }; + + let dispatch = Dispatch { + id, + kind, + source, + payload_hash, + value, + details: None, + context: None, + }; + + self.handle_message_queueing(state_hash, dispatch)? + } + MirrorEvent::ReplyQueueingRequested { .. } + | MirrorEvent::ValueClaimingRequested { .. } => { + log::debug!("Handler not yet implemented: {event:?}"); + return Ok(()); + } + MirrorEvent::StateChanged { .. } + | MirrorEvent::ValueClaimed { .. } + | MirrorEvent::Message { .. } + | MirrorEvent::Reply { .. } => { + log::debug!("Informational events are noop for processing: {event:?}"); + return Ok(()); + } + }; + + states.insert(actor_id, new_state_hash); + + Ok(()) + } + + fn process_wvara_event( + &mut self, + _states: &mut BTreeMap, + event: WVaraEvent, + ) -> Result<()> { + match event { + WVaraEvent::Transfer { .. } => { + log::debug!("Handler not yet implemented: {event:?}"); + Ok(()) + } + WVaraEvent::Approval { .. } => { + log::debug!("Informational events are noop for processing: {event:?}"); + Ok(()) + } + } + } } diff --git a/ethexe/processor/src/tests.rs b/ethexe/processor/src/tests.rs index ceff98da13b..b4b25d09ee5 100644 --- a/ethexe/processor/src/tests.rs +++ b/ethexe/processor/src/tests.rs @@ -71,7 +71,13 @@ fn process_observer_event() { .process_upload_code(code_id, &code) .expect("failed to upload code"); log::debug!("\n\nUpload code outcomes: {outcomes:?}\n\n"); - assert_eq!(outcomes, vec![LocalOutcome::CodeApproved(code_id)]); + assert_eq!( + outcomes, + vec![LocalOutcome::CodeValidated { + id: code_id, + valid: true + }] + ); let ch1 = init_new_block_from_parent(&mut processor, ch0); @@ -79,7 +85,6 @@ fn process_observer_event() { let create_program_events = vec![ BlockEvent::Router(RouterEvent::ProgramCreated { actor_id, code_id }), - // TODO (breathx): think of constant. BlockEvent::mirror( actor_id, MirrorEvent::ExecutableBalanceTopUpRequested { @@ -196,33 +201,6 @@ fn handle_new_code_invalid() { .is_none()); } -#[test] -fn host_ping_pong() { - init_logger(); - - let db = MemDb::default(); - let mut processor = Processor::new(Database::from_one(&db, Default::default())).unwrap(); - - init_new_block(&mut processor, Default::default()); - - let program_id = 42.into(); - - let code_id = processor - .handle_new_code(demo_ping::WASM_BINARY) - .expect("failed to call runtime api") - .expect("code failed verification or instrumentation"); - - let state_hash = processor - .handle_new_program(program_id, code_id) - .expect("failed to create new program"); - - let state_hash = processor - .handle_user_message(state_hash, vec![create_message(DispatchKind::Init, "PING")]) - .expect("failed to populate message queue"); - - let _init = processor.run_on_host(program_id, state_hash).unwrap(); -} - #[test] fn ping_pong() { init_logger(); @@ -240,18 +218,32 @@ fn ping_pong() { .expect("failed to call runtime api") .expect("code failed verification or instrumentation"); - let state_hash = processor + processor .handle_new_program(program_id, code_id) .expect("failed to create new program"); let state_hash = processor - .handle_user_message( - state_hash, - vec![ - create_message_full(MessageId::from(1), DispatchKind::Init, user_id, "PING"), - create_message_full(MessageId::from(2), DispatchKind::Handle, user_id, "PING"), - ], - ) + .handle_executable_balance_top_up(H256::zero(), 10_000_000_000) + .expect("failed to top up balance"); + + let messages = vec![ + create_message_full( + &mut processor, + MessageId::from(1), + DispatchKind::Init, + user_id, + "PING", + ), + create_message_full( + &mut processor, + MessageId::from(2), + DispatchKind::Handle, + user_id, + "PING", + ), + ]; + let state_hash = processor + .handle_messages_queueing(state_hash, messages) .expect("failed to populate message queue"); let mut programs = BTreeMap::from_iter([(program_id, state_hash)]); @@ -269,22 +261,38 @@ fn ping_pong() { assert_eq!(message.payload_bytes(), b"PONG"); } -fn create_message(kind: DispatchKind, payload: impl AsRef<[u8]>) -> UserMessage { - create_message_full(H256::random().into(), kind, H256::random().into(), payload) +fn create_message( + processor: &mut Processor, + kind: DispatchKind, + payload: impl AsRef<[u8]>, +) -> Dispatch { + create_message_full( + processor, + H256::random().into(), + kind, + H256::random().into(), + payload, + ) } fn create_message_full( + processor: &mut Processor, id: MessageId, kind: DispatchKind, source: ActorId, payload: impl AsRef<[u8]>, -) -> UserMessage { - UserMessage { +) -> Dispatch { + let payload = payload.as_ref().to_vec(); + let payload_hash = processor.handle_payload(payload).unwrap(); + + Dispatch { id, kind, source, - payload: payload.as_ref().to_vec(), + payload_hash, value: 0, + details: None, + context: None, } } @@ -317,50 +325,50 @@ fn async_and_ping() { .expect("failed to call runtime api") .expect("code failed verification or instrumentation"); - let ping_state_hash = processor + processor .handle_new_program(ping_id, ping_code_id) .expect("failed to create new program"); + + let state_hash = processor + .handle_executable_balance_top_up(H256::zero(), 10_000_000_000) + .expect("failed to top up balance"); + + let message = create_message_full( + &mut processor, + get_next_message_id(), + DispatchKind::Init, + user_id, + "PING", + ); let ping_state_hash = processor - .handle_user_message( - ping_state_hash, - vec![UserMessage { - id: get_next_message_id(), - kind: DispatchKind::Init, - source: user_id, - payload: b"PING".to_vec(), - value: 0, - }], - ) + .handle_message_queueing(state_hash, message) .expect("failed to populate message queue"); - let async_state_hash = processor + processor .handle_new_program(async_id, upload_code_id) .expect("failed to create new program"); + + let message = create_message_full( + &mut processor, + get_next_message_id(), + DispatchKind::Init, + user_id, + ping_id.encode(), + ); let async_state_hash = processor - .handle_user_message( - async_state_hash, - vec![UserMessage { - id: get_next_message_id(), - kind: DispatchKind::Init, - source: user_id, - payload: ping_id.encode(), - value: 0, - }], - ) + .handle_message_queueing(state_hash, message) .expect("failed to populate message queue"); let wait_for_reply_to = get_next_message_id(); + let message = create_message_full( + &mut processor, + wait_for_reply_to, + DispatchKind::Handle, + user_id, + demo_async::Command::Common.encode(), + ); let async_state_hash = processor - .handle_user_message( - async_state_hash, - vec![UserMessage { - id: wait_for_reply_to, - kind: DispatchKind::Handle, - source: user_id, - payload: demo_async::Command::Common.encode(), - value: 0, - }], - ) + .handle_message_queueing(async_state_hash, message) .expect("failed to populate message queue"); let mut programs = @@ -431,11 +439,17 @@ fn many_waits() { for i in 0..amount { let program_id = ProgramId::from(i); - let state_hash = processor + processor .handle_new_program(program_id, code_id) .expect("failed to create new program"); + + let state_hash = processor + .handle_executable_balance_top_up(H256::zero(), 10_000_000_000) + .expect("failed to top up balance"); + + let message = create_message(&mut processor, DispatchKind::Init, b""); let state_hash = processor - .handle_user_message(state_hash, vec![create_message(DispatchKind::Init, b"")]) + .handle_message_queueing(state_hash, message) .expect("failed to populate message queue"); programs.insert(program_id, state_hash); @@ -445,8 +459,9 @@ fn many_waits() { assert_eq!(to_users.len(), amount as usize); for (_pid, state_hash) in programs.iter_mut() { + let message = create_message(&mut processor, DispatchKind::Handle, b""); let new_state_hash = processor - .handle_user_message(*state_hash, vec![create_message(DispatchKind::Handle, b"")]) + .handle_message_queueing(*state_hash, message) .expect("failed to populate message queue"); *state_hash = new_state_hash; } diff --git a/ethexe/runtime/common/src/journal.rs b/ethexe/runtime/common/src/journal.rs index b3342a9509f..56ef8aca17c 100644 --- a/ethexe/runtime/common/src/journal.rs +++ b/ethexe/runtime/common/src/journal.rs @@ -78,7 +78,7 @@ impl JournalHandler for Handler<'_, S> { DispatchOutcome::Exit { .. } => todo!(), DispatchOutcome::InitSuccess { program_id } => { log::trace!("Dispatch {message_id} init success for program {program_id}"); - self.update_program(program_id, |mut state, _| match &mut state.state { + self.update_program(program_id, |mut state, _| match &mut state.program { state::Program::Active(program) => { program.initialized = true; Some(state) @@ -96,7 +96,7 @@ impl JournalHandler for Handler<'_, S> { ); self.update_program(program_id, |state, _| { Some(ProgramState { - state: state::Program::Terminated(origin), + program: state::Program::Terminated(origin), ..state }) }); @@ -122,7 +122,7 @@ impl JournalHandler for Handler<'_, S> { fn exit_dispatch(&mut self, id_exited: ProgramId, value_destination: ProgramId) { self.update_program(id_exited, |state, _| { Some(ProgramState { - state: state::Program::Exited(value_destination), + program: state::Program::Exited(value_destination), ..state }) }); @@ -325,7 +325,7 @@ impl JournalHandler for Handler<'_, S> { } self.update_program(program_id, |state, storage| { - let state::Program::Active(mut active_state) = state.state else { + let state::Program::Active(mut active_state) = state.program else { return None; }; @@ -343,7 +343,7 @@ impl JournalHandler for Handler<'_, S> { }; Some(ProgramState { - state: state::Program::Active(changed_active_state), + program: state::Program::Active(changed_active_state), ..state }) }); diff --git a/ethexe/runtime/common/src/lib.rs b/ethexe/runtime/common/src/lib.rs index 7d38bdd4060..19db601a5af 100644 --- a/ethexe/runtime/common/src/lib.rs +++ b/ethexe/runtime/common/src/lib.rs @@ -192,7 +192,7 @@ pub fn process_next_message>( outgoing_bytes_limit: 64 * 1024 * 1024, }; - let active_state = match program_state.state { + let active_state = match program_state.program { state::Program::Active(state) => state, state::Program::Exited(program_id) | state::Program::Terminated(program_id) => { log::trace!("Program {program_id} is not active"); diff --git a/ethexe/runtime/common/src/state.rs b/ethexe/runtime/common/src/state.rs index 00d77aed821..6a88151b283 100644 --- a/ethexe/runtime/common/src/state.rs +++ b/ethexe/runtime/common/src/state.rs @@ -39,7 +39,7 @@ use parity_scale_codec::{Decode, Encode}; pub use gear_core::program::ProgramState as InitStatus; -#[derive(Clone, Debug, Encode, Decode)] +#[derive(Clone, Debug, Encode, Decode, PartialEq, Eq)] pub struct HashAndLen { pub hash: H256, pub len: NonZeroU32, @@ -55,7 +55,7 @@ impl From for HashAndLen { } } -#[derive(Clone, Debug, Encode, Decode)] +#[derive(Clone, Debug, Encode, Decode, PartialEq, Eq)] pub enum MaybeHash { Hash(HashAndLen), Empty, @@ -69,6 +69,10 @@ impl From for MaybeHash { } impl MaybeHash { + pub fn is_empty(&self) -> bool { + matches!(self, MaybeHash::Empty) + } + pub fn with_hash_or_default(&self, f: impl FnOnce(H256) -> T) -> T { match &self { Self::Hash(HashAndLen { hash, .. }) => f(*hash), @@ -77,7 +81,7 @@ impl MaybeHash { } } -#[derive(Clone, Debug, Decode, Encode)] +#[derive(Clone, Debug, Decode, Encode, PartialEq, Eq)] pub struct ActiveProgram { /// Hash of wasm memory pages allocations, see [`Allocations`]. pub allocations_hash: MaybeHash, @@ -89,18 +93,24 @@ pub struct ActiveProgram { pub initialized: bool, } -#[derive(Clone, Debug, Decode, Encode)] +#[derive(Clone, Debug, Decode, Encode, PartialEq, Eq)] pub enum Program { Active(ActiveProgram), Exited(ProgramId), Terminated(ProgramId), } +impl Program { + pub fn is_active(&self) -> bool { + matches!(self, Self::Active(_)) + } +} + /// ethexe program state. -#[derive(Clone, Debug, Decode, Encode)] +#[derive(Clone, Debug, Decode, Encode, PartialEq, Eq)] pub struct ProgramState { /// Active, exited or terminated program state. - pub state: Program, + pub program: Program, /// Hash of incoming message queue, see [`MessageQueue`]. pub queue_hash: MaybeHash, /// Hash of waiting messages list, see [`Waitlist`]. @@ -111,6 +121,41 @@ pub struct ProgramState { pub executable_balance: Value, } +impl ProgramState { + pub const fn zero() -> Self { + Self { + program: Program::Active(ActiveProgram { + allocations_hash: MaybeHash::Empty, + pages_hash: MaybeHash::Empty, + memory_infix: MemoryInfix::new(0), + initialized: false, + }), + queue_hash: MaybeHash::Empty, + waitlist_hash: MaybeHash::Empty, + balance: 0, + executable_balance: 0, + } + } + + pub fn is_zero(&self) -> bool { + *self == Self::zero() + } + + pub fn requires_init_message(&self) -> bool { + if !matches!( + self.program, + Program::Active(ActiveProgram { + initialized: false, + .. + }) + ) { + return false; + } + + self.queue_hash.is_empty() && self.waitlist_hash.is_empty() + } +} + #[derive(Clone, Debug, Encode, Decode)] pub struct Dispatch { /// Message id. diff --git a/ethexe/runtime/src/wasm/storage.rs b/ethexe/runtime/src/wasm/storage.rs index 8180de200f1..0e16b763553 100644 --- a/ethexe/runtime/src/wasm/storage.rs +++ b/ethexe/runtime/src/wasm/storage.rs @@ -53,6 +53,10 @@ impl Storage for RuntimeInterfaceStorage { } fn read_state(&self, hash: H256) -> Option { + if hash.is_zero() { + return Some(ProgramState::zero()); + } + database_ri::read_unwrapping(&hash) } @@ -81,6 +85,10 @@ impl Storage for RuntimeInterfaceStorage { } fn write_state(&self, state: ProgramState) -> H256 { + if state.is_zero() { + return H256::zero(); + } + database_ri::write(state) }