Skip to content

Commit

Permalink
Client Updates for farmer
Browse files Browse the repository at this point in the history
  • Loading branch information
luna committed Oct 19, 2023
1 parent 8b3f548 commit 00d9f95
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 31 deletions.
10 changes: 5 additions & 5 deletions clients/src/protocols/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,17 +179,17 @@ pub struct ErrorResponse {
pub error_message: Option<String>,
}

pub fn get_current_authentication_token(timeout: u64) -> u64 {
pub fn get_current_authentication_token(timeout: u8) -> u64 {
let now: u64 = OffsetDateTime::now_utc().unix_timestamp() as u64;
now / 60 / timeout
now / 60 / timeout as u64
}

pub fn validate_authentication_token(token: u64, timeout: u64) -> bool {
pub fn validate_authentication_token(token: u64, timeout: u8) -> bool {
let cur_token = get_current_authentication_token(timeout);
let dif = if token > cur_token {
token - cur_token
} else {
cur_token - token
};
dif <= timeout
}
dif <= timeout as u64
}
16 changes: 10 additions & 6 deletions clients/src/websocket/farmer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ use std::collections::HashMap;
use std::io::{Error, ErrorKind};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;

pub struct FarmerClient {
pub client: Client,
pub client: Arc<Mutex<Client>>,
handle: JoinHandle<()>,
}
impl FarmerClient {
Expand All @@ -22,7 +23,8 @@ impl FarmerClient {
) -> Result<Self, Error> {
let (client, mut stream) = get_client_generated_tls(host, port, additional_headers).await?;
let handle = tokio::spawn(async move { stream.run(run).await });
perform_handshake(&client, network_id, port, NodeType::Farmer).await?;
let client = Arc::new(Mutex::new(client));
perform_handshake(client.clone(), network_id, port, NodeType::Farmer).await?;
Ok(FarmerClient { client, handle })
}
pub async fn new_ssl(
Expand All @@ -35,7 +37,8 @@ impl FarmerClient {
) -> Result<Self, Error> {
let (client, mut stream) = get_client_tls(host, port, ssl_info, additional_headers).await?;
let handle = tokio::spawn(async move { stream.run(run).await });
perform_handshake(&client, network_id, port, NodeType::Farmer).await?;
let client = Arc::new(Mutex::new(client));
perform_handshake(client.clone(), network_id, port, NodeType::Farmer).await?;
Ok(FarmerClient { client, handle })
}
pub async fn new(
Expand All @@ -47,15 +50,16 @@ impl FarmerClient {
) -> Result<Self, Error> {
let (client, mut stream) = get_client(host, port, additional_headers).await?;
let handle = tokio::spawn(async move { stream.run(run).await });
perform_handshake(&client, network_id, port, NodeType::Farmer).await?;
let client = Arc::new(Mutex::new(client));
perform_handshake(client.clone(), network_id, port, NodeType::Farmer).await?;
Ok(FarmerClient { client, handle })
}

pub async fn join(mut self) -> Result<(), Error> {
pub async fn join(self) -> Result<(), Error> {
self.handle
.await
.map_err(|e| Error::new(ErrorKind::Other, format!("Failed to join farmer: {:?}", e)))?;
self.client.shutdown().await
self.client.lock().await.shutdown().await
}

pub fn is_closed(&self) -> bool {
Expand Down
13 changes: 8 additions & 5 deletions clients/src/websocket/full_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ use std::collections::HashMap;
use std::io::{Error, ErrorKind};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;

pub struct FullnodeClient {
pub client: Client,
pub client: Arc<Mutex<Client>>,
handle: JoinHandle<()>,
}
impl FullnodeClient {
Expand All @@ -21,7 +22,8 @@ impl FullnodeClient {
) -> Result<Self, Error> {
let (client, mut stream) = get_client(host, port, additional_headers).await?;
let handle = tokio::spawn(async move { stream.run(run).await });
perform_handshake(&client, network_id, port, NodeType::FullNode).await?;
let client = Arc::new(Mutex::new(client));
perform_handshake(client.clone(), network_id, port, NodeType::FullNode).await?;
Ok(FullnodeClient { client, handle })
}
pub async fn new_ssl(
Expand All @@ -34,17 +36,18 @@ impl FullnodeClient {
) -> Result<Self, Error> {
let (client, mut stream) = get_client_tls(host, port, ssl_info, additional_headers).await?;
let handle = tokio::spawn(async move { stream.run(run).await });
perform_handshake(&client, network_id, port, NodeType::FullNode).await?;
let client = Arc::new(Mutex::new(client));
perform_handshake(client.clone(), network_id, port, NodeType::FullNode).await?;
Ok(FullnodeClient { client, handle })
}

pub async fn join(mut self) -> Result<(), Error> {
pub async fn join(self) -> Result<(), Error> {
self.handle.await.map_err(|e| {
Error::new(
ErrorKind::Other,
format!("Failed to join fullnode: {:?}", e),
)
})?;
self.client.shutdown().await
self.client.lock().await.shutdown().await
}
}
13 changes: 8 additions & 5 deletions clients/src/websocket/harvester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ use std::collections::HashMap;
use std::io::{Error, ErrorKind};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;

pub struct HarvesterClient {
pub client: Client,
pub client: Arc<Mutex<Client>>,
handle: JoinHandle<()>,
}
impl HarvesterClient {
Expand All @@ -22,7 +23,8 @@ impl HarvesterClient {
) -> Result<Self, Error> {
let (client, mut stream) = get_client(host, port, additional_headers).await?;
let handle = tokio::spawn(async move { stream.run(run).await });
perform_handshake(&client, network_id, port, NodeType::Harvester).await?;
let client = Arc::new(Mutex::new(client));
perform_handshake(client.clone(), network_id, port, NodeType::Harvester).await?;
Ok(HarvesterClient { client, handle })
}
pub async fn new_ssl(
Expand All @@ -38,18 +40,19 @@ impl HarvesterClient {
debug!("Spawning Stream Handler for Harvester SSL Connection");
let handle = tokio::spawn(async move { stream.run(run).await });
debug!("Performing Handshake");
perform_handshake(&client, network_id, port, NodeType::Harvester).await?;
let client = Arc::new(Mutex::new(client));
perform_handshake(client.clone(), network_id, port, NodeType::Harvester).await?;
debug!("Harvester Handshake Complete");
Ok(HarvesterClient { client, handle })
}

pub async fn join(mut self) -> Result<(), Error> {
pub async fn join(self) -> Result<(), Error> {
self.handle.await.map_err(|e| {
Error::new(
ErrorKind::Other,
format!("Failed to join harvester: {:?}", e),
)
})?;
self.client.shutdown().await
self.client.lock().await.shutdown().await
}
}
14 changes: 7 additions & 7 deletions clients/src/websocket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ pub struct HandshakeResp {
}

async fn perform_handshake(
client: &Client,
client: Arc<Mutex<Client>>,
network_id: &str,
port: u16,
node_type: NodeType,
Expand Down Expand Up @@ -400,7 +400,7 @@ impl MessageHandler for OneShotHandler {
}

pub async fn oneshot<R: ChiaSerialize, C: Websocket>(
client: &C,
client: Arc<Mutex<C>>,
msg: ChiaMessage,
resp_type: Option<ProtocolMessageTypes>,
msg_id: Option<u16>,
Expand All @@ -420,21 +420,21 @@ pub async fn oneshot<R: ChiaSerialize, C: Websocket>(
},
handle: handle.clone(),
};
client.subscribe(handle.id, chia_handle).await;
client.lock().await.subscribe(handle.id, chia_handle).await;
let res_handle = tokio::spawn(async move {
let res = rx.recv().await;
rx.close();
res
});
client.send(msg.into()).await.map_err(|e| {
client.lock().await.send(msg.into()).await.map_err(|e| {
Error::new(
ErrorKind::InvalidData,
format!("Failed to parse send data: {:?}", e),
)
})?;
select!(
_ = tokio::time::sleep(Duration::from_millis(timeout.unwrap_or(15000))) => {
client.unsubscribe(handle.id).await;
client.lock().await.unsubscribe(handle.id).await;
Err(Error::new(
ErrorKind::Other,
"Timeout before oneshot completed",
Expand All @@ -444,15 +444,15 @@ pub async fn oneshot<R: ChiaSerialize, C: Websocket>(
let res = res?;
if let Some(v) = res {
let mut cursor = Cursor::new(v);
client.unsubscribe(handle.id).await;
client.lock().await.unsubscribe(handle.id).await;
R::from_bytes(&mut cursor).map_err(|e| {
Error::new(
ErrorKind::InvalidData,
format!("Failed to parse msg: {:?}", e),
)
})
} else {
client.unsubscribe(handle.id).await;
client.lock().await.unsubscribe(handle.id).await;
Err(Error::new(
ErrorKind::Other,
"Channel Closed before response received",
Expand Down
9 changes: 6 additions & 3 deletions clients/src/websocket/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ use std::collections::HashMap;
use std::io::Error;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use tokio::sync::Mutex;

pub struct WalletClient {
pub client: Client,
pub client: Arc<Mutex<Client>>,
}
impl WalletClient {
pub async fn new(
Expand All @@ -19,7 +20,8 @@ impl WalletClient {
) -> Result<Self, Error> {
let (client, mut stream) = get_client(host, port, additional_headers).await?;
tokio::spawn(async move { stream.run(run).await });
let _ = perform_handshake(&client, network_id, port, NodeType::Wallet).await;
let client = Arc::new(Mutex::new(client));
let _ = perform_handshake(client.clone(), network_id, port, NodeType::Wallet).await;
Ok(WalletClient { client })
}
pub async fn new_ssl(
Expand All @@ -32,7 +34,8 @@ impl WalletClient {
) -> Result<Self, Error> {
let (client, mut stream) = get_client_tls(host, port, ssl_info, additional_headers).await?;
tokio::spawn(async move { stream.run(run).await });
let _ = perform_handshake(&client, network_id, port, NodeType::Wallet).await;
let client = Arc::new(Mutex::new(client));
let _ = perform_handshake(client.clone(), network_id, port, NodeType::Wallet).await;
Ok(WalletClient { client })
}
}

0 comments on commit 00d9f95

Please sign in to comment.