Skip to content

Commit

Permalink
fix netsblox conn race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
dragazo committed Nov 22, 2023
1 parent ef8381f commit 22f9e76
Showing 1 changed file with 15 additions and 8 deletions.
23 changes: 15 additions & 8 deletions src/std_system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async fn call_rpc_async<C: CustomTypes<S>, S: System<C>>(context: &NetsBloxConte
pub struct StdSystem<C: CustomTypes<StdSystem<C>>> {
config: Config<C, Self>,
context: Arc<NetsBloxContext>,
client: Arc<reqwest::Client>,
client: reqwest::Client,
rng: Mutex<ChaChaRng>,
clock: Arc<Clock>,

Expand All @@ -93,8 +93,9 @@ impl<C: CustomTypes<StdSystem<C>>> StdSystem<C> {
}
/// Initializes a new instance of [`StdSystem`] targeting the given NetsBlox server base url, e.g., `https://cloud.netsblox.org`.
pub async fn new_async(base_url: String, project_name: Option<&str>, config: Config<C, Self>, clock: Arc<Clock>) -> Self {
let client = reqwest::Client::builder().build().unwrap();
let services_url = {
let configuration = reqwest::get(format!("{base_url}/configuration")).await.unwrap().json::<BTreeMap<String, Json>>().await.unwrap();
let configuration = client.get(format!("{base_url}/configuration")).send().await.unwrap().json::<BTreeMap<String, Json>>().await.unwrap();
let services_hosts = configuration["servicesHosts"].as_array().unwrap();
services_hosts[0].as_object().unwrap().get("url").unwrap().as_str().unwrap().to_owned()
};
Expand All @@ -111,13 +112,14 @@ impl<C: CustomTypes<StdSystem<C>>> StdSystem<C> {
};

let message_replies = Arc::new(Mutex::new(Default::default()));
let (message_sender, message_receiver, message_injector) = {
let (message_sender, message_receiver, message_injector, ws_finish_flag) = {
let (base_url, client_id, project_name, message_replies) = (context.base_url.clone(), context.client_id.clone(), context.project_name.clone(), message_replies.clone());
let (out_sender, out_receiver) = channel();
let (in_sender, in_receiver) = channel();
let finish_flag = Arc::new(());

#[tokio::main(flavor = "multi_thread", worker_threads = 1)]
async fn handler(base_url: String, client_id: String, project_name: String, message_replies: Arc<Mutex<BTreeMap<ExternReplyKey, ReplyEntry>>>, out_receiver: Receiver<OutgoingMessage>, in_sender: Sender<IncomingMessage>) {
async fn handler(base_url: String, client_id: String, project_name: String, message_replies: Arc<Mutex<BTreeMap<ExternReplyKey, ReplyEntry>>>, out_receiver: Receiver<OutgoingMessage>, in_sender: Sender<IncomingMessage>, finish_flag: Arc<()>) {
let ws_url = format!("{}/network/{client_id}/connect", if let Some(x) = base_url.strip_prefix("http") { format!("ws{x}") } else { format!("wss://{base_url}") });
let (ws, _) = tokio_tungstenite::connect_async(ws_url).await.unwrap();
let (mut ws_sender, ws_receiver) = ws.split();
Expand Down Expand Up @@ -179,6 +181,7 @@ impl<C: CustomTypes<StdSystem<C>>> StdSystem<C> {
});

ws_sender_sender.send(Message::Text(json!({ "type": "set-uuid", "clientId": client_id }).to_string())).await.unwrap();
drop(finish_flag);

let src_id = format!("{project_name}@{client_id}#vm");
fn resolve_targets<'a>(targets: &'a mut [String], src_id: &String) -> &'a mut [String] {
Expand Down Expand Up @@ -218,12 +221,16 @@ impl<C: CustomTypes<StdSystem<C>>> StdSystem<C> {
}
}
let in_sender_clone = in_sender.clone();
thread::spawn(move || handler(base_url, client_id, project_name, message_replies, out_receiver, in_sender_clone));
let finish_flag_clone = finish_flag.clone();
thread::spawn(move || handler(base_url, client_id, project_name, message_replies, out_receiver, in_sender_clone, finish_flag_clone));

(out_sender, in_receiver, in_sender)
(out_sender, in_receiver, in_sender, Arc::downgrade(&finish_flag))
};

let client = Arc::new(reqwest::Client::builder().build().unwrap());
while ws_finish_flag.upgrade().is_some() {
tokio::time::sleep(Duration::from_millis(10)).await;
}

let meta = client.post(format!("{}/projects/", context.base_url))
.json(&json!({ "clientId": context.client_id, "name": context.project_name }))
.send().await.unwrap()
Expand All @@ -246,7 +253,7 @@ impl<C: CustomTypes<StdSystem<C>>> StdSystem<C> {
let (sender, receiver) = channel();

#[tokio::main(flavor = "multi_thread", worker_threads = 1)]
async fn handler<C: CustomTypes<StdSystem<C>>>(client: Arc<reqwest::Client>, context: Arc<NetsBloxContext>, receiver: Receiver<RpcRequest<C, StdSystem<C>>>) {
async fn handler<C: CustomTypes<StdSystem<C>>>(client: reqwest::Client, context: Arc<NetsBloxContext>, receiver: Receiver<RpcRequest<C, StdSystem<C>>>) {
while let Ok(request) = receiver.recv() {
let (client, context) = (client.clone(), context.clone());
tokio::spawn(async move {
Expand Down

0 comments on commit 22f9e76

Please sign in to comment.