Skip to content

Commit

Permalink
feat(ethexe): use real block header in handling; impl Mailbox timeout…
Browse files Browse the repository at this point in the history
…s; remove scheduled tasks on actions (#4283)
  • Loading branch information
breathx authored Oct 9, 2024
1 parent 93eb5b5 commit c850327
Show file tree
Hide file tree
Showing 45 changed files with 451 additions and 281 deletions.
2 changes: 1 addition & 1 deletion core-processor/src/ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1227,7 +1227,7 @@ impl<LP: LazyPagesInterface> Externalities for Ext<LP> {
}

fn system_reserve_gas(&mut self, amount: u64) -> Result<(), Self::FallibleError> {
// TODO: use `NonZeroU64` after issue #1838 is fixed
// TODO: use `NonZero<u64>` after issue #1838 is fixed
if amount == 0 {
return Err(ReservationError::ZeroReservationAmount.into());
}
Expand Down
25 changes: 8 additions & 17 deletions ethexe/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use ethexe_common::{
},
BlockRequestEvent,
};
use ethexe_db::{BlockHeader, BlockMetaStorage, CodesStorage, Database};
use ethexe_db::{BlockMetaStorage, CodesStorage, Database};
use ethexe_ethereum::router::RouterQuery;
use ethexe_network::NetworkReceiverEvent;
use ethexe_observer::{RequestBlockData, RequestEvent};
Expand Down Expand Up @@ -333,20 +333,11 @@ impl Service {
processor: &mut ethexe_processor::Processor,
block_data: RequestBlockData,
) -> Result<Vec<BlockCommitment>> {
db.set_block_events(block_data.block_hash, block_data.events);
db.set_block_header(
block_data.block_hash,
BlockHeader {
height: block_data.block_number.try_into()?,
timestamp: block_data.block_timestamp,
parent_hash: block_data.parent_hash,
},
);
db.set_block_events(block_data.hash, block_data.events);
db.set_block_header(block_data.hash, block_data.header);

let mut commitments = vec![];
let last_committed_chain = query
.get_last_committed_chain(block_data.block_hash)
.await?;
let last_committed_chain = query.get_last_committed_chain(block_data.hash).await?;
for block_hash in last_committed_chain.into_iter().rev() {
let transitions = Self::process_one_block(db, query, processor, block_hash).await?;

Expand All @@ -357,7 +348,7 @@ impl Service {

commitments.push(BlockCommitment {
block_hash,
pred_block_hash: block_data.block_hash,
pred_block_hash: block_data.hash,
prev_commitment_hash: db
.block_prev_commitment(block_hash)
.ok_or_else(|| anyhow!("Prev commitment not found"))?,
Expand All @@ -383,9 +374,9 @@ impl Service {
RequestEvent::Block(block_data) => {
log::info!(
"📦 receive a new block {}, hash {}, parent hash {}",
block_data.block_number,
block_data.block_hash,
block_data.parent_hash
block_data.header.height,
block_data.hash,
block_data.header.parent_hash
);

let commitments =
Expand Down
75 changes: 62 additions & 13 deletions ethexe/cli/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use anyhow::Result;
use ethexe_common::{
db::CodesStorage, mirror::Event as MirrorEvent, router::Event as RouterEvent, BlockEvent,
};
use ethexe_db::{Database, MemDb};
use ethexe_db::{BlockMetaStorage, Database, MemDb, ScheduledTask};
use ethexe_ethereum::{router::RouterQuery, Ethereum};
use ethexe_observer::{Event, MockBlobReader, Observer, Query};
use ethexe_processor::Processor;
Expand All @@ -42,7 +42,11 @@ use gear_core::{
};
use gprimitives::{ActorId, CodeId, MessageId, H160, H256};
use parity_scale_codec::Encode;
use std::{collections::BTreeMap, sync::Arc, time::Duration};
use std::{
collections::{BTreeMap, BTreeSet},
sync::Arc,
time::Duration,
};
use tokio::{
sync::oneshot,
task::{self, JoinHandle},
Expand Down Expand Up @@ -183,12 +187,13 @@ async fn mailbox() {
.await
.unwrap();

let mid_expected_message = MessageId::generate_outgoing(res.message_id, 0);
let ping_expected_message = MessageId::generate_outgoing(res.message_id, 1);
let original_mid = res.message_id;
let mid_expected_message = MessageId::generate_outgoing(original_mid, 0);
let ping_expected_message = MessageId::generate_outgoing(original_mid, 1);

let mut listener = env.events_publisher().subscribe().await;
listener
.apply_until_block_event(|event| match event {
let block_data = listener
.apply_until_block_event_with_header(|event, block_data| match event {
BlockEvent::Mirror { address, event } if address == pid => {
if let MirrorEvent::Message {
id,
Expand All @@ -204,7 +209,7 @@ async fn mailbox() {
Ok(None)
} else if id == ping_expected_message {
assert_eq!(payload, b"PING");
Ok(Some(()))
Ok(Some(block_data.clone()))
} else {
unreachable!()
}
Expand All @@ -217,9 +222,37 @@ async fn mailbox() {
.await
.unwrap();

// -1 bcs execution took place in previous block, not the one that emits events.
let wake_expiry = block_data.header.height - 1 + 100; // 100 is default wait for.
let expiry = block_data.header.height - 1 + ethexe_runtime_common::state::MAILBOX_VALIDITY;

let expected_schedule = BTreeMap::from_iter([
(
wake_expiry,
BTreeSet::from_iter([ScheduledTask::WakeMessage(pid, original_mid)]),
),
(
expiry,
BTreeSet::from_iter([
ScheduledTask::RemoveFromMailbox((pid, env.sender_id), mid_expected_message),
ScheduledTask::RemoveFromMailbox((pid, env.sender_id), ping_expected_message),
]),
),
]);

let schedule = node
.db
.block_end_schedule(block_data.header.parent_hash)
.expect("must exist");

assert_eq!(schedule, expected_schedule);

let expected_mailbox = BTreeMap::from_iter([(
env.sender_id,
BTreeMap::from_iter([(mid_expected_message, 0), (ping_expected_message, 0)]),
BTreeMap::from_iter([
(mid_expected_message, (0, expiry)),
(ping_expected_message, (0, expiry)),
]),
)]);
let mirror = env.ethereum.mirror(pid.try_into().unwrap());
let state_hash = mirror.query().state_hash().await.unwrap();
Expand Down Expand Up @@ -255,20 +288,20 @@ async fn mailbox() {

let expected_mailbox = BTreeMap::from_iter([(
env.sender_id,
BTreeMap::from_iter([(mid_expected_message, 0)]),
BTreeMap::from_iter([(mid_expected_message, (0, expiry))]),
)]);

assert_eq!(mailbox, expected_mailbox);

mirror.claim_value(mid_expected_message).await.unwrap();

listener
.apply_until_block_event(|event| match event {
let block_data = listener
.apply_until_block_event_with_header(|event, block_data| match event {
BlockEvent::Mirror { address, event } if address == pid => match event {
MirrorEvent::ValueClaimed { claimed_id, .. }
if claimed_id == mid_expected_message =>
{
Ok(Some(()))
Ok(Some(block_data.clone()))
}
_ => Ok(None),
},
Expand All @@ -281,6 +314,12 @@ async fn mailbox() {

let state = node.db.read_state(state_hash).unwrap();
assert!(state.mailbox_hash.is_empty());

let schedule = node
.db
.block_end_schedule(block_data.header.parent_hash)
.expect("must exist");
assert!(schedule.is_empty(), "{:?}", schedule);
}

#[tokio::test(flavor = "multi_thread")]
Expand Down Expand Up @@ -646,6 +685,7 @@ async fn multiple_validators() {

mod utils {
use super::*;
use ethexe_observer::SimpleBlockData;
use futures::StreamExt;
use gear_core::message::ReplyCode;
use tokio::sync::{broadcast::Sender, Mutex};
Expand Down Expand Up @@ -1020,6 +1060,13 @@ mod utils {
pub async fn apply_until_block_event<R: Sized>(
&mut self,
mut f: impl FnMut(BlockEvent) -> Result<Option<R>>,
) -> Result<R> {
self.apply_until_block_event_with_header(|e, _h| f(e)).await
}

pub async fn apply_until_block_event_with_header<R: Sized>(
&mut self,
mut f: impl FnMut(BlockEvent, &SimpleBlockData) -> Result<Option<R>>,
) -> Result<R> {
loop {
let event = self.next_event().await?;
Expand All @@ -1028,8 +1075,10 @@ mod utils {
continue;
};

let block_data = block.as_simple();

for event in block.events {
if let Some(res) = f(event)? {
if let Some(res) = f(event, &block_data)? {
return Ok(res);
}
}
Expand Down
18 changes: 16 additions & 2 deletions ethexe/common/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ use gear_core::{
use gprimitives::H256;
use parity_scale_codec::{Decode, Encode};

pub type ScheduledTask = gear_core::tasks::ScheduledTask<ActorId>;
/// NOTE: key for actor id is (program_id, user_id). only used for mailbox.
pub type ScheduledTask = gear_core::tasks::ScheduledTask<(ProgramId, ActorId)>;

#[derive(Debug, Clone, Default, Encode, Decode, serde::Serialize)]
pub struct BlockHeader {
Expand All @@ -39,13 +40,26 @@ pub struct BlockHeader {
pub parent_hash: H256,
}

impl BlockHeader {
pub fn dummy(height: u32) -> Self {
let mut parent_hash = [0; 32];
parent_hash[..4].copy_from_slice(&height.to_le_bytes());

Self {
height,
timestamp: height as u64 * 12,
parent_hash: parent_hash.into(),
}
}
}

#[derive(Debug, Clone, Default, Encode, Decode)]
pub struct CodeUploadInfo {
pub origin: ActorId,
pub tx_hash: H256,
}

pub type Schedule = BTreeMap<u32, Vec<ScheduledTask>>;
pub type Schedule = BTreeMap<u32, BTreeSet<ScheduledTask>>;

pub trait BlockMetaStorage: Send + Sync {
fn block_header(&self, block_hash: H256) -> Option<BlockHeader>;
Expand Down
2 changes: 1 addition & 1 deletion ethexe/ethereum/src/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl Router {
}
}

Err(anyhow::anyhow!("Failed to define if code is validated"))
Err(anyhow!("Failed to define if code is validated"))
}

pub async fn create_program(
Expand Down
8 changes: 4 additions & 4 deletions ethexe/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub mod export {
pub use libp2p::{multiaddr::Protocol, Multiaddr, PeerId};
}

use anyhow::Context;
use anyhow::{anyhow, Context};
use ethexe_db::Database;
use ethexe_signer::{PublicKey, Signer};
use futures::future::Either;
Expand Down Expand Up @@ -544,18 +544,18 @@ impl Behaviour {
gossipsub::MessageId::from(hasher.finish().to_be_bytes())
})
.build()
.map_err(|e| anyhow::anyhow!("`gossipsub::ConfigBuilder::build()` error: {e}"))?;
.map_err(|e| anyhow!("`gossipsub::ConfigBuilder::build()` error: {e}"))?;
let mut gossipsub = gossipsub::Behaviour::new(
gossipsub::MessageAuthenticity::Signed(keypair.clone()),
gossip_config,
)
.map_err(|e| anyhow::anyhow!("`gossipsub::Behaviour` error: {e}"))?;
.map_err(|e| anyhow!("`gossipsub::Behaviour` error: {e}"))?;
gossipsub
.with_peer_score(
gossipsub::PeerScoreParams::default(),
gossipsub::PeerScoreThresholds::default(),
)
.map_err(|e| anyhow::anyhow!("`gossipsub` scoring parameters error: {e}"))?;
.map_err(|e| anyhow!("`gossipsub` scoring parameters error: {e}"))?;

gossipsub.subscribe(&gpu_commitments_topic())?;

Expand Down
37 changes: 29 additions & 8 deletions ethexe/observer/src/event.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use ethexe_common::{BlockEvent, BlockRequestEvent};
use ethexe_db::BlockHeader;
use gprimitives::{CodeId, H256};
use parity_scale_codec::{Decode, Encode};

Expand All @@ -16,18 +17,38 @@ pub enum Event {

#[derive(Debug, Clone, Encode, Decode)]
pub struct RequestBlockData {
pub parent_hash: H256,
pub block_hash: H256,
pub block_number: u64,
pub block_timestamp: u64,
pub hash: H256,
pub header: BlockHeader,
pub events: Vec<BlockRequestEvent>,
}

impl RequestBlockData {
pub fn as_simple(&self) -> SimpleBlockData {
SimpleBlockData {
hash: self.hash,
header: self.header.clone(),
}
}
}

#[derive(Debug, Clone, Encode, Decode)]
pub struct BlockData {
pub parent_hash: H256,
pub block_hash: H256,
pub block_number: u64,
pub block_timestamp: u64,
pub hash: H256,
pub header: BlockHeader,
pub events: Vec<BlockEvent>,
}

impl BlockData {
pub fn as_simple(&self) -> SimpleBlockData {
SimpleBlockData {
hash: self.hash,
header: self.header.clone(),
}
}
}

#[derive(Debug, Clone, Encode, Decode)]
pub struct SimpleBlockData {
pub hash: H256,
pub header: BlockHeader,
}
2 changes: 1 addition & 1 deletion ethexe/observer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ mod observer;
mod query;

pub use blobs::{BlobReader, ConsensusLayerBlobReader, MockBlobReader};
pub use event::{BlockData, Event, RequestBlockData, RequestEvent};
pub use event::{BlockData, Event, RequestBlockData, RequestEvent, SimpleBlockData};
pub use observer::{Observer, ObserverStatus};
pub use query::Query;
Loading

0 comments on commit c850327

Please sign in to comment.