Skip to content

Commit

Permalink
Added connection retry for cases where too many clients attempt to co…
Browse files Browse the repository at this point in the history
…nnect at once

Improved error message for 'spraycc run'
Bumped version to 0.8
  • Loading branch information
jasonmccampbell committed Feb 4, 2021
1 parent c52b71a commit 0529493
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 22 deletions.
62 changes: 60 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "spraycc"
version = "0.1.0"
version = "0.8.0"
authors = ["Jason McCampbell <jasonmccampbell@gmail.com>"]
edition = "2018"

Expand All @@ -13,6 +13,7 @@ get_if_addrs = "0.5"
gethostname = "0"
home = "0.5"
lazy_static = "1"
rand = "^0.8"
serde = { version = "1", features = ["derive"] }
simple-process-stats = "^1"
tempdir = "0.3"
Expand Down
34 changes: 27 additions & 7 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@
/// results are available.
///
extern crate lazy_static;
extern crate rand;
extern crate tokio;
extern crate which;

use lazy_static::lazy_static;
use std::error::Error;
use std::ffi::OsString;
use std::{error::Error, time::Duration};
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use tokio::process;
Expand Down Expand Up @@ -76,7 +77,7 @@ pub async fn run(args: Vec<String>) -> Result<i32, Box<dyn Error + Send + Sync>>
if run_local {
status = run_local_task(task).await?;
} else if let Some(callme) = config::read_server_contact_info() {
match TcpStream::connect(callme.addr).await {
match connect_w_retry(&callme).await {
Ok(stream) => {
let mut conn = ipc::Connection::new(stream);

Expand Down Expand Up @@ -105,11 +106,7 @@ pub async fn run(args: Vec<String>) -> Result<i32, Box<dyn Error + Send + Sync>>
output_files.pop().unwrap().shutdown().await?
}
}
Err(err) => {
println!(
"Unable to connect to server at {}, please make sure it was started with 'spraycc server': {}",
callme.addr, err
);
Err(_) => {
status = Some(-1);
}
}
Expand All @@ -122,6 +119,29 @@ pub async fn run(args: Vec<String>) -> Result<i32, Box<dyn Error + Send + Sync>>
Ok(status.expect("No task status as set"))
}

async fn connect_w_retry(callme: &ipc::CallMe) -> Result<TcpStream, Box<dyn Error + Send + Send>> {
let mut count = 0;
loop {
match TcpStream::connect(callme.addr).await {
Ok(stream) => return Ok(stream),
Err(err) => {
if count < 5 {
count += 1;
println!("Retrying connection to {} ({}): {}", callme.addr, count, &err);
let duration = Duration::from_millis(rand::random::<u64>() % 2048 + 512); // 2-2.5s backoff period
tokio::time::sleep(duration).await;
} else {
println!(
"Unable to connect to server at {}, please make sure it was started with 'spraycc server': {}",
callme.addr, &err
);
return Err(Box::new(err));
}
}
}
}
}

/// Handles messages coming from the client via the server. These will be stderr, stdout and generated files from
/// the execution of the task, and then finally the task status.
///
Expand Down
28 changes: 16 additions & 12 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub mod task;
#[tokio::main]
async fn main() {
let matches = App::new("SprayCC")
.version("0.1.0")
.version("0.8.0")
.about("SprayCC - distributed compiler wrapper")
.setting(AppSettings::SubcommandRequiredElseHelp)
.subcommand(
Expand Down Expand Up @@ -114,18 +114,22 @@ async fn main() {
panic!("Error: access code must be a numeric value, got: {}", code);
}
} else if let Some(run) = matches.subcommand_matches("run") {
let args: Vec<String> = run.values_of("compiler_options").unwrap().map(String::from).collect();
match client::run(args).await {
Ok(ec) if ec > 0 => {
// Remote task failed so exit with the same exit code
std::process::exit(ec);
if let Some(compiler_opts) = run.values_of("compiler_options") {
let args: Vec<String> = compiler_opts.map(String::from).collect();
match client::run(args).await {
Ok(ec) if ec > 0 => {
// Remote task failed so exit with the same exit code
std::process::exit(ec);
}
Ok(ec) if ec < 0 => {
// Remote task failed with signal
std::process::exit(-1);
}
Ok(_) => Ok(()),
Err(e) => Err(e),
}
Ok(ec) if ec < 0 => {
// Remote task failed with signal
std::process::exit(-1);
}
Ok(_) => Ok(()),
Err(e) => Err(e),
} else {
panic!("'spraycc run' must be followed by the command line to be run");
}
} else if let Some(fakecc) = matches.subcommand_matches("fakecc") {
for output in fakecc.values_of("output_file").unwrap() {
Expand Down

0 comments on commit 0529493

Please sign in to comment.