wip
drogus committed Mar 18, 2024
1 parent 2eae0c8 commit 9a139db
Showing 17 changed files with 993 additions and 392 deletions.
411 changes: 272 additions & 139 deletions Cargo.lock

Large diffs are not rendered by default.

20 changes: 14 additions & 6 deletions Cargo.toml
@@ -9,22 +9,30 @@ members = [
"worker",
"wasm",
"bindings",
"shared",
]

[workspace.dependencies]
tokio = { version = "1.20", features = ["full"] }
anyhow = { version = "1", features = ["backtrace"] }
thiserror = "1"
tokio-serde = { version = "0.9.0", features = ["json"] }
tokio-util = { version = "0.7.3", features = ["full"] }
tokio-serde = { version = "0.9", features = ["json"] }
tokio-util = { version = "0.7", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.108"
futures = "0.3.21"
uuid = { version = "1.4.1", features = ["serde", "v4"] }
serde_json = "1.0"
serde_with = "3.7"
futures = "0.3"
uuid = { version = "1.4", features = ["serde", "v4"] }
wasmtime = { git = "https://github.com/bytecodealliance/wasmtime.git", features = ["async"] }
wasmtime-wasi = { git = "https://github.com/bytecodealliance/wasmtime.git" }
wasi-common = { git = "https://github.com/bytecodealliance/wasmtime.git" }
wiggle = { git = "https://github.com/bytecodealliance/wasmtime.git" }
num-rational = { version = "0.4", features = ["serde"]}
borsh = { version = "1.3", features = ["unstable__schema", "derive"] }
bytes = "1.5"
rational = "1.5"
crows-macros = { path = "macros" }
crows-shared = { path = "shared" }
crows-bindings = { path = "bindings" }
crows-utils = { path = "utils" }
crows-service = { path = "service" }
crows-wasm = { path = "wasm" }
6 changes: 5 additions & 1 deletion bindings/Cargo.toml
@@ -6,4 +6,8 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
borsh.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_with.workspace = true
crows-macros.workspace = true
crows-shared.workspace = true
41 changes: 29 additions & 12 deletions bindings/src/lib.rs
@@ -1,7 +1,12 @@
use borsh::{from_slice, to_vec, BorshDeserialize, BorshSerialize};
use std::{cell::RefCell, collections::HashMap, mem::MaybeUninit};
use std::{cell::RefCell, collections::HashMap};

#[derive(BorshSerialize, BorshDeserialize, PartialEq, Debug)]
pub use crows_macros::config;
use serde::{Serialize, Deserialize, de::DeserializeOwned};
use serde_json::{to_vec, from_slice};
pub use crows_shared::Config as ExecutorConfig;
pub use crows_shared::ConstantArrivalRateConfig;

#[derive(Serialize, Deserialize, PartialEq, Debug)]
pub enum HTTPMethod {
HEAD,
GET,
@@ -11,7 +16,7 @@ pub enum HTTPMethod {
OPTIONS,
}

#[derive(BorshSerialize, BorshDeserialize, PartialEq, Debug)]
#[derive(Serialize, Deserialize, PartialEq, Debug)]
pub struct HTTPRequest {
// TODO: these should not be public I think, I'd prefer to do a public interface for them
pub url: String,
@@ -20,12 +25,12 @@ pub struct HTTPRequest {
pub body: Option<String>,
}

#[derive(Debug, BorshDeserialize, BorshSerialize)]
#[derive(Debug, Deserialize, Serialize)]
pub struct HTTPError {
pub message: String,
}

#[derive(BorshSerialize, BorshDeserialize, PartialEq, Debug)]
#[derive(Serialize, Deserialize, PartialEq, Debug)]
pub struct HTTPResponse {
// TODO: these should not be public I think, I'd prefer to do a public interface for them
pub headers: HashMap<String, String>,
@@ -46,14 +51,21 @@ mod bindings {
pub fn log(content: *mut u8, content_len: usize);
pub fn http(content: *mut u8, content_len: usize) -> u64;
pub fn consume_buffer(index: u32, content: *mut u8, content_len: usize);
pub fn set_config(content: *mut u8, content_len: usize) -> u32;
}
}

fn with_buffer<R>(f: impl FnOnce(&mut Vec<u8>) -> R) -> R {
let mut buffer: Vec<u8> = Vec::with_capacity(256);
// using a buffer saved in thread_local allows us to share it between function calls
thread_local! {
static BUFFER: RefCell<Vec<u8>> = RefCell::new(Vec::with_capacity(1024));
}

buffer.clear();
f(&mut buffer)
BUFFER.with(|r| {
let mut buf = r.borrow_mut();
buf.clear();
f(&mut buf)
})
}

pub fn http_request(
@@ -77,9 +89,9 @@ pub fn http_request(

fn call_host_function<T, R, E>(arguments: &T, f: impl FnOnce(&mut Vec<u8>) -> u64) -> Result<R, E>
where
T: BorshSerialize,
R: BorshDeserialize,
E: BorshDeserialize,
T: Serialize,
R: DeserializeOwned,
E: DeserializeOwned,
{
let mut encoded = to_vec(arguments).unwrap();

@@ -105,3 +117,8 @@ where
Err(from_slice(&buf).expect("Couldn't decode message from the host"))
}
}

pub fn __set_config(config: ExecutorConfig) -> u32 {
let mut encoded = to_vec(&config).unwrap();
unsafe { bindings::set_config(encoded.as_mut_ptr(), encoded.len()) }
}
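
The with_buffer helper added above keeps a single scratch Vec per thread and reuses it across host calls, instead of allocating a fresh 256-byte buffer on every call as the previous version did. A minimal standalone sketch of the same pattern (the names here are illustrative, not part of crows-bindings):

use std::cell::RefCell;

thread_local! {
    // one reusable scratch buffer per thread; cleared, not reallocated, between calls
    static SCRATCH: RefCell<Vec<u8>> = RefCell::new(Vec::with_capacity(1024));
}

fn with_scratch<R>(f: impl FnOnce(&mut Vec<u8>) -> R) -> R {
    SCRATCH.with(|cell| {
        let mut buf = cell.borrow_mut();
        buf.clear(); // drops old contents but keeps the allocated capacity
        f(&mut buf)
    })
}

fn main() {
    // both calls reuse the same allocation instead of building a new Vec each time
    let first = with_scratch(|buf| { buf.extend_from_slice(b"hello"); buf.len() });
    let second = with_scratch(|buf| { buf.extend_from_slice(b"hi"); buf.len() });
    assert_eq!((first, second), (5, 2));
}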
6 changes: 2 additions & 4 deletions cli/src/main.rs
@@ -35,8 +35,6 @@ enum Commands {
#[arg(short, long)]
name: String,
#[arg(short, long)]
concurrency: usize,
#[arg(short, long)]
workers_number: usize,
},
Workers {
@@ -68,9 +66,9 @@ pub async fn main() {
.unwrap()
.unwrap();
}
Some(Commands::Start {name,concurrency, workers_number }) => {
Some(Commands::Start {name, workers_number }) => {
coordinator
.start(name.to_string(), concurrency.clone(), workers_number.clone())
.start(name.to_string(), workers_number.clone())
.await
.unwrap();
}
7 changes: 4 additions & 3 deletions coordinator/Cargo.toml
@@ -6,13 +6,14 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
crows-utils = { path = "../utils" }
crows-service = { path = "../service" }

tokio.workspace = true
tokio-serde.workspace = true
tokio-util.workspace = true
serde.workspace = true
serde_json.workspace = true
futures.workspace = true
uuid.workspace = true
crows-shared.workspace = true
crows-wasm.workspace = true
crows-utils.workspace = true
crows-service.workspace = true
45 changes: 31 additions & 14 deletions coordinator/src/main.rs
@@ -1,10 +1,8 @@
#![feature(async_fn_in_trait)]
#![feature(return_position_impl_trait_in_trait)]

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use crows_wasm::fetch_config;
use futures::future::join_all;
use tokio::sync::Mutex;
use tokio::time::sleep;
@@ -22,7 +20,7 @@ use uuid::Uuid;
// TODO: Client should probably be thread safe for easier handling
#[derive(Default)]
struct WorkerToCoordinatorService {
scenarios: Arc<Mutex<HashMap<ModuleId, Vec<u8>>>>,
scenarios: Arc<Mutex<HashMap<String, Vec<u8>>>>,
workers: Arc<Mutex<HashMap<Uuid, WorkerEntry>>>,
client: Arc<Mutex<Option<WorkerClient>>>,
}
@@ -41,7 +39,7 @@ impl WorkerToCoordinator for WorkerToCoordinatorService {

#[derive(Clone, Default)]
struct CoordinatorService {
scenarios: Arc<Mutex<HashMap<ModuleId, Vec<u8>>>>,
scenarios: Arc<Mutex<HashMap<String, Vec<u8>>>>,
workers: Arc<Mutex<HashMap<Uuid, WorkerEntry>>>,
}

@@ -58,35 +56,54 @@ impl Coordinator for CoordinatorService {
name: String,
content: Vec<u8>,
) -> Result<(), CoordinatorError> {
let id = ModuleId::new(name.clone(), &content);

// TODO: to send bandwidth maybe it will be worth it to gzip the data? we would be gzipping
// once and sending to N clients
// TODO: to send bandwidth maybe it will be worth it to gzip the data? we would be
// gzipping once and sending to N clients
//
// send each uploaded scenario to all of the workers
for (_, worker_entry) in self.workers.lock().await.iter() {
let locked = worker_entry.client.lock();
let mut futures = Vec::new();
futures.push(async {
if let Some(client) = locked.await.as_mut() {
// TODO: handle Result
client.upload_scenario(id.clone(), content.clone()).await;
client.upload_scenario(name.clone(), content.clone()).await;
}
});

join_all(futures).await;
}
self.scenarios.lock().await.insert(id, content);
self.scenarios.lock().await.insert(name, content);

Ok(())
}

async fn start(&self, name: String, concurrency: usize, workers_number: usize) {
async fn start(&self, name: String, workers_number: usize) -> Result<(), CoordinatorError> {
// TODO: we should check if we have enough workers
// TODO: also this way we will always choose the same workers. in the future we should
// either always split between all workers or do some kind of round robin
// TODO: at the moment we split evenly. in the future we could get some kind of diagnostic
// data from workers in order to determine how much traffic can we push to each worker
// TODO: creating a runtime is probably fast enough, but I'd like to measure and see
// if it's not better to keep one around so we don't create it before each test run
let runtime = crows_wasm::Runtime::new().map_err(|err| CoordinatorError::FailedToCreateRuntime(err.to_string()))?;
let mut instance = {
let scenarios = self.scenarios.lock().await;
let scenario = scenarios.get(&name).ok_or(CoordinatorError::NoSuchModule(name.clone()))?;
let (instance, _) = runtime.compile_instance(&scenario).await.map_err(|_| CoordinatorError::FailedToCompileModule)?;
instance
};
let config = fetch_config(&mut instance).await.map_err(|err| CoordinatorError::CouldNotFetchConfig(err.to_string()))?.split(workers_number);

for (_, worker_entry) in self.workers.lock().await.iter().take(workers_number) {
if let Some(client) = worker_entry.client.lock().await.as_mut() {
client.start(name.clone(), concurrency).await;
// TODO: at the moment we split config to split the load between each of the
// workers, which means that if a worker dies, we will not get a full test
// It would be ideal if we had a way to j
client.start(name.clone(), config.clone()).await;
}
}

Ok(())
}
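
The start flow above compiles the scenario on the coordinator only to read its config, then calls config.split(workers_number) so each worker receives an equally scaled-down share of the load. The real Config type lives in crows-shared; the sketch below only illustrates the idea, and its field names and split signature are assumptions rather than the actual API:

// Hypothetical simplification of crows_shared::ConstantArrivalRateConfig;
// the real struct and its split logic may differ.
#[derive(Clone, Debug)]
pub struct ConstantArrivalRateConfig {
    pub rate: u64,          // iterations started per time unit across the whole test
    pub allocated_vus: u64, // virtual users reserved for the test
}

impl ConstantArrivalRateConfig {
    // Every worker gets the same scaled-down config, so N workers together
    // produce roughly the originally requested rate.
    pub fn split(&self, workers: usize) -> ConstantArrivalRateConfig {
        let workers = workers.max(1) as u64;
        ConstantArrivalRateConfig {
            rate: self.rate / workers,                                   // rounds down
            allocated_vus: (self.allocated_vus + workers - 1) / workers, // rounds up
        }
    }
}

As the TODO above notes, splitting this way means a worker that dies mid-run silently reduces the total generated load.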

async fn list_workers(&self) -> Vec<String> {
Expand All @@ -110,7 +127,7 @@ pub async fn main() {
.parse()
.unwrap();

let original_scenarios: Arc<Mutex<HashMap<ModuleId, Vec<u8>>>> = Default::default();
let original_scenarios: Arc<Mutex<HashMap<String, Vec<u8>>>> = Default::default();
let original_workers: Arc<Mutex<HashMap<Uuid, WorkerEntry>>> = Default::default();

let scenarios = original_scenarios.clone();
