Skip to content

Commit

Permalink
Merge pull request #4 from iosis-tech/feat/job-handle
Browse files Browse the repository at this point in the history
Swarm behavior ( Job )
  • Loading branch information
Okm165 authored Apr 19, 2024
2 parents 3b3e93f + 4495292 commit 97db7cb
Show file tree
Hide file tree
Showing 19 changed files with 206 additions and 58 deletions.
8 changes: 8 additions & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ jobs:

- name: Setup Starknet Foundry
uses: foundry-rs/setup-snfoundry@v3

- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.9'

- name: Provision Environment
run: python install.py

- name: Format code
run: cargo fmt --check
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@ Cargo.lock

Scarb.lock
.snfoundry_cache/

stone-prover
17 changes: 15 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ members = [
"crates/delegator",
"crates/executor",
"crates/peer",
"crates/prover", "crates/runner",
"crates/prover",
"crates/runner",
]
exclude = []

Expand All @@ -21,14 +22,26 @@ license-file = "LICENSE"
[workspace.dependencies]
async-process = "2.2.0"
async-stream = "0.3.5"
bincode = "1.3"
cairo-felt = "0.9.1"
cairo-proof-parser = { git = "https://github.com/Okm165/cairo-proof-parser", rev = "97a04bbee07330311b38d6f4cecfed3acb237626" }
futures = "0.3.30"
futures-core = "0.3.30"
futures-util = "0.3.30"
hex = "0.4.3"
itertools = "0.12.1"
libp2p = { version = "0.53.2", features = ["secp256k1", "tokio","gossipsub","kad","mdns","noise","macros","tcp","yamux","quic"]}
libp2p = { version = "0.53.2", features = [
"secp256k1",
"tokio",
"gossipsub",
"kad",
"mdns",
"noise",
"macros",
"tcp",
"yamux",
"quic",
] }
libsecp256k1 = "0.7.1"
num-bigint = "0.4.4"
serde = "1.0.197"
Expand Down
3 changes: 2 additions & 1 deletion crates/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ license-file.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
bincode.workspace = true
cairo-felt.workspace = true
hex.workspace = true
libp2p.workspace = true
Expand All @@ -16,4 +17,4 @@ num-bigint.workspace = true
serde_json.workspace = true
serde.workspace = true
tempfile.workspace = true
thiserror.workspace = true
thiserror.workspace = true
56 changes: 44 additions & 12 deletions crates/common/src/job.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,52 @@
use libsecp256k1::{PublicKey, Signature};
use crate::hash;
use libsecp256k1::{curve::Scalar, sign, PublicKey, SecretKey, Signature};
use std::{
fmt::Display,
hash::{DefaultHasher, Hash, Hasher},
};

use crate::hash;
/*
Job Object
This object represents a task requested by a delegator.
It contains metadata that allows the executor to decide if the task is attractive enough to run.
It includes a pie object that holds the task bytecode itself.
Additionally, the object holds the signature and public key of the delegator, enabling the executor to prove to the Registry that the task was intended by the delegator.
The Job object also includes the target registry where the delegator expects this proof to be verified.
*/

#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Job {
pub reward: u32,
pub num_of_steps: u32,
pub cairo_pie: Vec<u8>,
pub public_key: PublicKey,
pub signature: Signature,
// below fields not bounded by signature
pub cpu_air_params: Vec<u8>, // needed for proving
pub cpu_air_prover_config: Vec<u8>, // needed for proving
pub reward: u32, // The reward offered for completing the task
pub num_of_steps: u32, // The number of steps expected to complete the task (executor ensures that this number is greater than or equal to the actual steps; in the future, the executor may charge a fee to the delegator if not met)
pub cairo_pie: Vec<u8>, // The task bytecode in compressed zip format, to conserve memory
pub registry_address: String, // The address of the registry contract where the delegator expects the proof to be verified
pub public_key: PublicKey, // The public key of the delegator, used in the bootloader stage to confirm authenticity of the Job<->Delegator relationship
pub signature: Signature, // The signature of the delegator, used in the bootloader stage to confirm authenticity of the Job<->Delegator relationship
}

impl Default for Job {
fn default() -> Self {
let secret_key = &SecretKey::default();
let public_key = PublicKey::from_secret_key(secret_key);
let (signature, _recovery_id) =
sign(&libsecp256k1::Message(Scalar([0, 0, 0, 0, 0, 0, 0, 0])), secret_key);
Self {
reward: 0,
num_of_steps: 0,
cairo_pie: vec![1, 2, 3],
public_key,
signature,
registry_address: "0x0".to_string(),
}
}
}

impl Hash for Job {
fn hash<H: Hasher>(&self, state: &mut H) {
self.reward.hash(state);
self.num_of_steps.hash(state);
self.cairo_pie.hash(state);
self.cpu_air_prover_config.hash(state);
self.cpu_air_params.hash(state);
self.registry_address.hash(state);
}
}

Expand All @@ -33,3 +55,13 @@ impl Display for Job {
write!(f, "{}", hex::encode(hash!(self).to_be_bytes()))
}
}

// impl Job {
// pub fn serialize_job(&self) -> Vec<u8> {
// bincode::serialize(self).unwrap()
// }

