Skip to content

Commit

Permalink
Merge pull request #26 from firstbatchxyz/erhant/fix-timeouts
Browse files Browse the repository at this point in the history
added timeouts, add arweave workflow parse test
  • Loading branch information
erhant authored Dec 6, 2024
2 parents 8edc2df + 895ab43 commit a9246a8
Show file tree
Hide file tree
Showing 12 changed files with 95 additions and 40 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "dkn-oracle"
description = "Dria Knowledge Network Oracle Node"
version = "0.1.9"
version = "0.1.10"
edition = "2021"
license = "Apache-2.0"
readme = "README.md"
Expand Down
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ You can terminate the application from the terminal as usual (e.g. CTRL+C) to qu

#### Using Arweave

To save from gas fees, an Oracle node can upload its response to Arweave and then store the transaction id of that upload to the contract instead. This is differentiated by looking at the response, and see that it is exactly 64 characters
To save from gas fees, an Oracle node can upload its response to Arweave and then store the transaction id of that upload to the contract instead. This is differentiated by looking at the response, and see that it is exactly 64 hexadecimal characters. It is then decoded from hex and encoded to `base64url` format, which can then be used to access the data at `https//arweave.net/{txid-here}`. This **requires** an Arweave wallet.

Following the same logic, the Oracle node can read task inputs from Arweave as well. This **does not require** an Arweave a wallet.

### Viewing Tasks

Expand Down
4 changes: 3 additions & 1 deletion misc/arweave.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,7 @@ const inputDecoded = Buffer.from(input, "hex").toString();
const arweaveTxid = Buffer.from(inputDecoded, "hex").toString("base64url");

// download the actual response from Arweave
const res = await fetch(`https://arweave.net/${arweaveTxid}`);
const url = `https://arweave.net/${arweaveTxid}`;
console.log(url);
const res = await fetch(url);
console.log(await res.text());
23 changes: 19 additions & 4 deletions src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::str::FromStr;

