diff --git a/Cargo.lock b/Cargo.lock index 1de88b5aea0..f5eb00f8f00 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5276,13 +5276,16 @@ dependencies = [ "anyhow", "ethexe-common", "ethexe-db", + "ethexe-processor", "futures", + "gear-core", "gprimitives", "hex", "hyper 1.4.1", "jsonrpsee 0.24.0", "log", "parity-scale-codec", + "sp-core", "tokio", "tower", ] @@ -6533,6 +6536,7 @@ dependencies = [ "gwasm-instrument", "hashbrown 0.14.5", "hex", + "impl-serde", "log", "num-traits", "numerated", diff --git a/Cargo.toml b/Cargo.toml index a12c7c8740d..e5d6d7ab9fe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -143,6 +143,7 @@ hashbrown = "0.14.5" hex = { version = "0.4.3", default-features = false } hex-literal = "0.4.1" impl-trait-for-tuples = "0.2.2" +impl-serde = "0.4.0" jsonrpsee = { version = "^0.16" } libc = { version = "0.2", default-features = false } log = { version = "0.4.22", default-features = false } diff --git a/core/Cargo.toml b/core/Cargo.toml index cf07180c2ed..d38b4e2a262 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -38,6 +38,7 @@ primitive-types = { workspace = true, features = ["scale-info"] } # Optional dependencies serde = { workspace = true, features = ["derive"], optional = true } +impl-serde = { workspace = true, optional = true } [dev-dependencies] wabt.workspace = true @@ -49,4 +50,4 @@ numerated = { workspace = true, features = ["mock"] } [features] default = [] strict = [] -std = ["serde/std", "wasmparser/std", "gear-core-errors/serde"] +std = ["serde/std", "dep:impl-serde", "wasmparser/std", "gear-core-errors/serde"] diff --git a/core/src/message/mod.rs b/core/src/message/mod.rs index dd1285f8760..52d40f28225 100644 --- a/core/src/message/mod.rs +++ b/core/src/message/mod.rs @@ -239,6 +239,7 @@ pub trait Packet { #[cfg_attr(feature = "std", derive(serde::Serialize, serde::Deserialize))] pub struct ReplyInfo { /// Payload of the reply. + #[cfg_attr(feature = "std", serde(with = "impl_serde::serialize"))] pub payload: Vec, /// Value sent with reply. pub value: u128, diff --git a/ethexe/cli/src/service.rs b/ethexe/cli/src/service.rs index f4e3fb6ccd7..669cfcda158 100644 --- a/ethexe/cli/src/service.rs +++ b/ethexe/cli/src/service.rs @@ -322,7 +322,7 @@ impl Service { db.set_block_end_state_is_valid(block_hash, true); let header = db.block_header(block_hash).expect("must be set; qed"); - db.set_latest_valid_block_height(header.height); + db.set_latest_valid_block(block_hash, header); Ok(transition_outcomes) } diff --git a/ethexe/common/src/db.rs b/ethexe/common/src/db.rs index aad3baa0803..89582e2750f 100644 --- a/ethexe/common/src/db.rs +++ b/ethexe/common/src/db.rs @@ -71,8 +71,8 @@ pub trait BlockMetaStorage: Send + Sync { fn block_outcome(&self, block_hash: H256) -> Option>; fn set_block_outcome(&self, block_hash: H256, outcome: Vec); - fn latest_valid_block_height(&self) -> Option; - fn set_latest_valid_block_height(&self, block_height: u32); + fn latest_valid_block(&self) -> Option<(H256, BlockHeader)>; + fn set_latest_valid_block(&self, block_hash: H256, header: BlockHeader); } pub trait CodesStorage: Send + Sync { diff --git a/ethexe/db/src/database.rs b/ethexe/db/src/database.rs index ff0688f7032..7c5ff42e78a 100644 --- a/ethexe/db/src/database.rs +++ b/ethexe/db/src/database.rs @@ -20,7 +20,10 @@ use std::collections::{BTreeMap, BTreeSet, VecDeque}; -use crate::{CASDatabase, KVDatabase}; +use crate::{ + overlay::{CASOverlay, KVOverlay}, + CASDatabase, KVDatabase, +}; use ethexe_common::{ db::{BlockHeader, BlockMetaStorage, CodesStorage}, router::StateTransition, @@ -253,18 +256,19 @@ impl BlockMetaStorage for Database { ); } - fn latest_valid_block_height(&self) -> Option { + fn latest_valid_block(&self) -> Option<(H256, BlockHeader)> { self.kv .get(&KeyPrefix::LatestValidBlock.one(self.router_address)) - .map(|block_height| { - u32::from_le_bytes(block_height.try_into().expect("must be correct; qed")) + .map(|data| { + <(H256, BlockHeader)>::decode(&mut data.as_slice()) + .expect("Failed to decode data into `(H256, BlockHeader)`") }) } - fn set_latest_valid_block_height(&self, block_height: u32) { + fn set_latest_valid_block(&self, block_hash: H256, header: BlockHeader) { self.kv.put( &KeyPrefix::LatestValidBlock.one(self.router_address), - block_height.to_le_bytes().to_vec(), + (block_hash, header).encode(), ); } } @@ -383,6 +387,16 @@ impl Database { } } + /// # Safety + /// Not ready for using in prod. Intended to be for rpc calls only. + pub unsafe fn overlaid(self) -> Self { + Self { + cas: Box::new(CASOverlay::new(self.cas)), + kv: Box::new(KVOverlay::new(self.kv)), + router_address: self.router_address, + } + } + // TODO: temporary solution for MVP runtime-interfaces db access. pub fn read_by_hash(&self, hash: H256) -> Option> { self.cas.read(&hash) diff --git a/ethexe/db/src/lib.rs b/ethexe/db/src/lib.rs index a5da5d0ff87..7a7d93bc44a 100644 --- a/ethexe/db/src/lib.rs +++ b/ethexe/db/src/lib.rs @@ -23,6 +23,7 @@ use gprimitives::H256; mod database; mod mem; +mod overlay; mod rocks; pub use database::Database; diff --git a/ethexe/db/src/overlay.rs b/ethexe/db/src/overlay.rs new file mode 100644 index 00000000000..58c24315aad --- /dev/null +++ b/ethexe/db/src/overlay.rs @@ -0,0 +1,105 @@ +// This file is part of Gear. +// +// Copyright (C) 2024 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use crate::{CASDatabase, KVDatabase, MemDb}; +use gear_core::ids::hash; +use gprimitives::H256; +use std::collections::HashSet; + +pub struct CASOverlay { + db: Box, + mem: MemDb, +} + +impl CASOverlay { + pub fn new(db: Box) -> Self { + Self { + db, + mem: MemDb::default(), + } + } +} + +impl CASDatabase for CASOverlay { + fn clone_boxed(&self) -> Box { + Box::new(Self { + db: self.db.clone_boxed(), + mem: self.mem.clone(), + }) + } + + fn read(&self, hash: &H256) -> Option> { + self.mem.read(hash).or_else(|| self.db.read(hash)) + } + + fn write_by_hash(&self, hash: &H256, data: &[u8]) { + self.mem.write_by_hash(hash, data) + } +} + +pub struct KVOverlay { + db: Box, + mem: MemDb, +} + +impl KVOverlay { + pub fn new(db: Box) -> Self { + Self { + db, + mem: MemDb::default(), + } + } +} + +impl KVDatabase for KVOverlay { + fn clone_boxed_kv(&self) -> Box { + Box::new(Self { + db: self.db.clone_boxed_kv(), + mem: self.mem.clone(), + }) + } + + fn get(&self, key: &[u8]) -> Option> { + self.mem.get(key).or_else(|| self.db.get(key)) + } + + fn take(&self, _key: &[u8]) -> Option> { + unimplemented!() + } + + fn put(&self, key: &[u8], value: Vec) { + self.mem.put(key, value) + } + + fn iter_prefix<'a>( + &'a self, + prefix: &'a [u8], + ) -> Box, Vec)> + '_> { + let mem_iter = self.mem.iter_prefix(prefix); + let db_iter = self.db.iter_prefix(prefix); + + let full_iter = mem_iter.chain(db_iter); + + let mut known_keys = HashSet::new(); + + let filtered_iter = + full_iter.filter_map(move |(k, v)| known_keys.insert(hash(&k)).then_some((k, v))); + + Box::new(filtered_iter) + } +} diff --git a/ethexe/observer/src/query.rs b/ethexe/observer/src/query.rs index 7f9aa3e3580..26a0a11ef81 100644 --- a/ethexe/observer/src/query.rs +++ b/ethexe/observer/src/query.rs @@ -72,10 +72,9 @@ impl Query { .set_block_end_program_states(hash, Default::default()); // set latest valid if empty. - if self.database.latest_valid_block_height().is_none() { + if self.database.latest_valid_block().is_none() { let genesis_header = self.get_block_header_meta(hash).await?; - self.database - .set_latest_valid_block_height(genesis_header.height); + self.database.set_latest_valid_block(hash, genesis_header); } Ok(()) @@ -160,7 +159,8 @@ impl Query { let current_block = self.get_block_header_meta(block_hash).await?; let latest_valid_block_height = self .database - .latest_valid_block_height() + .latest_valid_block() + .map(|(_, header)| header.height) .expect("genesis by default; qed"); if current_block.height >= latest_valid_block_height @@ -196,7 +196,7 @@ impl Query { // Continue loading chain by parent hashes from the current block to the latest valid block. let mut hash = block_hash; - let mut height = current_block.height; + while hash != self.genesis_block_hash { // If the block's end state is valid, set it as the latest valid block if self @@ -204,7 +204,13 @@ impl Query { .block_end_state_is_valid(hash) .unwrap_or(false) { - self.database.set_latest_valid_block_height(height); + let header = match headers_map.get(&hash) { + Some(header) => header.clone(), + None => self.get_block_header_meta(hash).await?, + }; + + self.database.set_latest_valid_block(hash, header); + log::trace!("Nearest valid in db block found: {hash}"); break; } @@ -218,7 +224,6 @@ impl Query { Some(header) => header.parent_hash, None => self.get_block_parent_hash(hash).await?, }; - height -= 1; } let mut actual_commitment_queue: VecDeque = self diff --git a/ethexe/processor/src/host/mod.rs b/ethexe/processor/src/host/mod.rs index 1d3d8fb1315..846ada121cc 100644 --- a/ethexe/processor/src/host/mod.rs +++ b/ethexe/processor/src/host/mod.rs @@ -43,7 +43,6 @@ pub type Store = wasmtime::Store; #[derive(Clone)] pub(crate) struct InstanceCreator { - db: Database, engine: wasmtime::Engine, instance_pre: Arc>, @@ -54,7 +53,7 @@ pub(crate) struct InstanceCreator { } impl InstanceCreator { - pub fn new(db: Database, runtime: Vec) -> Result { + pub fn new(runtime: Vec) -> Result { gear_runtime_interface::sandbox_init(); let engine = wasmtime::Engine::default(); @@ -72,7 +71,6 @@ impl InstanceCreator { let instance_pre = Arc::new(instance_pre); Ok(Self { - db, engine, instance_pre, chain_head: None, @@ -87,7 +85,6 @@ impl InstanceCreator { let mut instance_wrapper = InstanceWrapper { instance, store, - db: self.db().clone(), chain_head: self.chain_head, }; @@ -100,10 +97,6 @@ impl InstanceCreator { Ok(instance_wrapper) } - pub fn db(&self) -> &Database { - &self.db - } - pub fn set_chain_head(&mut self, chain_head: H256) { self.chain_head = Some(chain_head); } @@ -112,15 +105,10 @@ impl InstanceCreator { pub(crate) struct InstanceWrapper { instance: wasmtime::Instance, store: Store, - db: Database, chain_head: Option, } impl InstanceWrapper { - pub fn db(&self) -> &Database { - &self.db - } - #[allow(unused)] pub fn data(&self) -> &StoreData { self.store.data() @@ -139,13 +127,14 @@ impl InstanceWrapper { pub fn run( &mut self, + db: Database, program_id: ProgramId, original_code_id: CodeId, state_hash: H256, maybe_instrumented_code: Option, ) -> Result> { let chain_head = self.chain_head.expect("chain head must be set before run"); - threads::set(self.db.clone(), chain_head, state_hash); + threads::set(db, chain_head, state_hash); let arg = ( program_id, @@ -157,9 +146,14 @@ impl InstanceWrapper { self.call("run", arg.encode()) } - pub fn wake_messages(&mut self, program_id: ProgramId, state_hash: H256) -> Result { + pub fn wake_messages( + &mut self, + db: Database, + program_id: ProgramId, + state_hash: H256, + ) -> Result { let chain_head = self.chain_head.expect("chain head must be set before wake"); - threads::set(self.db.clone(), chain_head, state_hash); + threads::set(db, chain_head, state_hash); self.call("wake_messages", (program_id, state_hash).encode()) } diff --git a/ethexe/processor/src/lib.rs b/ethexe/processor/src/lib.rs index 0d91f9413c1..73672b3cec8 100644 --- a/ethexe/processor/src/lib.rs +++ b/ethexe/processor/src/lib.rs @@ -29,9 +29,9 @@ use ethexe_db::{BlockMetaStorage, CodesStorage, Database}; use ethexe_runtime_common::state::{Dispatch, HashAndLen, MaybeHash, Storage}; use gear_core::{ ids::{prelude::CodeIdExt, ProgramId}, - message::{DispatchKind, Payload}, + message::{DispatchKind, Payload, ReplyInfo}, }; -use gprimitives::{CodeId, H256}; +use gprimitives::{ActorId, CodeId, MessageId, H256}; use host::InstanceCreator; use parity_scale_codec::{Decode, Encode}; use std::collections::{BTreeMap, VecDeque}; @@ -42,11 +42,81 @@ mod run; #[cfg(test)] mod tests; +#[derive(Clone)] pub struct Processor { db: Database, creator: InstanceCreator, } +#[derive(Clone)] +pub struct OverlaidProcessor(Processor); + +impl OverlaidProcessor { + // TODO (breathx): optimize for one single program. + pub fn execute_for_reply( + &mut self, + block_hash: H256, + source: ActorId, + program_id: ActorId, + payload: Vec, + value: u128, + ) -> Result { + self.0.creator.set_chain_head(block_hash); + + let mut states = self + .0 + .db + .block_start_program_states(block_hash) + .unwrap_or_default(); + + let Some(&state_hash) = states.get(&program_id) else { + return Err(anyhow::anyhow!("unknown program at specified block hash")); + }; + + let state = + self.0.db.read_state(state_hash).ok_or_else(|| { + anyhow::anyhow!("unreachable: state partially presents in storage") + })?; + + anyhow::ensure!( + !state.requires_init_message(), + "program isn't yet initialized" + ); + + self.0.handle_mirror_event( + &mut states, + program_id, + MirrorEvent::MessageQueueingRequested { + id: MessageId::zero(), + source, + payload, + value, + }, + )?; + + let (messages, _) = run::run(8, self.0.db.clone(), self.0.creator.clone(), &mut states); + + let res = messages + .into_iter() + .find_map(|message| { + message.reply_details().and_then(|details| { + (details.to_message_id() == MessageId::zero()).then(|| { + let parts = message.into_parts(); + + ReplyInfo { + payload: parts.3.into_vec(), + value: parts.5, + code: details.to_reply_code(), + } + }) + }) + }) + .ok_or_else(|| anyhow::anyhow!("reply wasn't found"))?; + + Ok(res) + } +} + /// Local changes that can be committed to the network or local signer. #[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] pub enum LocalOutcome { @@ -63,10 +133,16 @@ pub enum LocalOutcome { /// Maybe impl `struct EventProcessor`. impl Processor { pub fn new(db: Database) -> Result { - let creator = InstanceCreator::new(db.clone(), host::runtime())?; + let creator = InstanceCreator::new(host::runtime())?; Ok(Self { db, creator }) } + pub fn overlaid(mut self) -> OverlaidProcessor { + self.db = unsafe { self.db.overlaid() }; + + OverlaidProcessor(self) + } + /// Returns some CodeId in case of settlement and new code accepting. pub fn handle_new_code(&mut self, original_code: impl AsRef<[u8]>) -> Result> { let mut executor = self.creator.instantiate()?; @@ -206,7 +282,7 @@ impl Processor { log::debug!("{programs:?}"); - let messages_and_outcomes = run::run(8, self.creator.clone(), programs); + let messages_and_outcomes = run::run(8, self.db.clone(), self.creator.clone(), programs); Ok(messages_and_outcomes.1) } diff --git a/ethexe/processor/src/run.rs b/ethexe/processor/src/run.rs index 325d6742ab5..c54cc32809b 100644 --- a/ethexe/processor/src/run.rs +++ b/ethexe/processor/src/run.rs @@ -22,7 +22,7 @@ use crate::{ }; use core_processor::common::JournalNote; use ethexe_common::router::{OutgoingMessage, StateTransition}; -use ethexe_db::CodesStorage; +use ethexe_db::{CodesStorage, Database}; use ethexe_runtime_common::Handler; use gear_core::{ ids::{ActorId, ProgramId}, @@ -47,6 +47,7 @@ enum Task { pub fn run( threads_amount: usize, + db: Database, instance_creator: InstanceCreator, programs: &mut BTreeMap, ) -> (Vec, Vec) { @@ -57,13 +58,14 @@ pub fn run( .build() .unwrap(); - rt.block_on(async { run_in_async(instance_creator, programs).await }) + rt.block_on(async { run_in_async(db, instance_creator, programs).await }) }) } // TODO: Returning Vec is a temporary solution. // In future need to send all messages to users and all state hashes changes to sequencer. async fn run_in_async( + db: Database, instance_creator: InstanceCreator, programs: &mut BTreeMap, ) -> (Vec, Vec) { @@ -79,7 +81,12 @@ async fn run_in_async( for id in 0..num_workers { let (task_sender, task_receiver) = mpsc::channel(100); task_senders.push(task_sender); - let handle = tokio::spawn(worker(id, instance_creator.clone(), task_receiver)); + let handle = tokio::spawn(worker( + id, + db.clone(), + instance_creator.clone(), + task_receiver, + )); handles.push(handle); } @@ -105,7 +112,7 @@ async fn run_in_async( let mut handler = Handler { program_id, program_states: programs, - storage: instance_creator.db(), + storage: &db, block_info: Default::default(), results: Default::default(), to_users_messages: Default::default(), @@ -175,24 +182,19 @@ async fn run_in_async( (to_users_messages, outcomes) } -async fn run_task(executor: &mut InstanceWrapper, task: Task) { +async fn run_task(db: Database, executor: &mut InstanceWrapper, task: Task) { match task { Task::Run { program_id, state_hash, result_sender, } => { - let code_id = executor - .db() - .program_code_id(program_id) - .expect("Code ID must be set"); + let code_id = db.program_code_id(program_id).expect("Code ID must be set"); - let instrumented_code = executor - .db() - .instrumented_code(ethexe_runtime::VERSION, code_id); + let instrumented_code = db.instrumented_code(ethexe_runtime::VERSION, code_id); let journal = executor - .run(program_id, code_id, state_hash, instrumented_code) + .run(db, program_id, code_id, state_hash, instrumented_code) .expect("Some error occurs while running program in instance"); result_sender.send(journal).unwrap(); @@ -203,7 +205,7 @@ async fn run_task(executor: &mut InstanceWrapper, task: Task) { result_sender, } => { let new_state_hash = executor - .wake_messages(program_id, state_hash) + .wake_messages(db, program_id, state_hash) .expect("Some error occurs while waking messages"); result_sender.send(new_state_hash).unwrap(); } @@ -212,6 +214,7 @@ async fn run_task(executor: &mut InstanceWrapper, task: Task) { async fn worker( id: usize, + db: Database, instance_creator: InstanceCreator, mut task_receiver: mpsc::Receiver, ) { @@ -222,7 +225,7 @@ async fn worker( .expect("Failed to instantiate executor"); while let Some(task) = task_receiver.recv().await { - run_task(&mut executor, task).await; + run_task(db.clone(), &mut executor, task).await; } } diff --git a/ethexe/processor/src/tests.rs b/ethexe/processor/src/tests.rs index c7fa00d82a3..49258d70168 100644 --- a/ethexe/processor/src/tests.rs +++ b/ethexe/processor/src/tests.rs @@ -250,7 +250,12 @@ fn ping_pong() { let mut programs = BTreeMap::from_iter([(program_id, state_hash)]); - let (to_users, _) = run::run(8, processor.creator.clone(), &mut programs); + let (to_users, _) = run::run( + 8, + processor.db.clone(), + processor.creator.clone(), + &mut programs, + ); assert_eq!(to_users.len(), 2); @@ -376,7 +381,12 @@ fn async_and_ping() { let mut programs = BTreeMap::from_iter([(ping_id, ping_state_hash), (async_id, async_state_hash)]); - let (to_users, _) = run::run(8, processor.creator.clone(), &mut programs); + let (to_users, _) = run::run( + 8, + processor.db.clone(), + processor.creator.clone(), + &mut programs, + ); assert_eq!(to_users.len(), 3); @@ -457,7 +467,12 @@ fn many_waits() { programs.insert(program_id, state_hash); } - let (to_users, _) = run::run(threads_amount, processor.creator.clone(), &mut programs); + let (to_users, _) = run::run( + threads_amount, + processor.db.clone(), + processor.creator.clone(), + &mut programs, + ); assert_eq!(to_users.len(), amount as usize); for (_pid, state_hash) in programs.iter_mut() { @@ -468,7 +483,12 @@ fn many_waits() { *state_hash = new_state_hash; } - let (to_users, _) = run::run(threads_amount, processor.creator.clone(), &mut programs); + let (to_users, _) = run::run( + threads_amount, + processor.db.clone(), + processor.creator.clone(), + &mut programs, + ); assert_eq!(to_users.len(), 0); init_new_block( @@ -480,7 +500,12 @@ fn many_waits() { }, ); - let (to_users, _) = run::run(threads_amount, processor.creator.clone(), &mut programs); + let (to_users, _) = run::run( + threads_amount, + processor.db.clone(), + processor.creator.clone(), + &mut programs, + ); assert_eq!(to_users.len(), amount as usize); diff --git a/ethexe/rpc/Cargo.toml b/ethexe/rpc/Cargo.toml index 40c566ceb11..94a8625296c 100644 --- a/ethexe/rpc/Cargo.toml +++ b/ethexe/rpc/Cargo.toml @@ -15,6 +15,7 @@ anyhow.workspace = true futures.workspace = true gprimitives.workspace = true ethexe-db.workspace = true +ethexe-processor.workspace = true jsonrpsee = { version = "0.24", features = ["server", "macros"] } tower = { version = "0.4.13", features = ["full"] } hyper = { version = "1.4.1", features = ["server"] } @@ -22,3 +23,5 @@ log.workspace = true parity-scale-codec.workspace = true hex.workspace = true ethexe-common.workspace = true +sp-core = { workspace = true, features = ["serde"] } +gear-core = { workspace = true, features = ["std"] } diff --git a/ethexe/rpc/src/lib.rs b/ethexe/rpc/src/lib.rs index 6f9987287f4..ea91cdd8955 100644 --- a/ethexe/rpc/src/lib.rs +++ b/ethexe/rpc/src/lib.rs @@ -1,6 +1,8 @@ use ethexe_db::{BlockHeader, BlockMetaStorage, Database}; +use ethexe_processor::Processor; use futures::FutureExt; -use gprimitives::H256; +use gear_core::message::ReplyInfo; +use gprimitives::{H160, H256}; use jsonrpsee::{ core::{async_trait, RpcResult}, proc_macros::rpc, @@ -8,9 +10,10 @@ use jsonrpsee::{ serve_with_graceful_shutdown, stop_channel, Server, ServerHandle, StopHandle, TowerServiceBuilder, }, - types::{ErrorCode, ErrorObject}, + types::ErrorObject, Methods, }; +use sp_core::Bytes; use std::net::SocketAddr; use tokio::net::TcpListener; use tower::Service; @@ -25,7 +28,17 @@ struct PerConnection { #[rpc(server)] pub trait RpcApi { #[method(name = "blockHeader")] - async fn block_header(&self, hash: H256) -> RpcResult; + async fn block_header(&self, hash: Option) -> RpcResult<(H256, BlockHeader)>; + + #[method(name = "calculateReplyForHandle")] + async fn calculate_reply_for_handle( + &self, + at: Option, + source: H160, + program_id: H160, + payload: Bytes, + value: u128, + ) -> RpcResult; } pub struct RpcModule { @@ -36,16 +49,68 @@ impl RpcModule { pub fn new(db: Database) -> Self { Self { db } } + + pub fn block_header_at_or_latest( + &self, + at: impl Into>, + ) -> RpcResult<(H256, BlockHeader)> { + if let Some(hash) = at.into() { + self.db + .block_header(hash) + .map(|header| (hash, header)) + .ok_or_else(|| db_err("Block header for requested hash wasn't found")) + } else { + self.db + .latest_valid_block() + .ok_or_else(|| db_err("Latest block header wasn't found")) + } + } } #[async_trait] impl RpcApiServer for RpcModule { - async fn block_header(&self, hash: H256) -> RpcResult { - // let db = db.lock().await; - self.db.block_header(hash).ok_or_else(|| { - ErrorObject::borrowed(ErrorCode::InvalidParams.code(), "Block not found", None) - }) + async fn block_header(&self, hash: Option) -> RpcResult<(H256, BlockHeader)> { + self.block_header_at_or_latest(hash) } + + async fn calculate_reply_for_handle( + &self, + at: Option, + source: H160, + program_id: H160, + payload: Bytes, + value: u128, + ) -> RpcResult { + let block_hash = self.block_header_at_or_latest(at)?.0; + + // TODO (breathx): spawn in a new thread and catch panics. (?) Generally catch runtime panics (?). + // TODO (breathx): optimize here instantiation if matches actual runtime. + let processor = Processor::new(self.db.clone()).map_err(|_| internal())?; + + let mut overlaid_processor = processor.overlaid(); + + overlaid_processor + .execute_for_reply( + block_hash, + source.into(), + program_id.into(), + payload.0, + value, + ) + .map_err(runtime_err) + } +} + +fn db_err(err: &'static str) -> ErrorObject<'static> { + ErrorObject::owned(8000, "Database error", Some(err)) +} + +fn runtime_err(err: anyhow::Error) -> ErrorObject<'static> { + ErrorObject::owned(8000, "Runtime error", Some(format!("{err}"))) +} + +fn internal() -> ErrorObject<'static> { + ErrorObject::owned(8000, "Internal error", None::<&str>) } pub struct RpcConfig {