Skip to content

Commit

Permalink
feat: state primitives for the amplifier component (#26)
Browse files Browse the repository at this point in the history
* feat: trait for amplifier storage
* feat: memmap storage implementation
* feat: storage path gets derived from config
* feat: fix an issue where we can skip tasks if relayer is shut down mid-processing
  • Loading branch information
roberts-pumpurs authored Oct 31, 2024
1 parent ad23e6a commit 0463033
Show file tree
Hide file tree
Showing 20 changed files with 361 additions and 59 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ target/

*.key
config.toml
store
35 changes: 32 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ unused_must_use = { level = "deny", priority = -1 }
# Our crates
relayer-engine = { path = "crates/relayer-engine" }
relayer-amplifier-api-integration = { path = "crates/relayer-amplifier-api-integration" }
relayer-amplifier-state = { path = "crates/relayer-amplifier-state" }
file-based-storage = { path = "crates/file-based-storage" }
amplifier-api = { path = "crates/amplifier-api" }
solana-listener = { path = "crates/solana-listener" }
common-serde-utils = { path = "crates/common-serde-utils" }
Expand Down Expand Up @@ -102,6 +104,8 @@ backoff = { version = "0.4", features = ["tokio"] }
indoc = "2"
itertools = "0.12"
num-traits = "0.2"
memmap2 = "0.9"
bytemuck = "1.19"

# Serde
serde = { version = "1", features = ["derive"] }
Expand Down
1 change: 1 addition & 0 deletions config.example.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
storage_path = "./storage"
[amplifier_component]
# pem format cert
identity = '''
Expand Down
22 changes: 20 additions & 2 deletions crates/amplifier-api/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
//! Contsructed form the following API spec [link](https://github.com/axelarnetwork/axelar-eds-mirror/blob/main/oapi/gmp/schema.yaml)
pub use big_int::BigInt;
pub use bnum;
use chrono::{DateTime, Utc};
pub use id::*;
use serde::{Deserialize, Deserializer, Serialize};
use typed_builder::TypedBuilder;
pub use {bnum, uuid};

/// Represents an address as a non-empty string.
pub type Address = String;
Expand Down Expand Up @@ -500,7 +500,7 @@ pub enum Task {
}

/// Represents an individual Task Item.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)]
#[derive(PartialEq, Eq, Serialize, Deserialize, TypedBuilder)]
pub struct TaskItem {
/// UUID of current task
pub id: TaskItemId,
Expand All @@ -511,6 +511,24 @@ pub struct TaskItem {
pub task: Task,
}

#[expect(clippy::min_ident_chars, reason = "comes from trait definition")]
impl core::fmt::Debug for TaskItem {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let task_type = match self.task {
Task::Verify(_) => "Verify",
Task::GatewayTx(_) => "GatewayTx",
Task::Execute(_) => "Execute",
Task::Refund(_) => "Refund",
};

f.debug_struct("TaskItem")
.field("id", &self.id)
.field("timestamp", &self.timestamp)
.field("task", &task_type)
.finish()
}
}

/// Represents the response from fetching tasks.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)]
pub struct GetTasksResult {
Expand Down
18 changes: 18 additions & 0 deletions crates/file-based-storage/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[package]
name = "file-based-storage"
version.workspace = true
authors.workspace = true
repository.workspace = true
homepage.workspace = true
license.workspace = true
edition.workspace = true

[dependencies]
relayer-amplifier-state.workspace = true
amplifier-api.workspace = true
memmap2.workspace = true
tracing.workspace = true
bytemuck.workspace = true

[lints]
workspace = true
140 changes: 140 additions & 0 deletions crates/file-based-storage/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
#![expect(clippy::allow_attributes_without_reason)]
#![expect(clippy::allow_attributes)]

//! simple memory storage implementation using memory maps
use std::fs::OpenOptions;
use std::io::{self, Seek as _, SeekFrom, Write as _};
use std::path::Path;
use std::sync::{Arc, Mutex};

use amplifier_api::types::{uuid, TaskItemId};
use bytemuck::{Pod, Zeroable};
use memmap2::MmapMut;

/// Memory map wrapper that implements the state to successfully store and retrieve latest task item
/// id
#[derive(Debug, Clone)]
pub struct MemmapState {
mmap: Arc<Mutex<MmapMut>>,
}

#[repr(C)]
#[derive(Default, Debug, Copy, Clone, Pod, Zeroable)]
struct InternalState {
latest_queried_task_item_id: u128,
latest_processed_task_item_id: u128,
}

#[expect(
clippy::expect_used,
clippy::unwrap_in_result,
reason = "irrecoverable error"
)]
impl MemmapState {
/// Creates a new [`MemmapState`] with the memory-mapped file at the given path.
///
/// # Errors
/// If the file cannot be created / opened
///
/// # Panics
/// If the expected state of the [`InternalState`] will be larger than `u64`
pub fn new<P: AsRef<Path>>(path: P) -> io::Result<Self> {
// Open or create the file with read and write permissions
let mut file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(path)?;

// Ensure the file is at least the size of InternalState
let default_state = InternalState::default();
let default_state_bytes = bytemuck::bytes_of(&default_state);
let expected_len = default_state_bytes
.len()
.try_into()
.expect("the size of default state must fit in a u64");
if file.metadata()?.len() < expected_len {
file.set_len(expected_len)?;
file.seek(SeekFrom::Start(0))?;
file.write_all(default_state_bytes)?;
}

// Create a mutable memory map of the file
// SAFETY:
// we ensured that the size is large enough
let mmap = unsafe { MmapMut::map_mut(&file)? };
mmap.flush_async()?;

Ok(Self {
mmap: Arc::new(Mutex::new(mmap)),
})
}

// Generic helper function for getting a TaskItemId
fn get_task_item_id<F>(&self, field_accessor: F) -> Option<TaskItemId>
where
F: Fn(&InternalState) -> u128,
{
let mmap = self.mmap.lock().expect("lock should not be poisoned");
let data = bytemuck::from_bytes::<InternalState>(&mmap[..]);
let task_item_id = field_accessor(data);
drop(mmap);

if task_item_id == 0 {
None
} else {
Some(TaskItemId(uuid::Uuid::from_u128(task_item_id)))
}
}

// Generic helper function for setting a TaskItemId
fn set_task_item_id<F>(
&self,
task_item_id: &TaskItemId,
field_mutator: F,
) -> Result<(), io::Error>
where
F: Fn(&mut InternalState, u128),
{
let mut mmap = self.mmap.lock().expect("lock should not be poisoned");
let raw_u128 = task_item_id.0.as_u128();
let data = bytemuck::from_bytes_mut::<InternalState>(&mut mmap[..]);
field_mutator(data, raw_u128);
mmap.flush_async()?;
drop(mmap);
Ok(())
}
}

impl relayer_amplifier_state::State for MemmapState {
type Err = io::Error;

#[tracing::instrument(skip(self), level = "trace", ret)]
fn latest_queried_task_id(&self) -> Option<TaskItemId> {
tracing::trace!("getting latest queried task item id");
self.get_task_item_id(|data| data.latest_queried_task_item_id)
}

#[tracing::instrument(skip(self), err)]
fn set_latest_queried_task_id(&self, task_item_id: TaskItemId) -> Result<(), Self::Err> {
tracing::info!("updating latest queried task item id");
self.set_task_item_id(&task_item_id, |data, value| {
data.latest_queried_task_item_id = value;
})
}

#[tracing::instrument(skip(self), level = "trace", ret)]
fn latest_processed_task_id(&self) -> Option<TaskItemId> {
tracing::trace!("getting latest processed task item id");
self.get_task_item_id(|data| data.latest_processed_task_item_id)
}

#[tracing::instrument(skip(self), err)]
fn set_latest_processed_task_id(&self, task_item_id: TaskItemId) -> Result<(), Self::Err> {
tracing::info!("updating latest processed task item id");
self.set_task_item_id(&task_item_id, |data, value| {
data.latest_processed_task_item_id = value;
})
}
}
1 change: 1 addition & 0 deletions crates/relayer-amplifier-api-integration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ serde.workspace = true
relayer-engine.workspace = true
tokio-stream.workspace = true
common-serde-utils.workspace = true
relayer-amplifier-state.workspace = true

[lints]
workspace = true
Loading

0 comments on commit 0463033

Please sign in to comment.