Skip to content

Commit

Permalink
proof stored in dht
Browse files Browse the repository at this point in the history
  • Loading branch information
Okm165 committed Aug 12, 2024
1 parent 5c3b19b commit 6459cdd
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 15 deletions.
27 changes: 22 additions & 5 deletions crates/delegator/src/delegator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use tracing::{error, info};
use zetina_common::graceful_shutdown::shutdown_signal;
use zetina_common::hash;
use zetina_common::job::{Job, JobBid, JobData};
use zetina_common::job_witness::JobWitness;
use zetina_common::process::Process;
use zetina_peer::swarm::{
DelegationMessage, GossipsubMessage, KademliaMessage, MarketMessage, PeerBehaviourEvent, Topic,
Expand Down Expand Up @@ -41,6 +42,8 @@ impl Delegator {
>::new();
let mut job_hash_store =
HashMap::<kad::RecordKey, mpsc::Sender<(u64, PeerId)>>::new();
let mut proof_hash_store = HashMap::<kad::RecordKey, kad::RecordKey>::new();

loop {
tokio::select! {
Some(job_data) = delegate_rx.recv() => {
Expand All @@ -67,9 +70,12 @@ impl Delegator {
}
if message.topic == Topic::Delegation.into() {
match serde_json::from_slice::<DelegationMessage>(&message.data)? {
DelegationMessage::Finished(job_witness) => {
info!("Received finished job: {}", hex::encode(&job_witness.job_key));
events_tx.send((job_witness.job_key, DelegatorEvent::Finished(job_witness.proof)))?;
DelegationMessage::Finished(proof_key, job_key) => {
if job_hash_store.remove(&job_key).is_some() {
info!("Received finished job: {} proof key: {}", hex::encode(&job_key), hex::encode(&proof_key));
proof_hash_store.insert(proof_key.to_owned(), job_key);
kademlia_tx.send(KademliaMessage::GET(proof_key)).await?;
}
}
_ => {}
}
Expand All @@ -86,15 +92,26 @@ impl Delegator {
let (process, bid_tx) = BidQueue::run(key.to_owned());
job_bid_scheduler.push(process);
job_hash_store.insert(key, bid_tx);
}
},
kad::QueryResult::GetRecord(Ok(
kad::GetRecordOk::FoundRecord(kad::PeerRecord {
record: kad::Record { key, value, .. },
..
})
)) => {
if let Some ((proof_key, job_key)) = proof_hash_store.remove_entry(&key) {
info!("job {} proof with key: {} returned in DHT", hex::encode(&job_key), hex::encode(&proof_key));
let job_witness: JobWitness = serde_json::from_slice(&value)?;
events_tx.send((job_key, DelegatorEvent::Finished(job_witness.proof)))?;
}
},
_ => {}
}
}
_ => {}
}
}
Some(Ok((job_key, bids))) = job_bid_scheduler.next() => {
job_hash_store.remove(&job_key);
let bid = bids.first_key_value().unwrap();
let price = *bid.0;
let identity = *bid.1.first().unwrap();
Expand Down
28 changes: 20 additions & 8 deletions crates/executor/src/executor.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use futures::{stream::FuturesUnordered, Stream};
use libp2p::{gossipsub, kad, PeerId};
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::hash::{DefaultHasher, Hash, Hasher};
use std::pin::Pin;
use thiserror::Error;
use tokio::sync::mpsc;
use tokio::{sync::mpsc::Sender, task::JoinHandle};
use tokio_stream::StreamExt;
use tracing::{error, info};
use zetina_common::hash;
use zetina_common::job::Job;
use zetina_common::{
graceful_shutdown::shutdown_signal, job::JobBid, job_trace::JobTrace, job_witness::JobWitness,
Expand Down Expand Up @@ -44,6 +46,7 @@ impl Executor {
>::new();

let mut job_hash_store = HashSet::<kad::RecordKey>::new();
let mut proof_hash_store = HashMap::<kad::RecordKey, kad::RecordKey>::new();

loop {
tokio::select! {
Expand Down Expand Up @@ -88,11 +91,19 @@ impl Executor {
..
})
)) => {
if job_hash_store.contains(&key) {
if job_hash_store.remove(&key) {
let job: Job = serde_json::from_slice(&value)?;
info!("received delegation of job: {}", hex::encode(&key));
runner_scheduler.push(runner.run(job)?);
job_hash_store.remove(&key);
}
},
kad::QueryResult::PutRecord(Ok(kad::PutRecordOk { key })) => {
if let Some ((proof_key, job_key)) = proof_hash_store.remove_entry(&key) {
info!("job {} proof with key: {} stored in DHT", hex::encode(&job_key), hex::encode(&proof_key));
gossipsub_tx.send(GossipsubMessage {
topic: Topic::Delegation.into(),
data: serde_json::to_vec(&DelegationMessage::Finished(proof_key, job_key))?
}).await?;
}
}
_ => {}
Expand All @@ -106,11 +117,12 @@ impl Executor {
prover_scheduler.push(prover.run(job_trace)?);
},
Some(Ok(job_witness)) = prover_scheduler.next() => {
info!("Finished proving: {}", hex::encode(&job_witness.job_key));
gossipsub_tx.send(GossipsubMessage {
topic: Topic::Delegation.into(),
data: serde_json::to_vec(&DelegationMessage::Finished(job_witness))?
}).await?;
let proof_key = kad::RecordKey::new(&hash!(job_witness).to_be_bytes());
info!("Finished proving job: {} proof key: {}", hex::encode(&job_witness.job_key), hex::encode(&proof_key));
proof_hash_store.insert(proof_key.to_owned(), job_witness.job_key.to_owned());
kademlia_tx.send(KademliaMessage::PUT(
(proof_key, serde_json::to_vec(&job_witness)?)
)).await?;
},
_ = shutdown_signal() => {
break
Expand Down
3 changes: 1 addition & 2 deletions crates/peer/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use tokio::sync::mpsc;
use tracing::{debug, error, info};
use zetina_common::graceful_shutdown::shutdown_signal;
use zetina_common::job::{Job, JobBid};
use zetina_common::job_witness::JobWitness;

#[derive(NetworkBehaviour)]
pub struct PeerBehaviour {
Expand Down Expand Up @@ -83,7 +82,7 @@ pub enum MarketMessage {
#[derive(Debug, Serialize, Deserialize)]
pub enum DelegationMessage {
Delegate(JobBid),
Finished(JobWitness),
Finished(kad::RecordKey, kad::RecordKey),
}

impl SwarmRunner {
Expand Down

0 comments on commit 6459cdd

Please sign in to comment.