From ce14aa8a37e630bae072e053f5c0cfbdee7e1738 Mon Sep 17 00:00:00 2001 From: Piotr Sarnacki Date: Wed, 20 Mar 2024 16:35:37 +0100 Subject: [PATCH] Allow concurrent requests to RPC services The initial implementation didn't allow requests to services to be handled concurrently. For a proof of concept or a single scenario run it's totally fine, but in the future it may make a lot of sense to deploy Crows as a long running service and leave it running with workers connected and send requests to Crows from a CI run and then multiple runs may be sending multiple requests concurrently. --- coordinator/src/main.rs | 2 +- service/src/lib.rs | 14 +++++--------- utils/src/lib.rs | 39 +++++++++++++++++---------------------- utils/src/services/mod.rs | 4 ++-- worker/src/main.rs | 4 ++-- 5 files changed, 27 insertions(+), 36 deletions(-) diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 21e5b1f..9b17a0f 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -31,7 +31,7 @@ struct WorkerEntry { } impl WorkerToCoordinator for WorkerToCoordinatorService { - async fn ping(&mut self, _: WorkerClient) -> String { + async fn ping(&self, _: WorkerClient) -> String { "OK".into() } } diff --git a/service/src/lib.rs b/service/src/lib.rs index 0489e68..3dd972f 100644 --- a/service/src/lib.rs +++ b/service/src/lib.rs @@ -198,7 +198,7 @@ pub fn service(attr: TokenStream, original_input: TokenStream) -> TokenStream { impl #server_ident { pub async fn accept(&self, service: T) -> Option<>::Client> - where T: utils::Service<#dummy_ident> + 'static { + where T: utils::Service<#dummy_ident> + Clone + 'static { let (sender, receiver, close_receiver) = self.server.accept().await?; let client = utils::Client::new(sender, receiver, service, Some(close_receiver)); Some(client) @@ -222,7 +222,7 @@ pub fn service(attr: TokenStream, original_input: TokenStream) -> TokenStream { -> Result<>::Client, std::io::Error> where A: utils::tokio::net::ToSocketAddrs, - T: utils::Service<#dummy_ident> + 'static, + T: utils::Service<#dummy_ident> + Clone + 'static, { let (sender, mut receiver) = utils::create_client(addr).await?; let client = utils::Client::new(sender, receiver, service, None); @@ -234,11 +234,7 @@ pub fn service(attr: TokenStream, original_input: TokenStream) -> TokenStream { let mut trait_methods = Vec::new(); for method in input.methods { - let receiver = if method.receiver_mutability.is_some() { - quote! { &mut self } - } else { - quote! { &self } - }; + let receiver = quote! { &self }; let pascal = method.ident.to_string().to_case(Case::Pascal); let method_ident = method.ident.clone(); @@ -328,7 +324,7 @@ pub fn service(attr: TokenStream, original_input: TokenStream) -> TokenStream { type Client = #other_side_client_ident; fn handle_request( - &mut self, + &self, client: Self::Client, message: Self::Request, ) -> std::pin::Pin + Send + '_>> { @@ -383,7 +379,7 @@ pub fn service(attr: TokenStream, original_input: TokenStream) -> TokenStream { } impl #client_ident { - pub async fn wait(&mut self) { + pub async fn wait(&self) { self.client.wait().await; } diff --git a/utils/src/lib.rs b/utils/src/lib.rs index c61494b..675a21f 100644 --- a/utils/src/lib.rs +++ b/utils/src/lib.rs @@ -213,7 +213,7 @@ impl Client { close_receiver: Option>, ) -> >::Client where - T: Service + Send + Sync + 'static, + T: Service + Send + Sync + 'static + Clone, >::Request: Send, >::Response: Send, >::Client: ClientTrait + Clone + Send + Sync + 'static, @@ -239,26 +239,21 @@ impl Client { break; } } else { - // TODO: at the moment we block a service while executing a - // single request. this is because we allow to use &mut self - // in services and thus with the current implementation it - // would be hard to share the service object. It would be better to - // change it to always be &self and control the interior - // mutability with a lock. we could still allow for &mut - // methods, but only &mut methods would block. Another option - // would be to always use &self and thus require the - // implementation to deal with locking for each attribute that - // needs it - let deserialized = serde_json::from_str::<>::Request>(&message.message).unwrap(); - let response = service.handle_request(client_clone.clone(), deserialized).await; - - let message = Message { - id: Uuid::new_v4(), - reply_to: Some(message.id), - message: serde_json::to_string(&response).unwrap(), - message_type: std::any::type_name::().to_string(), - }; - sender.send(message).unwrap(); + let service_clone = service.clone(); + let sender_clone = sender.clone(); + let client_clone = client_clone.clone(); + tokio::spawn(async move { + let deserialized = serde_json::from_str::<>::Request>(&message.message).unwrap(); + let response = service_clone.handle_request(client_clone, deserialized).await; + + let message = Message { + id: Uuid::new_v4(), + reply_to: Some(message.id), + message: serde_json::to_string(&response).unwrap(), + message_type: std::any::type_name::().to_string(), + }; + sender_clone.send(message).unwrap(); + }); } }, None => break, @@ -377,7 +372,7 @@ pub trait Service: Send + Sync { type Client: ClientTrait + Clone + Send + Sync; fn handle_request( - &mut self, + &self, client: Self::Client, message: Self::Request, ) -> Pin + Send + '_>>; diff --git a/utils/src/services/mod.rs b/utils/src/services/mod.rs index 0c140c8..8118929 100644 --- a/utils/src/services/mod.rs +++ b/utils/src/services/mod.rs @@ -50,7 +50,7 @@ impl Into for RunId { // all the trait definitions wouldn't have to be here #[service(variant = "server", other_side = Worker)] pub trait WorkerToCoordinator { - async fn ping(&mut self) -> String; + async fn ping(&self) -> String; } #[derive(Serialize, Deserialize, Debug, Clone)] @@ -75,7 +75,7 @@ pub struct WorkerData { #[service(variant = "client", other_side = WorkerToCoordinator)] pub trait Worker { - async fn upload_scenario(&mut self, name: String, content: Vec); + async fn upload_scenario(&self, name: String, content: Vec); async fn ping(&self) -> String; async fn start(&self, name: String, config: crows_shared::Config) -> Result<(), WorkerError>; async fn get_data(&self) -> WorkerData; diff --git a/worker/src/main.rs b/worker/src/main.rs index 4df5ea4..9d92fc7 100644 --- a/worker/src/main.rs +++ b/worker/src/main.rs @@ -47,7 +47,7 @@ struct WorkerService { } impl Worker for WorkerService { - async fn upload_scenario(&mut self, _: WorkerToCoordinatorClient, name: String, content: Vec) { + async fn upload_scenario(&self, _: WorkerToCoordinatorClient, name: String, content: Vec) { self.scenarios.write().await.insert(name, content); } @@ -103,7 +103,7 @@ async fn main() -> Result<(), Box> { }; println!("Connecting to {coordinator_address}"); - let mut client = connect_to_worker_to_coordinator(coordinator_address, service) + let client = connect_to_worker_to_coordinator(coordinator_address, service) .await .unwrap();