Skip to content

Commit

Permalink
more std util refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
dragazo committed Nov 21, 2023
1 parent 0d0b5bd commit ae64db9
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 30 deletions.
36 changes: 6 additions & 30 deletions src/std_system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use tokio_tungstenite::tungstenite::Message;
use futures::{StreamExt, SinkExt};
use uuid::Uuid;

use crate::real_time::*;
use crate::runtime::*;
use crate::process::*;
use crate::json::*;
Expand All @@ -30,30 +29,7 @@ use crate::*;

const MESSAGE_REPLY_TIMEOUT: Duration = Duration::from_millis(1500);

struct Context {
base_url: String,
client_id: String,
services_url: String,

project_name: String,
project_id: String,
role_name: String,
role_id: String,
}
struct RpcRequest<C: CustomTypes<S>, S: System<C>> {
service: String,
rpc: String,
args: Vec<(String, Json)>,
key: AsyncKey<Result<C::Intermediate, String>>,
}
struct ReplyEntry {
expiry: OffsetDateTime,
value: Option<Json>,
}

type MessageReplies = BTreeMap<ExternReplyKey, ReplyEntry>;

async fn call_rpc_async<C: CustomTypes<S>, S: System<C>>(context: &Context, client: &reqwest::Client, service: &str, rpc: &str, args: &[(&str, &Json)]) -> Result<SimpleValue, String> {
async fn call_rpc_async<C: CustomTypes<S>, S: System<C>>(context: &NetsBloxContext, client: &reqwest::Client, service: &str, rpc: &str, args: &[(&str, &Json)]) -> Result<SimpleValue, String> {
let time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis();
let url = format!("{services_url}/{service}/{rpc}?clientId={client_id}&t={time}",
services_url = context.services_url, client_id = context.client_id);
Expand Down Expand Up @@ -96,14 +72,14 @@ async fn call_rpc_async<C: CustomTypes<S>, S: System<C>>(context: &Context, clie
/// as well as overriding default behavior (e.g., rpc intercepting).
pub struct StdSystem<C: CustomTypes<StdSystem<C>>> {
config: Config<C, Self>,
context: Arc<Context>,
context: Arc<NetsBloxContext>,
client: Arc<reqwest::Client>,
rng: Mutex<ChaChaRng>,
clock: Arc<Clock>,

rpc_request_pipe: Sender<RpcRequest<C, Self>>,

message_replies: Arc<Mutex<MessageReplies>>,
message_replies: Arc<Mutex<BTreeMap<ExternReplyKey, ReplyEntry>>>,
message_sender: Sender<OutgoingMessage>,
message_injector: Sender<IncomingMessage>,
message_receiver: Receiver<IncomingMessage>,
Expand All @@ -120,7 +96,7 @@ impl<C: CustomTypes<StdSystem<C>>> StdSystem<C> {
let configuration = reqwest::get(format!("{base_url}/configuration")).await.unwrap().json::<BTreeMap<String, Json>>().await.unwrap();
let services_hosts = configuration["servicesHosts"].as_array().unwrap();

let mut context = Context {
let mut context = NetsBloxContext {
base_url,
services_url: services_hosts[0].as_object().unwrap().get("url").unwrap().as_str().unwrap().to_owned(),
client_id: format!("_vm-{}", names::Generator::default().next().unwrap()),
Expand All @@ -138,7 +114,7 @@ impl<C: CustomTypes<StdSystem<C>>> StdSystem<C> {
let (in_sender, in_receiver) = channel();

#[tokio::main(flavor = "multi_thread", worker_threads = 1)]
async fn handler(base_url: String, client_id: String, project_name: String, message_replies: Arc<Mutex<MessageReplies>>, out_receiver: Receiver<OutgoingMessage>, in_sender: Sender<IncomingMessage>) {
async fn handler(base_url: String, client_id: String, project_name: String, message_replies: Arc<Mutex<BTreeMap<ExternReplyKey, ReplyEntry>>>, out_receiver: Receiver<OutgoingMessage>, in_sender: Sender<IncomingMessage>) {
let ws_url = format!("{}/network/{client_id}/connect", if let Some(x) = base_url.strip_prefix("http") { format!("ws{x}") } else { format!("wss://{base_url}") });
let (ws, _) = tokio_tungstenite::connect_async(ws_url).await.unwrap();
let (mut ws_sender, ws_receiver) = ws.split();
Expand Down Expand Up @@ -267,7 +243,7 @@ impl<C: CustomTypes<StdSystem<C>>> StdSystem<C> {
let (sender, receiver) = channel();

#[tokio::main(flavor = "multi_thread", worker_threads = 1)]
async fn handler<C: CustomTypes<StdSystem<C>>>(client: Arc<reqwest::Client>, context: Arc<Context>, receiver: Receiver<RpcRequest<C, StdSystem<C>>>) {
async fn handler<C: CustomTypes<StdSystem<C>>>(client: Arc<reqwest::Client>, context: Arc<NetsBloxContext>, receiver: Receiver<RpcRequest<C, StdSystem<C>>>) {
while let Ok(request) = receiver.recv() {
let (client, context) = (client.clone(), context.clone());
tokio::spawn(async move {
Expand Down
24 changes: 24 additions & 0 deletions src/std_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,35 @@
//! Note that this module is only available with the [`std`](crate) feature flag.
use std::sync::{Arc, Mutex};
use std::string::String;
use std::vec::Vec;

use crate::real_time::*;
use crate::runtime::*;
use crate::json::*;
use crate::*;

pub struct NetsBloxContext {
pub base_url: String,
pub client_id: String,
pub services_url: String,

pub project_name: String,
pub project_id: String,
pub role_name: String,
pub role_id: String,
}
pub struct RpcRequest<C: CustomTypes<S>, S: System<C>> {
pub service: String,
pub rpc: String,
pub args: Vec<(String, Json)>,
pub key: AsyncKey<Result<C::Intermediate, String>>,
}
pub struct ReplyEntry {
pub expiry: OffsetDateTime,
pub value: Option<Json>,
}

struct ClockCache {
value: Mutex<OffsetDateTime>,
precision: Precision,
Expand Down

0 comments on commit ae64db9

Please sign in to comment.