Skip to content

Commit

Permalink
feat(ethexe): delayed messaging (#4290)
Browse files Browse the repository at this point in the history
  • Loading branch information
breathx authored Oct 17, 2024
1 parent 6f943d7 commit 80e3457
Show file tree
Hide file tree
Showing 19 changed files with 369 additions and 115 deletions.
8 changes: 4 additions & 4 deletions common/src/auxiliary/task_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,28 @@

use super::{AuxiliaryDoubleStorageWrap, BlockNumber, DoubleBTreeMap};
use crate::scheduler::TaskPoolImpl;
use gear_core::{ids::ProgramId, tasks::ScheduledTask};
use gear_core::{ids::ProgramId, tasks::VaraScheduledTask};
use std::cell::RefCell;

/// Task pool implementation that can be used in a native, non-wasm runtimes.
pub type AuxiliaryTaskpool<TaskPoolCallbacks> = TaskPoolImpl<
TaskPoolStorageWrap,
ScheduledTask<ProgramId>,
VaraScheduledTask<ProgramId>,
TaskPoolErrorImpl,
TaskPoolErrorImpl,
TaskPoolCallbacks,
>;

std::thread_local! {
pub(crate) static TASKPOOL_STORAGE: RefCell<DoubleBTreeMap<BlockNumber, ScheduledTask<ProgramId>, ()>> = const { RefCell::new(DoubleBTreeMap::new()) };
pub(crate) static TASKPOOL_STORAGE: RefCell<DoubleBTreeMap<BlockNumber, VaraScheduledTask<ProgramId>, ()>> = const { RefCell::new(DoubleBTreeMap::new()) };
}

/// `TaskPool` double storage map manager
pub struct TaskPoolStorageWrap;

impl AuxiliaryDoubleStorageWrap for TaskPoolStorageWrap {
type Key1 = BlockNumber;
type Key2 = ScheduledTask<ProgramId>;
type Key2 = VaraScheduledTask<ProgramId>;
type Value = ();

fn with_storage<F, R>(f: F) -> R
Expand Down
29 changes: 15 additions & 14 deletions core/src/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@ use gsys::Gas;
use parity_scale_codec::{Decode, Encode, MaxEncodedLen};
use scale_info::TypeInfo;

/// Alias for ScheduledTask used in vara-runtime, generic across AccountId used.
pub type VaraScheduledTask<AccountId> = ScheduledTask<AccountId, MessageId, bool>;

/// Scheduled task sense and required data for processing action.
///
/// CAUTION: NEVER ALLOW `ScheduledTask<AccountId>` BE A BIG DATA.
/// CAUTION: NEVER ALLOW `ScheduledTask` BE A BIG DATA.
/// To avoid redundant migrations only append new variant(s) to the enum
/// with an explicit corresponding scale codec index.
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Ord, Encode, Decode, TypeInfo, MaxEncodedLen)]
pub enum ScheduledTask<AccountId> {
pub enum ScheduledTask<RFM, SD, SUM> {
// Rent charging section.
// -----
/// Pause program as out of rent one.
Expand All @@ -42,7 +45,7 @@ pub enum ScheduledTask<AccountId> {

/// Remove message from mailbox as out of rent one.
#[codec(index = 2)]
RemoveFromMailbox(AccountId, MessageId),
RemoveFromMailbox(RFM, MessageId),

/// Remove message from waitlist as out of rent one.
#[codec(index = 3)]
Expand All @@ -62,7 +65,7 @@ pub enum ScheduledTask<AccountId> {
///
/// The message itself stored in DispatchStash.
#[codec(index = 6)]
SendDispatch(MessageId),
SendDispatch(SD),

/// Delayed message to user sending.
///
Expand All @@ -72,7 +75,7 @@ pub enum ScheduledTask<AccountId> {
/// What message to send.
message_id: MessageId,
/// Should it be inserted into users mailbox.
to_mailbox: bool,
to_mailbox: SUM,
},

/// Remove gas reservation.
Expand All @@ -85,9 +88,9 @@ pub enum ScheduledTask<AccountId> {
RemoveResumeSession(u32),
}

impl<AccountId> ScheduledTask<AccountId> {
impl<RFM, SD, SUM> ScheduledTask<RFM, SD, SUM> {
/// Processing function of current task with given handler.
pub fn process_with(self, handler: &mut impl TaskHandler<AccountId>) -> Gas {
pub fn process_with(self, handler: &mut impl TaskHandler<RFM, SD, SUM>) -> Gas {
use ScheduledTask::*;

match self {
Expand Down Expand Up @@ -116,15 +119,15 @@ impl<AccountId> ScheduledTask<AccountId> {
}

/// Task handler trait for dealing with required tasks.
pub trait TaskHandler<AccountId> {
pub trait TaskHandler<RFM, SD, SUM> {
// Rent charging section.
// -----
/// Pause program action.
fn pause_program(&mut self, program_id: ProgramId) -> Gas;
/// Remove code action.
fn remove_code(&mut self, code_id: CodeId) -> Gas;
/// Remove from mailbox action.
fn remove_from_mailbox(&mut self, user_id: AccountId, message_id: MessageId) -> Gas;
fn remove_from_mailbox(&mut self, user_id: RFM, message_id: MessageId) -> Gas;
/// Remove from waitlist action.
fn remove_from_waitlist(&mut self, program_id: ProgramId, message_id: MessageId) -> Gas;
/// Remove paused program action.
Expand All @@ -136,10 +139,10 @@ pub trait TaskHandler<AccountId> {
fn wake_message(&mut self, program_id: ProgramId, message_id: MessageId) -> Gas;

/// Send delayed message to program action.
fn send_dispatch(&mut self, stashed_message_id: MessageId) -> Gas;
fn send_dispatch(&mut self, stashed_message_id: SD) -> Gas;

/// Send delayed message to user action.
fn send_user_message(&mut self, stashed_message_id: MessageId, to_mailbox: bool) -> Gas;
fn send_user_message(&mut self, stashed_message_id: MessageId, to_mailbox: SUM) -> Gas;

/// Remove gas reservation action.
fn remove_gas_reservation(
Expand All @@ -158,7 +161,5 @@ fn task_encoded_size() {
const MAX_SIZE: usize = 256;

// For example we will take `AccountId` = `ProgramId` from `gear_core`.
type AccountId = ProgramId;

assert!(ScheduledTask::<AccountId>::max_encoded_len() <= MAX_SIZE);
assert!(VaraScheduledTask::<ProgramId>::max_encoded_len() <= MAX_SIZE);
}
15 changes: 12 additions & 3 deletions ethexe/common/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,20 @@ use gear_core::{
code::InstrumentedCode,
ids::{ActorId, CodeId, ProgramId},
};
use gprimitives::H256;
use gprimitives::{MessageId, H256};
use parity_scale_codec::{Decode, Encode};

/// NOTE: key for actor id is (program_id, user_id). only used for mailbox.
pub type ScheduledTask = gear_core::tasks::ScheduledTask<(ProgramId, ActorId)>;
/// RemoveFromMailbox key; (msgs sources program (mailbox and queue provider), destination user id)
pub type Rfm = (ProgramId, ActorId);

/// SendDispatch key; (msgs destinations program (stash and queue provider), message id)
pub type Sd = (ProgramId, MessageId);

/// SendUserMessage key; (msgs sources program (mailbox and stash provider))
pub type Sum = ProgramId;

/// NOTE: generic keys differs to Vara and have been chosen dependent on storage organization of ethexe.
pub type ScheduledTask = gear_core::tasks::ScheduledTask<Rfm, Sd, Sum>;

#[derive(Debug, Clone, Default, Encode, Decode, serde::Serialize)]
pub struct BlockHeader {
Expand Down
13 changes: 12 additions & 1 deletion ethexe/db/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use ethexe_common::{
BlockRequestEvent,
};
use ethexe_runtime_common::state::{
Allocations, Mailbox, MemoryPages, MessageQueue, ProgramState, Storage, Waitlist,
Allocations, DispatchStash, Mailbox, MemoryPages, MessageQueue, ProgramState, Storage, Waitlist,
};
use gear_core::{
code::InstrumentedCode,
Expand Down Expand Up @@ -504,6 +504,17 @@ impl Storage for Database {
self.cas.write(&waitlist.encode())
}

fn read_stash(&self, hash: H256) -> Option<DispatchStash> {
self.cas.read(&hash).map(|data| {
DispatchStash::decode(&mut data.as_slice())
.expect("Failed to decode data into `DispatchStash`")
})
}

fn write_stash(&self, stash: DispatchStash) -> H256 {
self.cas.write(&stash.encode())
}

fn read_mailbox(&self, hash: H256) -> Option<Mailbox> {
self.cas.read(&hash).map(|data| {
Mailbox::decode(&mut data.as_slice()).expect("Failed to decode data into `Mailbox`")
Expand Down
9 changes: 6 additions & 3 deletions ethexe/processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,16 @@ impl Processor {
let states = self
.db
.block_start_program_states(block_hash)
.unwrap_or_default(); // TODO (breathx): shouldn't it be a panic?
.ok_or_else(|| {
anyhow!("failed to get block start program states for under-processing block")
})?;

let schedule = self.db.block_start_schedule(block_hash).unwrap_or_default(); // TODO (breathx): shouldn't it be a panic?
let schedule = self.db.block_start_schedule(block_hash).ok_or_else(|| {
anyhow!("failed to get block start schedule for under-processing block")
})?;

let mut in_block_transitions = InBlockTransitions::new(header, states, schedule);

// TODO (breathx): handle resulting addresses that were changed (e.g. top up balance wont be dumped as outcome).
for event in events {
match event {
BlockRequestEvent::Router(event) => {
Expand Down
116 changes: 71 additions & 45 deletions ethexe/runtime/common/src/journal.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
state::{
self, ActiveProgram, ComplexStorage, Dispatch, HashAndLen, MaybeHash, Program,
ProgramState, Storage,
ProgramState, Storage, MAILBOX_VALIDITY,
},
InBlockTransitions,
};
Expand Down Expand Up @@ -163,49 +163,71 @@ impl<S: Storage> JournalHandler for Handler<'_, S> {
unreachable!("deprecated: {dispatch:?}");
}

if delay != 0 {
todo!("delayed sending isn't supported yet");
}

if self
.in_block_transitions
.state_of(&dispatch.destination())
.is_none()
{
if !dispatch.is_reply() {
let expiry = self.in_block_transitions.schedule_task(
state::MAILBOX_VALIDITY.try_into().expect("infallible"),
ScheduledTask::RemoveFromMailbox(
(dispatch.source(), dispatch.destination()),
dispatch.id(),
),
);

self.update_state_with_storage(dispatch.source(), |storage, state| {
state.mailbox_hash =
storage.modify_mailbox(state.mailbox_hash.clone(), |mailbox| {
mailbox
.entry(dispatch.destination())
.or_default()
.insert(dispatch.id(), (dispatch.value(), expiry));
})?;
let user_id = dispatch.destination();

Ok(())
});
if !dispatch.is_reply() {
if let Ok(non_zero_delay) = delay.try_into() {
let expiry = self.in_block_transitions.schedule_task(
non_zero_delay,
ScheduledTask::SendUserMessage {
message_id: dispatch.id(),
to_mailbox: dispatch.source(),
},
);

self.update_state_with_storage(dispatch.source(), |storage, state| {
let dispatch = Dispatch::from_stored(storage, dispatch.into_stored());

state.stash_hash =
storage.modify_stash(state.stash_hash.clone(), |stash| {
let r =
stash.insert(dispatch.id, ((dispatch, Some(user_id)), expiry));
debug_assert!(r.is_none());
})?;

Ok(())
});

return;
} else {
let expiry = self.in_block_transitions.schedule_task(
MAILBOX_VALIDITY.try_into().expect("infallible"),
ScheduledTask::RemoveFromMailbox(
(dispatch.source(), user_id),
dispatch.id(),
),
);

self.update_state_with_storage(dispatch.source(), |storage, state| {
state.mailbox_hash =
storage.modify_mailbox(state.mailbox_hash.clone(), |mailbox| {
mailbox
.entry(user_id)
.or_default()
.insert(dispatch.id(), (dispatch.value(), expiry));
})?;

Ok(())
});
}
}

// TODO (breathx): send here to in_block_transitions.
let source = dispatch.source();
let message = dispatch.into_parts().1;

let source_state_hash = self
let state_hash = self
.in_block_transitions
.state_of(&source)
.expect("must exist");

self.in_block_transitions.modify_state_with(
source,
source_state_hash,
state_hash,
0,
vec![],
vec![OutgoingMessage::from(message)],
Expand All @@ -214,27 +236,31 @@ impl<S: Storage> JournalHandler for Handler<'_, S> {
return;
}

let (kind, message) = dispatch.into_parts();
let (id, source, destination, payload, gas_limit, value, details) = message.into_parts();
let destination = dispatch.destination();
let dispatch = Dispatch::from_stored(self.storage, dispatch.into_stored());

let payload_hash = self.storage.write_payload(payload).into();
if let Ok(non_zero_delay) = delay.try_into() {
let expiry = self.in_block_transitions.schedule_task(
non_zero_delay,
ScheduledTask::SendDispatch((destination, dispatch.id)),
);

let dispatch = Dispatch {
id,
kind,
source,
payload_hash,
value,
details,
context: None,
};
self.update_state_with_storage(destination, |storage, state| {
state.stash_hash = storage.modify_stash(state.stash_hash.clone(), |stash| {
let r = stash.insert(dispatch.id, ((dispatch, None), expiry));
debug_assert!(r.is_none());
})?;

self.update_state_with_storage(destination, |storage, state| {
state.queue_hash = storage.modify_queue(state.queue_hash.clone(), |queue| {
queue.push_back(dispatch);
})?;
Ok(())
});
Ok(())
});
} else {
self.update_state_with_storage(destination, |storage, state| {
state.queue_hash = storage.modify_queue(state.queue_hash.clone(), |queue| {
queue.push_back(dispatch);
})?;
Ok(())
});
}
}

fn wait_dispatch(
Expand Down
Loading

0 comments on commit 80e3457

Please sign in to comment.