From 895ab43a6f045cf7261a9f4483c6000a1a087553 Mon Sep 17 00:00:00 2001 From: erhant Date: Fri, 6 Dec 2024 21:59:36 +0300 Subject: [PATCH] added timeouts, add arweave workflow parse test --- Cargo.lock | 2 +- Cargo.toml | 2 +- README.md | 4 +++- misc/arweave.js | 4 +++- src/cli.rs | 23 +++++++++++++++---- src/commands/coordinator.rs | 12 ++++++++-- src/compute/workflows/requests/mod.rs | 32 ++++++++++----------------- src/configurations/mod.rs | 20 ++++++++++++++--- src/data/arweave.rs | 1 + src/node/coordinator.rs | 15 ++++++++++--- src/node/registry.rs | 10 +++++++-- src/node/token.rs | 10 +++++++-- 12 files changed, 95 insertions(+), 40 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 480b641..1e7931f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1849,7 +1849,7 @@ dependencies = [ [[package]] name = "dkn-oracle" -version = "0.1.9" +version = "0.1.10" dependencies = [ "alloy", "alloy-chains", diff --git a/Cargo.toml b/Cargo.toml index 74862a8..fb6e801 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "dkn-oracle" description = "Dria Knowledge Network Oracle Node" -version = "0.1.9" +version = "0.1.10" edition = "2021" license = "Apache-2.0" readme = "README.md" diff --git a/README.md b/README.md index 3f1c7b3..2606143 100644 --- a/README.md +++ b/README.md @@ -81,7 +81,9 @@ You can terminate the application from the terminal as usual (e.g. CTRL+C) to qu #### Using Arweave -To save from gas fees, an Oracle node can upload its response to Arweave and then store the transaction id of that upload to the contract instead. This is differentiated by looking at the response, and see that it is exactly 64 characters +To save from gas fees, an Oracle node can upload its response to Arweave and then store the transaction id of that upload to the contract instead. This is differentiated by looking at the response, and see that it is exactly 64 hexadecimal characters. It is then decoded from hex and encoded to `base64url` format, which can then be used to access the data at `https//arweave.net/{txid-here}`. This **requires** an Arweave wallet. + +Following the same logic, the Oracle node can read task inputs from Arweave as well. This **does not require** an Arweave a wallet. ### Viewing Tasks diff --git a/misc/arweave.js b/misc/arweave.js index 4374521..20e721c 100644 --- a/misc/arweave.js +++ b/misc/arweave.js @@ -27,5 +27,7 @@ const inputDecoded = Buffer.from(input, "hex").toString(); const arweaveTxid = Buffer.from(inputDecoded, "hex").toString("base64url"); // download the actual response from Arweave -const res = await fetch(`https://arweave.net/${arweaveTxid}`); +const url = `https://arweave.net/${arweaveTxid}`; +console.log(url); +const res = await fetch(url); console.log(await res.text()); diff --git a/src/cli.rs b/src/cli.rs index 2583bfa..e2a1d78 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -1,3 +1,5 @@ +use std::str::FromStr; + use crate::{commands, contracts::OracleKind, DriaOracle, DriaOracleConfig}; use alloy::{ eips::BlockNumberOrTag, @@ -24,6 +26,18 @@ fn parse_secret_key(value: &str) -> Result { B256::from_hex(value).map_err(Into::into) } +/// `value parser` to parse a `str` to `BlockNumberOrTag` +/// where if it can be parsed as `u64`, we call `BlockNumberOrTag::from_u64` +/// otherwise we call `BlockNumberOrTag::from_str`. +fn parse_block_number_or_tag(value: &str) -> Result { + match value.parse::() { + // parse block no from its decimal representation + Ok(block_number) => Ok(BlockNumberOrTag::from(block_number)), + // parse block no from hex, or parse its tag + Err(_) => BlockNumberOrTag::from_str(value).map_err(Into::into), + } +} + // https://docs.rs/clap/latest/clap/_derive/index.html#arg-attributes #[derive(Subcommand)] enum Commands { @@ -49,21 +63,22 @@ enum Commands { Start { #[arg( long, - help = "Starting block number to listen for, defaults to 'latest'." + help = "Starting block number to listen for, defaults to 'latest'.", + value_parser = parse_block_number_or_tag )] from: Option, #[arg(help = "The oracle kinds to handle tasks as.", required = false)] kinds: Vec, - #[arg(short, long = "model", help = "The models to serve.", required = true, value_parser=parse_model)] + #[arg(short, long = "model", help = "The models to serve.", required = true, value_parser = parse_model)] models: Vec, }, /// View status of a given task. View { task_id: U256 }, /// View tasks between specific blocks. Tasks { - #[arg(long, help = "Starting block number, defaults to 'earliest'.")] + #[arg(long, help = "Starting block number, defaults to 'earliest'.", value_parser = parse_block_number_or_tag)] from: Option, - #[arg(long, help = "Ending block number, defaults to 'latest'.")] + #[arg(long, help = "Ending block number, defaults to 'latest'.", value_parser = parse_block_number_or_tag)] to: Option, }, /// Request a task. diff --git a/src/commands/coordinator.rs b/src/commands/coordinator.rs index a6c7041..7f78133 100644 --- a/src/commands/coordinator.rs +++ b/src/commands/coordinator.rs @@ -145,10 +145,18 @@ pub async fn view_task_events( from_block: impl Into + Clone, to_block: impl Into + Clone, ) -> Result<()> { + let from_block: BlockNumberOrTag = from_block.clone().into(); + let to_block: BlockNumberOrTag = to_block.clone().into(); log::info!( "Viewing task ids & statuses between blocks: {} - {}", - from_block.clone().into(), - to_block.clone().into() + from_block + .as_number() + .map(|n| n.to_string()) + .unwrap_or(from_block.to_string()), + to_block + .as_number() + .map(|n| n.to_string()) + .unwrap_or(to_block.to_string()) ); let task_events = node.get_tasks_in_range(from_block, to_block).await?; diff --git a/src/compute/workflows/requests/mod.rs b/src/compute/workflows/requests/mod.rs index da7f80d..70abe7b 100644 --- a/src/compute/workflows/requests/mod.rs +++ b/src/compute/workflows/requests/mod.rs @@ -161,6 +161,8 @@ impl Request { #[cfg(test)] mod tests { + use alloy::hex::FromHex; + use super::*; // only implemented for testing purposes @@ -257,25 +259,15 @@ mod tests { } #[tokio::test] - #[ignore = "run this manually with GPT4oMini vs GPT4o"] - async fn test_erroneous() { - dotenvy::dotenv().unwrap(); - let _ = env_logger::builder() - .filter_level(log::LevelFilter::Debug) - .is_test(true) - .try_init(); - - // this looks like a workflow, but its not parseable so it should give an errror - // let input = "{\"config\":{\"max_steps\":50,\"max_time\":200,\"tools\":[\"ALL\"]},\"external_memory\":{\"backstory\":\"dark hacker\\nlives in the basement of his parents' house\",\"objective\":\"Hacking systems\",\"behaviour\":\"closed but strong\",\"state\":\"\",\"inventory\":[\"Empty Inventory\"]},\"tasks\":[{\"id\":\"simulate\",\"name\":\"State\",\"description\":\"Simulates from the given state to obtain a new state with respect to the given inputs.\",\"prompt\":\"You are a sophisticated 317-dimensional alien world simulator capable of simulating any fictional or non-fictional world with excellent detail. Your task is to simulate one day in the life of a character based on the provided inputs, taking into account every given detail to accurately mimic the created world.\\n\\n---------------------\\n\\nYou just woke up to a new day. When you look at mirror as you wake up, you reflect on yourself and who you are. You are:\\n\\n{{backstory}}\\n\\n\\nYou remember vividly what drove you in your life. You feel a strong urge to:\\n\\n{{objective}}\\n\\n\\n\\nTo be strong and coherent, you repeat out loud how you behave in front of the mirror.\\n\\n{{behaviour}}\\n\\n\\nAs you recall who you are, what you do and your drive is, you write down to a notebook your current progress with your goal:\\n\\n{{state}}\\n\\n\\nYou look through and see the items in your inventory.\\n\\n{{inventory}}\\n\\n\\nFirst, an omnipotent being watches you through out the day outlining what you've been through today within your world in tags. This being is beyond time and space can understand slightest intentions also the complex infinite parameter world around you.\\n\\nYou live another day... It's been a long day and you write down your journal what you've achieved so far today, and what is left with your ambitions. It's only been a day, so you know that you can achieve as much that is possible within a day. \\n\\nWrite this between tags.\\nStart now:\\n\",\"inputs\":[{\"name\":\"backstory\",\"value\":{\"type\":\"read\",\"key\":\"backstory\"},\"required\":true},{\"name\":\"state\",\"value\":{\"type\":\"read\",\"key\":\"state\"},\"required\":true},{\"name\":\"inventory\",\"value\":{\"type\":\"get_all\",\"key\":\"inventory\"},\"required\":true},{\"name\":\"behaviour\",\"value\":{\"type\":\"read\",\"key\":\"behaviour\"},\"required\":true},{\"name\":\"objective\",\"value\":{\"type\":\"read\",\"key\":\"objective\"},\"required\":true}],\"operator\":\"generation\",\"outputs\":[{\"type\":\"write\",\"key\":\"new_state\",\"value\":\"__result\"}]},{\"id\":\"_end\",\"name\":\"Task\",\"description\":\"Task Description\",\"prompt\":\"\",\"inputs\":[],\"operator\":\"end\",\"outputs\":[]}],\"steps\":[{\"source\":\"simulate\",\"target\":\"_end\"}],\"return_value\":{\"input\":{\"type\":\"read\",\"key\":\"new_state\"},\"to_json\":false}}"; - let input = Bytes::from_static(&hex_literal::hex!("36623630613364623161396663353163313532383663396539393664363531626633306535626438363730386262396134636339633863636632393236623266")); - - let mut request = Request::try_parse_bytes(&input).await.unwrap(); - // this is a wrong Workflow object, so instead of being parsed as Request::Workflow it is parsed as Request::String! - // small models like GPT4oMini will not be able to handle this, and will output mumbo-jumbo at random times, sometimes will return the input itself - // Gpt4o will be able to handle this, it actually understands the task - - let output = request.execute(Model::GPT4o, None).await.unwrap(); - - println!("Output:\n{}", output); + async fn test_arweave_workflow_parser() { + // task 21402 input + // 0x30306234343365613266393739626263353263613565363131376534646366353634366662316365343265663566643363643564646638373533643538323463 + let input_bytes = Bytes::from_hex("30306234343365613266393739626263353263613565363131376534646366353634366662316365343265663566643363643564646638373533643538323463").unwrap(); + let workflow = Request::try_parse_bytes(&input_bytes).await.unwrap(); + if let Request::Workflow(_) = workflow { + /* do nothing */ + } else { + panic!("Expected workflow, got something else"); + } } } diff --git a/src/configurations/mod.rs b/src/configurations/mod.rs index c1a192f..4ffd3c6 100644 --- a/src/configurations/mod.rs +++ b/src/configurations/mod.rs @@ -7,12 +7,14 @@ use eyre::{Context, Result}; use std::env; /// Configuration for the Dria Oracle. -/// -/// Stores the `EthereumWallet` instance along with the used RPC url. #[derive(Debug, Clone)] pub struct DriaOracleConfig { + /// Wallet for the oracle. pub wallet: EthereumWallet, + /// RPC URL for the oracle, decides the connected chain. pub rpc_url: Url, + /// Optional transaction timeout, is useful to avoid getting stuck at `get_receipt()` when making a transaction. + pub tx_timeout: Option, } impl Default for DriaOracleConfig { @@ -27,7 +29,19 @@ impl DriaOracleConfig { PrivateKeySigner::from_bytes(secret_key).wrap_err("Could not parse private key")?; let wallet = EthereumWallet::from(signer); - Ok(Self { wallet, rpc_url }) + Ok(Self { + wallet, + rpc_url, + tx_timeout: None, + }) + } + + /// Change the transaction timeout. + /// This will make transaction wait for the given duration before timing out, + /// otherwise the node may get stuck waiting for a lost transaction. + pub fn with_tx_timeout(mut self, tx_timeout: std::time::Duration) -> Self { + self.tx_timeout = Some(tx_timeout); + self } /// Creates the config from the environment variables. diff --git a/src/data/arweave.rs b/src/data/arweave.rs index 44be83b..b8cef17 100644 --- a/src/data/arweave.rs +++ b/src/data/arweave.rs @@ -149,6 +149,7 @@ impl OracleExternalData for Arweave { let b64_key = Self::hex_to_base64(key.as_str())?; let url = self.base_url.join(&b64_key)?; + log::debug!("Fetching from Arweave: {}", url); let response = self .client .get(url) diff --git a/src/node/coordinator.rs b/src/node/coordinator.rs index 3a1c9d6..f51b3e7 100644 --- a/src/node/coordinator.rs +++ b/src/node/coordinator.rs @@ -38,7 +38,10 @@ impl DriaOracle { .wrap_err("could not request task")?; log::info!("Hash: {:?}", tx.tx_hash()); - let receipt = tx.get_receipt().await?; + let receipt = tx + .with_timeout(self.config.tx_timeout) + .get_receipt() + .await?; Ok(receipt) } @@ -91,7 +94,10 @@ impl DriaOracle { let tx = req.send().await.map_err(contract_error_report)?; log::info!("Hash: {:?}", tx.tx_hash()); - let receipt = tx.get_receipt().await?; + let receipt = tx + .with_timeout(self.config.tx_timeout) + .get_receipt() + .await?; Ok(receipt) } @@ -108,7 +114,10 @@ impl DriaOracle { let tx = req.send().await.map_err(contract_error_report)?; log::info!("Hash: {:?}", tx.tx_hash()); - let receipt = tx.get_receipt().await?; + let receipt = tx + .with_timeout(self.config.tx_timeout) + .get_receipt() + .await?; Ok(receipt) } diff --git a/src/node/registry.rs b/src/node/registry.rs index 0b8ad19..915320a 100644 --- a/src/node/registry.rs +++ b/src/node/registry.rs @@ -16,7 +16,10 @@ impl DriaOracle { .wrap_err(eyre!("Could not register."))?; log::info!("Hash: {:?}", tx.tx_hash()); - let receipt = tx.get_receipt().await?; + let receipt = tx + .with_timeout(self.config.tx_timeout) + .get_receipt() + .await?; Ok(receipt) } @@ -32,7 +35,10 @@ impl DriaOracle { .wrap_err("could not unregister")?; log::info!("Hash: {:?}", tx.tx_hash()); - let receipt = tx.get_receipt().await?; + let receipt = tx + .with_timeout(self.config.tx_timeout) + .get_receipt() + .await?; Ok(receipt) } diff --git a/src/node/token.rs b/src/node/token.rs index c5eda09..b2a4c3a 100644 --- a/src/node/token.rs +++ b/src/node/token.rs @@ -33,7 +33,10 @@ impl DriaOracle { let tx = req.send().await.map_err(contract_error_report)?; log::info!("Hash: {:?}", tx.tx_hash()); - let receipt = tx.get_receipt().await?; + let receipt = tx + .with_timeout(self.config.tx_timeout) + .get_receipt() + .await?; Ok(receipt) } @@ -48,7 +51,10 @@ impl DriaOracle { .wrap_err("could not approve tokens")?; log::info!("Hash: {:?}", tx.tx_hash()); - let receipt = tx.get_receipt().await?; + let receipt = tx + .with_timeout(self.config.tx_timeout) + .get_receipt() + .await?; Ok(receipt) }