// pub fn deserialize_job(serialized_job: &[u8]) -> Self {
// bincode::deserialize(serialized_job).unwrap()
// }
// }
18 changes: 10 additions & 8 deletions crates/common/src/job_trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@ use std::{
};
use tempfile::NamedTempFile;

/*
Job Trace Object
This object represents the output from the Cairo run process in proof mode.
It includes objects such as public input, private input, trace, and memory.
*/

#[derive(Debug)]
pub struct JobTrace {
pub air_public_input: NamedTempFile,
pub air_private_input: NamedTempFile,
pub memory: NamedTempFile, // this is not used directly but needs to live for air_private_input to be valid
pub trace: NamedTempFile, // this is not used directly but needs to live for air_private_input to be valid
pub cpu_air_prover_config: NamedTempFile,
pub cpu_air_params: NamedTempFile,
pub air_public_input: NamedTempFile, // Temporary file containing the public input
pub air_private_input: NamedTempFile, // Temporary file containing the private input; memory and trace files must exist for this to be valid
pub memory: NamedTempFile, // Temporary file containing memory data (required for air_private_input validity)
pub trace: NamedTempFile, // Temporary file containing trace data (required for air_private_input validity)
}

impl Hash for JobTrace {
Expand All @@ -21,8 +25,6 @@ impl Hash for JobTrace {
self.air_private_input.path().hash(state);
self.memory.path().hash(state);
self.trace.path().hash(state);
self.cpu_air_prover_config.path().hash(state);
self.cpu_air_params.path().hash(state);
}
}

Expand Down
9 changes: 8 additions & 1 deletion crates/common/src/job_witness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,16 @@ use std::{
hash::{DefaultHasher, Hash, Hasher},
};

/*
Job Witness Object
This object represents the output from the proving process.
It holds a serialized proof as an array of Felt252 objects.
This serialized proof can be deserialized into a StarkProof object by the verifier to proceed with the verification of the statement.
*/

#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub struct JobWitness {
pub data: Vec<Felt252>,
pub proof: Vec<Felt252>, // Serialized proof
}

