Skip to content

Commit

Permalink
Concurrency Updates and version bump to 1.0.8
Browse files Browse the repository at this point in the history
  • Loading branch information
luna committed Sep 23, 2023
1 parent 88837bc commit a0cc008
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 68 deletions.
4 changes: 2 additions & 2 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "dg_xch_cli"
version = "1.0.7"
version = "1.0.8"
edition = "2021"
authors = ["James Hoerr"]
description = "CLI Utilities for the Chia Blockchain"
Expand All @@ -12,7 +12,7 @@ repository = "https://github.com/GalactechsLLC/dg_xch_utils/cli"
bip39 = {version= "2.0.0", features=["rand"] }
blst = "0.3.7"
clap = { version = "4.1.1", features = ["derive"] }
dg_xch_clients = {path = "../clients", version="1.0.7"}
dg_xch_clients = {path = "../clients", version="1.0.8"}
dg_xch_core = {path = "../core", version = "1.0.7", features = ["paperclip"] }
dg_xch_keys = {path = "../keys", version="1.0.7"}
dg_xch_pos = {path = "../proof_of_space", version="1.0.7"}
Expand Down
5 changes: 3 additions & 2 deletions clients/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "dg_xch_clients"
version = "1.0.7"
version = "1.0.8"
edition = "2021"
authors = ["James Hoerr"]
description = "RPC and Websocket Clients the Chia Blockchain"
Expand All @@ -10,14 +10,15 @@ repository = "https://github.com/GalactechsLLC/dg_xch_utils/clients"

[dependencies]
async-trait = "0.1.58"
dashmap = "5.5.3"
dg_xch_core = {path = "../core", version = "1.0.7", features = ["paperclip"] }
dg_xch_macros = {path = "../macros", version="1.0.7"}
dg_xch_pos = {path = "../proof_of_space", version="1.0.7"}
dg_xch_serialize = {path = "../serialize", version="1.0.7"}
futures-util = "0.3.25"
hyper = {version="0.14.23", features=["full"]}
log = "0.4.17"
reqwest = {version="0.11.11", features=["json", "rustls-tls-webpki-roots"]}
reqwest = {version="0.11.20", features=["json", "rustls-tls-webpki-roots"]}
rustls = {version = "0.21.5", features = ["dangerous_configuration"] }
rustls-pemfile = "1.0.1"
serde = { version = "1.0.149", features = ["derive"] }
Expand Down
16 changes: 6 additions & 10 deletions clients/src/websocket/farmer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ use std::collections::HashMap;
use std::io::{Error, ErrorKind};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;

