From ca05800b97879b4b722abe2e72472785cf44b144 Mon Sep 17 00:00:00 2001 From: tilacog Date: Sat, 21 Dec 2024 18:03:12 -0300 Subject: [PATCH] feat: implement message payload methods + chunked writes --- .../src/component.rs | 205 +++++++++++++++--- 1 file changed, 169 insertions(+), 36 deletions(-) diff --git a/crates/solana-gateway-task-processor/src/component.rs b/crates/solana-gateway-task-processor/src/component.rs index abaadcd..28d6ebf 100644 --- a/crates/solana-gateway-task-processor/src/component.rs +++ b/crates/solana-gateway-task-processor/src/component.rs @@ -341,8 +341,14 @@ async fn execute_task( axelar_solana_gateway::get_incoming_message_pda(&command_id); // Upload the message payload to a Gateway-owned PDA account and get its address back. - let gateway_message_payload_pda = - message_payload::upload(keypair, metadata.gateway_root_pda, &message, &payload).await?; + let gateway_message_payload_pda = message_payload::upload( + solana_rpc_client, + keypair, + metadata.gateway_root_pda, + &message, + &payload, + ) + .await?; // For compatibility reasons with the rest of the Axelar protocol we need add custom handling // for ITS & Governance programs @@ -380,7 +386,13 @@ async fn execute_task( } // Close the MessagePaynload PDA account to reclaim funds - message_payload::close(keypair, metadata.gateway_root_pda, &message).await?; + message_payload::close( + solana_rpc_client, + keypair, + metadata.gateway_root_pda, + &message, + ) + .await?; Ok(()) } @@ -552,53 +564,83 @@ async fn send_transaction( .await } +/// Message payload management module for Axelar Gateway integration. +/// +/// This module provides functionality to handle message payloads in the Solana blockchain, +/// including initialization, writing, committing, and closing of message payload accounts. mod message_payload { + use axelar_solana_encoding::types::messages::Message; use axelar_solana_gateway::state::incoming_message::command_id; use eyre::Context as _; - use solana_sdk::instruction::Instruction; + use futures::stream::FuturesUnordered; + use futures::StreamExt as _; + use solana_client::nonblocking::rpc_client::RpcClient; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::Keypair; use solana_sdk::signer::Signer as _; - // TODO: implement transaction handling logic. - #[expect(clippy::todo, reason = "wip")] - #[expect(clippy::unused_async, reason = "wip")] - async fn send_tx(_ixs: &[Instruction]) -> eyre::Result<()> { - todo!() - } + use super::send_transaction; - /// Upload a message payload to the PDA account + /// Maximum size for payload chunks in bytes. + // TODO: we should either fine tune this or make this configurable + const CHUNK_SIZE: usize = 500; + + /// Handles the upload of a message payload to a Program Derived Address (PDA) account. + /// + /// This function involves three main steps: + /// 1. Initialize the payload account + /// 2. Write the payload data + /// 3. Commit the payload + /// + /// Make sure to close the account afterward to recover the allocated funds. pub(super) async fn upload( - payer: &Keypair, + solana_rpc_client: &RpcClient, + keypair: &Keypair, gateway_root_pda: Pubkey, message: &Message, payload: &[u8], ) -> eyre::Result { let msg_command_id = message_to_command_id(message); - initialize(payer, gateway_root_pda, msg_command_id, payload).await?; - write(payer, gateway_root_pda, msg_command_id, payload).await?; - commit(payer, gateway_root_pda, msg_command_id).await?; + initialize( + solana_rpc_client, + keypair, + gateway_root_pda, + msg_command_id, + payload, + ) + .await?; + write( + solana_rpc_client, + keypair, + gateway_root_pda, + msg_command_id, + payload, + ) + .await?; + commit(solana_rpc_client, keypair, gateway_root_pda, msg_command_id).await?; let (message_payload_pda, _bump) = axelar_solana_gateway::find_message_payload_pda( gateway_root_pda, message_to_command_id(message), - payer.pubkey(), + keypair.pubkey(), ); Ok(message_payload_pda) } + /// Initializes a new message payload account. async fn initialize( - payer: &Keypair, + solana_rpc_client: &RpcClient, + keypair: &Keypair, gateway_root_pda: Pubkey, command_id: [u8; 32], payload: &[u8], ) -> eyre::Result<()> { let ix = axelar_solana_gateway::instructions::initialize_message_payload( gateway_root_pda, - payer.pubkey(), + keypair.pubkey(), command_id, payload .len() @@ -606,61 +648,152 @@ mod message_payload { .context("Unexpected u64 overflow in buffer size")?, ) .context("failed to construct an instruction to initialize the message payload pda")?; - send_tx(&[ix]).await?; + send_transaction(solana_rpc_client, keypair, ix).await?; Ok(()) } + /// Writes payload data to the initialized account in chunks. + /// + /// The function splits the payload into manageable chunks and writes them + /// concurrently using a [`FuturesUnordered`] collection. async fn write( - payer: &Keypair, + solana_rpc_client: &RpcClient, + keypair: &Keypair, gateway_root_pda: Pubkey, command_id: [u8; 32], payload: &[u8], ) -> eyre::Result<()> { - let ix = axelar_solana_gateway::instructions::write_message_payload( - gateway_root_pda, - payer.pubkey(), - command_id, - payload, - 0, - ) - .context("failed to construct an instruction to write to the message payload pda")?; - send_tx(&[ix]).await?; + let mut futures = FuturesUnordered::new(); + for ChunkWithOffset { bytes, offset } in chunks_with_offset(payload, CHUNK_SIZE) { + let ix = axelar_solana_gateway::instructions::write_message_payload( + gateway_root_pda, + keypair.pubkey(), + command_id, + bytes, + offset, + ) + .context("failed to construct an instruction to write to the message payload pda")?; + let future = async move { + let tx = send_transaction(solana_rpc_client, keypair, ix).await; + (offset, tx) + }; + futures.push(future); + } + + while let Some((offset, tx)) = futures.next().await { + tx.with_context(|| format!("failed to write message payload at offset {offset}"))?; + } + Ok(()) } + /// Commits the message payload, finalizing the upload process. async fn commit( - payer: &Keypair, + solana_rpc_client: &RpcClient, + keypair: &Keypair, gateway_root_pda: Pubkey, command_id: [u8; 32], ) -> eyre::Result<()> { let ix = axelar_solana_gateway::instructions::commit_message_payload( gateway_root_pda, - payer.pubkey(), + keypair.pubkey(), command_id, ) .context("failed to construct an instruction to commit the message payload pda")?; - send_tx(&[ix]).await?; + send_transaction(solana_rpc_client, keypair, ix).await?; Ok(()) } + /// Closes the message payload account and reclaims its rent. pub(super) async fn close( - payer: &Keypair, + solana_rpc_client: &RpcClient, + keypair: &Keypair, gateway_root_pda: Pubkey, message: &Message, ) -> eyre::Result<()> { let msg_command_id = message_to_command_id(message); let ix = axelar_solana_gateway::instructions::close_message_payload( gateway_root_pda, - payer.pubkey(), + keypair.pubkey(), msg_command_id, ) .context("failed to construct an instruction to close the message payload pda")?; - send_tx(&[ix]).await?; + send_transaction(solana_rpc_client, keypair, ix).await?; Ok(()) } - /// Helper fn to produce a command id from a message. + /// Helper function to generate a command ID from a message. fn message_to_command_id(message: &Message) -> [u8; 32] { command_id(&message.cc_id.chain, &message.cc_id.id) } + + /// Represents a chunk of data with its offset in the original data slice. + #[cfg_attr(test, derive(Debug, Clone, Eq, PartialEq))] + struct ChunkWithOffset<'a> { + /// The actual chunk of data + bytes: &'a [u8], + /// Offset position in the original data + offset: usize, + } + + /// Creates an iterator that yields fixed-size chunks with their offsets. + fn chunks_with_offset( + data: &[u8], + chunk_size: usize, + ) -> impl Iterator> + '_ { + data.chunks(chunk_size) + .enumerate() + .map(move |(index, chunk)| ChunkWithOffset { + bytes: chunk, + offset: index.saturating_mul(chunk_size), + }) + } + + #[cfg(test)] + mod tests { + use super::*; + + #[test] + fn test_chunks_with_offset() { + let data = b"12345678"; + let chunks: Vec<_> = chunks_with_offset(data, 3).collect(); + + assert_eq!( + chunks, + vec![ + ChunkWithOffset { + bytes: b"123", + offset: 0 + }, + ChunkWithOffset { + bytes: b"456", + offset: 3 + }, + ChunkWithOffset { + bytes: b"78", + offset: 6 + }, + ] + ); + } + + #[test] + fn test_empty_input() { + let data = b""; + assert!(chunks_with_offset(data, 3).next().is_none()); + } + + #[test] + fn test_chunk_size_larger_than_input() { + let data = b"123"; + let chunks: Vec<_> = chunks_with_offset(data, 5).collect(); + assert_eq!( + chunks, + vec![ChunkWithOffset { + bytes: b"123", + offset: 0 + },] + ); + } + } }