use crate::{commands, contracts::OracleKind, DriaOracle, DriaOracleConfig};
use alloy::{
eips::BlockNumberOrTag,
Expand All @@ -24,6 +26,18 @@ fn parse_secret_key(value: &str) -> Result<B256> {
B256::from_hex(value).map_err(Into::into)
}

/// `value parser` to parse a `str` to `BlockNumberOrTag`
/// where if it can be parsed as `u64`, we call `BlockNumberOrTag::from_u64`
/// otherwise we call `BlockNumberOrTag::from_str`.
fn parse_block_number_or_tag(value: &str) -> Result<BlockNumberOrTag> {
match value.parse::<u64>() {
// parse block no from its decimal representation
Ok(block_number) => Ok(BlockNumberOrTag::from(block_number)),
// parse block no from hex, or parse its tag
Err(_) => BlockNumberOrTag::from_str(value).map_err(Into::into),
}
}

// https://docs.rs/clap/latest/clap/_derive/index.html#arg-attributes
#[derive(Subcommand)]
enum Commands {
Expand All @@ -49,21 +63,22 @@ enum Commands {
Start {
#[arg(
long,
help = "Starting block number to listen for, defaults to 'latest'."
help = "Starting block number to listen for, defaults to 'latest'.",
value_parser = parse_block_number_or_tag
)]
from: Option<BlockNumberOrTag>,
#[arg(help = "The oracle kinds to handle tasks as.", required = false)]
kinds: Vec<OracleKind>,
#[arg(short, long = "model", help = "The models to serve.", required = true, value_parser=parse_model)]
#[arg(short, long = "model", help = "The models to serve.", required = true, value_parser = parse_model)]
models: Vec<Model>,
},
/// View status of a given task.
View { task_id: U256 },
/// View tasks between specific blocks.
Tasks {
#[arg(long, help = "Starting block number, defaults to 'earliest'.")]
#[arg(long, help = "Starting block number, defaults to 'earliest'.", value_parser = parse_block_number_or_tag)]
from: Option<BlockNumberOrTag>,
#[arg(long, help = "Ending block number, defaults to 'latest'.")]
#[arg(long, help = "Ending block number, defaults to 'latest'.", value_parser = parse_block_number_or_tag)]
to: Option<BlockNumberOrTag>,
},
/// Request a task.
Expand Down
12 changes: 10 additions & 2 deletions src/commands/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,18 @@ pub async fn view_task_events(
from_block: impl Into<BlockNumberOrTag> + Clone,
to_block: impl Into<BlockNumberOrTag> + Clone,
) -> Result<()> {
let from_block: BlockNumberOrTag = from_block.clone().into();
let to_block: BlockNumberOrTag = to_block.clone().into();
log::info!(
"Viewing task ids & statuses between blocks: {} - {}",
from_block.clone().into(),
to_block.clone().into()
from_block
.as_number()
.map(|n| n.to_string())
.unwrap_or(from_block.to_string()),
to_block
.as_number()
.map(|n| n.to_string())
.unwrap_or(to_block.to_string())
);

let task_events = node.get_tasks_in_range(from_block, to_block).await?;
Expand Down
32 changes: 12 additions & 20 deletions src/compute/workflows/requests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ impl Request {

#[cfg(test)]
mod tests {
use alloy::hex::FromHex;

use super::*;

// only implemented for testing purposes
Expand Down Expand Up @@ -257,25 +259,15 @@ mod tests {
}

#[tokio::test]
#[ignore = "run this manually with GPT4oMini vs GPT4o"]
async fn test_erroneous() {
dotenvy::dotenv().unwrap();
let _ = env_logger::builder()
.filter_level(log::LevelFilter::Debug)
.is_test(true)
.try_init();

// this looks like a workflow, but its not parseable so it should give an errror
// let input = "{\"config\":{\"max_steps\":50,\"max_time\":200,\"tools\":[\"ALL\"]},\"external_memory\":{\"backstory\":\"dark hacker\\nlives in the basement of his parents' house\",\"objective\":\"Hacking systems\",\"behaviour\":\"closed but strong\",\"state\":\"\",\"inventory\":[\"Empty Inventory\"]},\"tasks\":[{\"id\":\"simulate\",\"name\":\"State\",\"description\":\"Simulates from the given state to obtain a new state with respect to the given inputs.\",\"prompt\":\"You are a sophisticated 317-dimensional alien world simulator capable of simulating any fictional or non-fictional world with excellent detail. Your task is to simulate one day in the life of a character based on the provided inputs, taking into account every given detail to accurately mimic the created world.\\n\\n---------------------\\n\\nYou just woke up to a new day. When you look at mirror as you wake up, you reflect on yourself and who you are. You are:\\n<backstory>\\n{{backstory}}\\n</backstory>\\n\\nYou remember vividly what drove you in your life. You feel a strong urge to:\\n<objective>\\n{{objective}}\\n</objective>\\n\\n\\nTo be strong and coherent, you repeat out loud how you behave in front of the mirror.\\n<behaviour>\\n{{behaviour}}\\n</behaviour>\\n\\nAs you recall who you are, what you do and your drive is, you write down to a notebook your current progress with your goal:\\n<current_state>\\n{{state}}\\n</current_state>\\n\\nYou look through and see the items in your inventory.\\n<inventory>\\n{{inventory}}\\n</inventory>\\n\\nFirst, an omnipotent being watches you through out the day outlining what you've been through today within your world in <observe> tags. This being is beyond time and space can understand slightest intentions also the complex infinite parameter world around you.\\n\\nYou live another day... It's been a long day and you write down your journal what you've achieved so far today, and what is left with your ambitions. It's only been a day, so you know that you can achieve as much that is possible within a day. \\n\\nWrite this between <journal> tags.\\nStart now:\\n\",\"inputs\":[{\"name\":\"backstory\",\"value\":{\"type\":\"read\",\"key\":\"backstory\"},\"required\":true},{\"name\":\"state\",\"value\":{\"type\":\"read\",\"key\":\"state\"},\"required\":true},{\"name\":\"inventory\",\"value\":{\"type\":\"get_all\",\"key\":\"inventory\"},\"required\":true},{\"name\":\"behaviour\",\"value\":{\"type\":\"read\",\"key\":\"behaviour\"},\"required\":true},{\"name\":\"objective\",\"value\":{\"type\":\"read\",\"key\":\"objective\"},\"required\":true}],\"operator\":\"generation\",\"outputs\":[{\"type\":\"write\",\"key\":\"new_state\",\"value\":\"__result\"}]},{\"id\":\"_end\",\"name\":\"Task\",\"description\":\"Task Description\",\"prompt\":\"\",\"inputs\":[],\"operator\":\"end\",\"outputs\":[]}],\"steps\":[{\"source\":\"simulate\",\"target\":\"_end\"}],\"return_value\":{\"input\":{\"type\":\"read\",\"key\":\"new_state\"},\"to_json\":false}}";
let input = Bytes::from_static(&hex_literal::hex!("36623630613364623161396663353163313532383663396539393664363531626633306535626438363730386262396134636339633863636632393236623266"));

let mut request = Request::try_parse_bytes(&input).await.unwrap();
// this is a wrong Workflow object, so instead of being parsed as Request::Workflow it is parsed as Request::String!
// small models like GPT4oMini will not be able to handle this, and will output mumbo-jumbo at random times, sometimes will return the input itself
// Gpt4o will be able to handle this, it actually understands the task

let output = request.execute(Model::GPT4o, None).await.unwrap();

println!("Output:\n{}", output);
async fn test_arweave_workflow_parser() {
// task 21402 input
// 0x30306234343365613266393739626263353263613565363131376534646366353634366662316365343265663566643363643564646638373533643538323463
let input_bytes = Bytes::from_hex("30306234343365613266393739626263353263613565363131376534646366353634366662316365343265663566643363643564646638373533643538323463").unwrap();
let workflow = Request::try_parse_bytes(&input_bytes).await.unwrap();
if let Request::Workflow(_) = workflow {
/* do nothing */
} else {
panic!("Expected workflow, got something else");
}
}
}
20 changes: 17 additions & 3 deletions src/configurations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ use eyre::{Context, Result};
use std::env;

/// Configuration for the Dria Oracle.
///
/// Stores the `EthereumWallet` instance along with the used RPC url.
#[derive(Debug, Clone)]
pub struct DriaOracleConfig {
/// Wallet for the oracle.
pub wallet: EthereumWallet,
/// RPC URL for the oracle, decides the connected chain.
pub rpc_url: Url,
/// Optional transaction timeout, is useful to avoid getting stuck at `get_receipt()` when making a transaction.
pub tx_timeout: Option<std::time::Duration>,
}

impl Default for DriaOracleConfig {
Expand All @@ -27,7 +29,19 @@ impl DriaOracleConfig {
PrivateKeySigner::from_bytes(secret_key).wrap_err("Could not parse private key")?;
let wallet = EthereumWallet::from(signer);

Ok(Self { wallet, rpc_url })
Ok(Self {
wallet,
rpc_url,
tx_timeout: None,
})
}

/// Change the transaction timeout.
/// This will make transaction wait for the given duration before timing out,
/// otherwise the node may get stuck waiting for a lost transaction.
pub fn with_tx_timeout(mut self, tx_timeout: std::time::Duration) -> Self {
self.tx_timeout = Some(tx_timeout);
self
}

/// Creates the config from the environment variables.
Expand Down
1 change: 1 addition & 0 deletions src/data/arweave.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ impl OracleExternalData for Arweave {
let b64_key = Self::hex_to_base64(key.as_str())?;

let url = self.base_url.join(&b64_key)?;
log::debug!("Fetching from Arweave: {}", url);
let response = self
.client
.get(url)
Expand Down
15 changes: 12 additions & 3 deletions src/node/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ impl DriaOracle {
.wrap_err("could not request task")?;

log::info!("Hash: {:?}", tx.tx_hash());
let receipt = tx.get_receipt().await?;
let receipt = tx
.with_timeout(self.config.tx_timeout)
.get_receipt()
.await?;
Ok(receipt)
}

Expand Down Expand Up @@ -91,7 +94,10 @@ impl DriaOracle {
let tx = req.send().await.map_err(contract_error_report)?;

log::info!("Hash: {:?}", tx.tx_hash());
let receipt = tx.get_receipt().await?;
let receipt = tx
.with_timeout(self.config.tx_timeout)
.get_receipt()
.await?;
Ok(receipt)
}

Expand All @@ -108,7 +114,10 @@ impl DriaOracle {
let tx = req.send().await.map_err(contract_error_report)?;

log::info!("Hash: {:?}", tx.tx_hash());
let receipt = tx.get_receipt().await?;
let receipt = tx
.with_timeout(self.config.tx_timeout)
.get_receipt()
.await?;
Ok(receipt)
}

Expand Down
10 changes: 8 additions & 2 deletions src/node/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ impl DriaOracle {
.wrap_err(eyre!("Could not register."))?;

log::info!("Hash: {:?}", tx.tx_hash());
let receipt = tx.get_receipt().await?;
let receipt = tx
.with_timeout(self.config.tx_timeout)
.get_receipt()
.await?;
Ok(receipt)
}

Expand All @@ -32,7 +35,10 @@ impl DriaOracle {
.wrap_err("could not unregister")?;

log::info!("Hash: {:?}", tx.tx_hash());
let receipt = tx.get_receipt().await?;
let receipt = tx
.with_timeout(self.config.tx_timeout)
.get_receipt()
.await?;
Ok(receipt)
}

Expand Down
10 changes: 8 additions & 2 deletions src/node/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ impl DriaOracle {
let tx = req.send().await.map_err(contract_error_report)?;

log::info!("Hash: {:?}", tx.tx_hash());
let receipt = tx.get_receipt().await?;
let receipt = tx
.with_timeout(self.config.tx_timeout)
.get_receipt()
.await?;
Ok(receipt)
}

Expand All @@ -48,7 +51,10 @@ impl DriaOracle {
.wrap_err("could not approve tokens")?;

log::info!("Hash: {:?}", tx.tx_hash());
let receipt = tx.get_receipt().await?;
let receipt = tx
.with_timeout(self.config.tx_timeout)
.get_receipt()
.await?;
Ok(receipt)
}

Expand Down

0 comments on commit a9246a8

Please sign in to comment.