pub struct FarmerClient {
pub client: Arc<Mutex<Client>>,
pub client: Client,
handle: JoinHandle<()>,
}
impl FarmerClient {
Expand All @@ -23,8 +22,7 @@ impl FarmerClient {
) -> Result<Self, Error> {
let (client, mut stream) = get_client_generated_tls(host, port, additional_headers).await?;
let handle = tokio::spawn(async move { stream.run(run).await });
let client = Arc::new(Mutex::new(client));
perform_handshake(client.clone(), network_id, port, NodeType::Farmer).await?;
perform_handshake(&client, network_id, port, NodeType::Farmer).await?;
Ok(FarmerClient { client, handle })
}
pub async fn new_ssl(
Expand All @@ -37,8 +35,7 @@ impl FarmerClient {
) -> Result<Self, Error> {
let (client, mut stream) = get_client_tls(host, port, ssl_info, additional_headers).await?;
let handle = tokio::spawn(async move { stream.run(run).await });
let client = Arc::new(Mutex::new(client));
perform_handshake(client.clone(), network_id, port, NodeType::Farmer).await?;
perform_handshake(&client, network_id, port, NodeType::Farmer).await?;
Ok(FarmerClient { client, handle })
}
pub async fn new(
Expand All @@ -50,16 +47,15 @@ impl FarmerClient {
) -> Result<Self, Error> {
let (client, mut stream) = get_client(host, port, additional_headers).await?;
let handle = tokio::spawn(async move { stream.run(run).await });
let client = Arc::new(Mutex::new(client));
perform_handshake(client.clone(), network_id, port, NodeType::Farmer).await?;
perform_handshake(&client, network_id, port, NodeType::Farmer).await?;
Ok(FarmerClient { client, handle })
}

pub async fn join(self) -> Result<(), Error> {
pub async fn join(mut self) -> Result<(), Error> {
self.handle
.await
.map_err(|e| Error::new(ErrorKind::Other, format!("Failed to join farmer: {:?}", e)))?;
self.client.lock().await.shutdown().await
self.client.shutdown().await
}

pub fn is_closed(&self) -> bool {
Expand Down
13 changes: 5 additions & 8 deletions clients/src/websocket/full_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ use std::collections::HashMap;
use std::io::{Error, ErrorKind};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;

pub struct FullnodeClient {
pub client: Arc<Mutex<Client>>,
pub client: Client,
handle: JoinHandle<()>,
}
impl FullnodeClient {
Expand All @@ -22,8 +21,7 @@ impl FullnodeClient {
) -> Result<Self, Error> {
let (client, mut stream) = get_client(host, port, additional_headers).await?;
let handle = tokio::spawn(async move { stream.run(run).await });
let client = Arc::new(Mutex::new(client));
perform_handshake(client.clone(), network_id, port, NodeType::FullNode).await?;
perform_handshake(&client, network_id, port, NodeType::FullNode).await?;
Ok(FullnodeClient { client, handle })
}
pub async fn new_ssl(
Expand All @@ -36,18 +34,17 @@ impl FullnodeClient {
) -> Result<Self, Error> {
let (client, mut stream) = get_client_tls(host, port, ssl_info, additional_headers).await?;
let handle = tokio::spawn(async move { stream.run(run).await });
let client = Arc::new(Mutex::new(client));
perform_handshake(client.clone(), network_id, port, NodeType::FullNode).await?;
perform_handshake(&client, network_id, port, NodeType::FullNode).await?;
Ok(FullnodeClient { client, handle })
}

pub async fn join(self) -> Result<(), Error> {
pub async fn join(mut self) -> Result<(), Error> {
self.handle.await.map_err(|e| {
Error::new(
ErrorKind::Other,
format!("Failed to join fullnode: {:?}", e),
)
})?;
self.client.lock().await.shutdown().await
self.client.shutdown().await
}
}
13 changes: 5 additions & 8 deletions clients/src/websocket/harvester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ use std::collections::HashMap;
use std::io::{Error, ErrorKind};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;

pub struct HarvesterClient {
pub client: Arc<Mutex<Client>>,
pub client: Client,
handle: JoinHandle<()>,
}
impl HarvesterClient {
Expand All @@ -23,8 +22,7 @@ impl HarvesterClient {
) -> Result<Self, Error> {
let (client, mut stream) = get_client(host, port, additional_headers).await?;
let handle = tokio::spawn(async move { stream.run(run).await });
let client = Arc::new(Mutex::new(client));
perform_handshake(client.clone(), network_id, port, NodeType::Harvester).await?;
perform_handshake(&client, network_id, port, NodeType::Harvester).await?;
Ok(HarvesterClient { client, handle })
}
pub async fn new_ssl(
Expand All @@ -39,20 +37,19 @@ impl HarvesterClient {
let (client, mut stream) = get_client_tls(host, port, ssl_info, additional_headers).await?;
debug!("Spawning Stream Handler for Harvester SSL Connection");
let handle = tokio::spawn(async move { stream.run(run).await });
let client = Arc::new(Mutex::new(client));
debug!("Performing Handshake");
perform_handshake(client.clone(), network_id, port, NodeType::Harvester).await?;
perform_handshake(&client, network_id, port, NodeType::Harvester).await?;
debug!("Harvester Handshake Complete");
Ok(HarvesterClient { client, handle })
}

pub async fn join(self) -> Result<(), Error> {
pub async fn join(mut self) -> Result<(), Error> {
self.handle.await.map_err(|e| {
Error::new(
ErrorKind::Other,
format!("Failed to join harvester: {:?}", e),
)
})?;
self.client.lock().await.shutdown().await
self.client.shutdown().await
}
}
Loading

0 comments on commit a0cc008

Please sign in to comment.