impl Display for JobWitness {
Expand Down
6 changes: 4 additions & 2 deletions crates/delegator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ async fn main() -> Result<(), Box<dyn Error>> {

loop {
tokio::select! {
Ok(Some(line)) = stdin.next_line() => {
send_topic_tx.send(line.as_bytes().to_vec()).await?;
Ok(Some(_)) = stdin.next_line() => {
// TODO: Turn this into a real job generation

send_topic_tx.send([1,2, 3].to_vec()).await?;
},
Some(event) = message_stream.next() => {
info!("{:?}", event);
Expand Down
23 changes: 22 additions & 1 deletion crates/executor/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use futures_util::StreamExt;
use libp2p::gossipsub::Event;
use sharp_p2p_common::network::Network;
use sharp_p2p_common::topic::{gossipsub_ident_topic, Topic};
use sharp_p2p_peer::registry::RegistryHandler;
Expand Down Expand Up @@ -40,7 +41,27 @@ async fn main() -> Result<(), Box<dyn Error>> {
send_topic_tx.send(line.as_bytes().to_vec()).await?;
},
Some(event) = message_stream.next() => {
info!("{:?}", event);
match event {
Event::Message { message, .. } => {
// Received a new-job message from the network
if message.topic == gossipsub_ident_topic(Network::Sepolia, Topic::NewJob).into() {

info!("Received a new job: {:?}", message);
}
// Received a picked-job message from the network
if message.topic == gossipsub_ident_topic(Network::Sepolia, Topic::PickedJob).into() {

info!("Received a picked job: {:?}", message);
}
},
Event::Subscribed { peer_id, topic } => {
info!("{} subscribed to the topic {}", peer_id.to_string(), topic.to_string());
},
Event::Unsubscribed { peer_id, topic }=> {
info!("{} unsubscribed to the topic {}", peer_id.to_string(), topic.to_string());
},
_ => {}
}
},
Some(Ok(event_vec)) = event_stream.next() => {
info!("{:?}", event_vec);
Expand Down
9 changes: 9 additions & 0 deletions crates/prover/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// use std::process::Command;

fn main() {
// Check if stone-prover command is present
// Command::new("cpu_air_prover")
// .arg("--help")
// .output()
// .expect("Failed to execute cpu_air_prover command");
}
21 changes: 12 additions & 9 deletions crates/prover/src/stone_prover/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ impl ProverController for StoneProver {
async fn prove(&mut self, job_trace: JobTrace) -> Result<JobWitness, ProverControllerError> {
let mut out_file = NamedTempFile::new()?;

let cpu_air_prover_config = NamedTempFile::new()?; // TODO implement default config and getting info from integrity verifier
let cpu_air_params = NamedTempFile::new()?; // TODO implement default config and getting info from integrity verifier

let task = Command::new("cpu_air_prover")
.args(["--out_file", out_file.path().to_string_lossy().as_ref()])
.args([
Expand All @@ -39,9 +42,9 @@ impl ProverController for StoneProver {
])
.args([
"--cpu_air_prover_config",
job_trace.cpu_air_prover_config.path().to_string_lossy().as_ref(),
cpu_air_prover_config.path().to_string_lossy().as_ref(),
])
.args(["--cpu_air_params", job_trace.cpu_air_params.path().to_string_lossy().as_ref()])
.args(["--cpu_air_params", cpu_air_params.path().to_string_lossy().as_ref()])
.arg("--generate_annotations")
.spawn()?;

Expand Down Expand Up @@ -71,10 +74,10 @@ impl ProverController for StoneProver {
.await?;
trace!("task {} output {:?}", job_trace_hash, task_output);

let mut input = String::new();
out_file.read_to_string(&mut input)?;
let mut raw_proof = String::new();
out_file.read_to_string(&mut raw_proof)?;

let parsed_proof = cairo_proof_parser::parse(input)
let parsed_proof = cairo_proof_parser::parse(raw_proof)
.map_err(|e| ProverControllerError::ProofParseError(e.to_string()))?;

let config: VecFelt252 = serde_json::from_str(&parsed_proof.config.to_string())?;
Expand All @@ -84,18 +87,18 @@ impl ProverController for StoneProver {
serde_json::from_str(&parsed_proof.unsent_commitment.to_string())?;
let witness: VecFelt252 = serde_json::from_str(&parsed_proof.witness.to_string())?;

let data = chain!(
let proof = chain!(
config.into_iter(),
public_input.into_iter(),
unsent_commitment.into_iter(),
witness.into_iter()
)
.collect_vec();

Ok(JobWitness { data })
Ok(JobWitness { proof })
}

async fn terminate(&mut self, job_trace_hash: u64) -> Result<(), ProverControllerError> {
fn terminate(&mut self, job_trace_hash: u64) -> Result<(), ProverControllerError> {
self.tasks
.get_mut(&job_trace_hash)
.ok_or(ProverControllerError::TaskNotFound)?
Expand All @@ -104,7 +107,7 @@ impl ProverController for StoneProver {
Ok(())
}

async fn drop(mut self) -> Result<(), ProverControllerError> {
fn drop(mut self) -> Result<(), ProverControllerError> {
let keys: Vec<u64> = self.tasks.keys().cloned().collect();
for job_trace_hash in keys.iter() {
self.tasks
Expand Down
4 changes: 2 additions & 2 deletions crates/prover/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ pub trait Prover {

pub trait ProverController {
async fn prove(&mut self, job_trace: JobTrace) -> Result<JobWitness, ProverControllerError>;
async fn terminate(&mut self, job_trace_hash: u64) -> Result<(), ProverControllerError>;
async fn drop(self) -> Result<(), ProverControllerError>;
fn terminate(&mut self, job_trace_hash: u64) -> Result<(), ProverControllerError>;
fn drop(self) -> Result<(), ProverControllerError>;
}
15 changes: 15 additions & 0 deletions crates/runner/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use std::process::Command;

fn main() {
// Check if cairo-run command is present
Command::new("cairo-run")
.arg("--version")
.output()
.expect("Failed to execute cairo-run command");

// Check if cairo-compile command is present
Command::new("cairo-compile")
.arg("--version")
.output()
.expect("Failed to execute cairo-compile command");
}
20 changes: 4 additions & 16 deletions crates/runner/src/cairo_runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl Runner for CairoRunner {
impl RunnerController for CairoRunner {
async fn run(&mut self, job: Job) -> Result<JobTrace, RunnerControllerError> {
let program = NamedTempFile::new()?;
let layout: &str = Layout::Recursive.into();
let layout: &str = Layout::RecursiveWithPoseidon.into();

let mut cairo_pie = NamedTempFile::new()?;
cairo_pie.write_all(&job.cairo_pie)?;
Expand Down Expand Up @@ -86,28 +86,16 @@ impl RunnerController for CairoRunner {
.await?;
trace!("task {} output {:?}", job_hash, task_output);

let mut cpu_air_params = NamedTempFile::new()?;
let mut cpu_air_prover_config = NamedTempFile::new()?;
cpu_air_params.write_all(&job.cpu_air_params)?;
cpu_air_prover_config.write_all(&job.cpu_air_prover_config)?;

Ok(JobTrace {
air_public_input,
air_private_input,
memory,
trace,
cpu_air_prover_config,
cpu_air_params,
})
Ok(JobTrace { air_public_input, air_private_input, memory, trace })
}

async fn terminate(&mut self, job_hash: u64) -> Result<(), RunnerControllerError> {
fn terminate(&mut self, job_hash: u64) -> Result<(), RunnerControllerError> {
self.tasks.get_mut(&job_hash).ok_or(RunnerControllerError::TaskNotFound)?.start_kill()?;
trace!("task scheduled for termination {}", job_hash);
Ok(())
}

async fn drop(mut self) -> Result<(), RunnerControllerError> {
fn drop(mut self) -> Result<(), RunnerControllerError> {
let keys: Vec<u64> = self.tasks.keys().cloned().collect();
for job_hash in keys.iter() {
self.tasks
Expand Down
Loading

0 comments on commit 97db7cb

Please sign in to comment.