Skip to content

Commit

Permalink
Allow concurrent requests to RPC services
Browse files Browse the repository at this point in the history
The initial implementation didn't allow requests to services to be
handled concurrently. For a proof of concept or a single scenario run
it's totally fine, but in the future it may make a lot of sense to
deploy Crows as a long running service and leave it running with workers
connected and send requests to Crows from a CI run and then multiple
runs may be sending multiple requests concurrently.
  • Loading branch information
drogus committed Mar 20, 2024
1 parent 0f76de1 commit ce14aa8
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 36 deletions.
2 changes: 1 addition & 1 deletion coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ struct WorkerEntry {
}

impl WorkerToCoordinator for WorkerToCoordinatorService {
async fn ping(&mut self, _: WorkerClient) -> String {
async fn ping(&self, _: WorkerClient) -> String {
"OK".into()
}
}
Expand Down
14 changes: 5 additions & 9 deletions service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ pub fn service(attr: TokenStream, original_input: TokenStream) -> TokenStream {

impl #server_ident {
pub async fn accept<T>(&self, service: T) -> Option<<T as utils::Service<#dummy_ident>>::Client>
where T: utils::Service<#dummy_ident> + 'static {
where T: utils::Service<#dummy_ident> + Clone + 'static {
let (sender, receiver, close_receiver) = self.server.accept().await?;
let client = utils::Client::new(sender, receiver, service, Some(close_receiver));
Some(client)
Expand All @@ -222,7 +222,7 @@ pub fn service(attr: TokenStream, original_input: TokenStream) -> TokenStream {
-> Result<<T as utils::Service<#dummy_ident>>::Client, std::io::Error>
where
A: utils::tokio::net::ToSocketAddrs,
T: utils::Service<#dummy_ident> + 'static,
T: utils::Service<#dummy_ident> + Clone + 'static,
{
let (sender, mut receiver) = utils::create_client(addr).await?;
let client = utils::Client::new(sender, receiver, service, None);
Expand All @@ -234,11 +234,7 @@ pub fn service(attr: TokenStream, original_input: TokenStream) -> TokenStream {
let mut trait_methods = Vec::new();

for method in input.methods {
let receiver = if method.receiver_mutability.is_some() {
quote! { &mut self }
} else {
quote! { &self }
};
let receiver = quote! { &self };

let pascal = method.ident.to_string().to_case(Case::Pascal);
let method_ident = method.ident.clone();
Expand Down Expand Up @@ -328,7 +324,7 @@ pub fn service(attr: TokenStream, original_input: TokenStream) -> TokenStream {
type Client = #other_side_client_ident;

fn handle_request(
&mut self,
&self,
client: Self::Client,
message: Self::Request,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Self::Response> + Send + '_>> {
Expand Down Expand Up @@ -383,7 +379,7 @@ pub fn service(attr: TokenStream, original_input: TokenStream) -> TokenStream {
}

impl #client_ident {
pub async fn wait(&mut self) {
pub async fn wait(&self) {
self.client.wait().await;
}

Expand Down
39 changes: 17 additions & 22 deletions utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ impl Client {
close_receiver: Option<oneshot::Receiver<()>>,
) -> <T as Service<DummyType>>::Client
where
T: Service<DummyType> + Send + Sync + 'static,
T: Service<DummyType> + Send + Sync + 'static + Clone,
<T as Service<DummyType>>::Request: Send,
<T as Service<DummyType>>::Response: Send,
<T as Service<DummyType>>::Client: ClientTrait + Clone + Send + Sync + 'static,
Expand All @@ -239,26 +239,21 @@ impl Client {
break;
}
} else {
// TODO: at the moment we block a service while executing a
// single request. this is because we allow to use &mut self
// in services and thus with the current implementation it
// would be hard to share the service object. It would be better to
// change it to always be &self and control the interior
// mutability with a lock. we could still allow for &mut
// methods, but only &mut methods would block. Another option
// would be to always use &self and thus require the
// implementation to deal with locking for each attribute that
// needs it
let deserialized = serde_json::from_str::<<T as Service<DummyType>>::Request>(&message.message).unwrap();
let response = service.handle_request(client_clone.clone(), deserialized).await;

let message = Message {
id: Uuid::new_v4(),
reply_to: Some(message.id),
message: serde_json::to_string(&response).unwrap(),
message_type: std::any::type_name::<T>().to_string(),
};
sender.send(message).unwrap();
let service_clone = service.clone();
let sender_clone = sender.clone();
let client_clone = client_clone.clone();
tokio::spawn(async move {
let deserialized = serde_json::from_str::<<T as Service<DummyType>>::Request>(&message.message).unwrap();
let response = service_clone.handle_request(client_clone, deserialized).await;

let message = Message {
id: Uuid::new_v4(),
reply_to: Some(message.id),
message: serde_json::to_string(&response).unwrap(),
message_type: std::any::type_name::<T>().to_string(),
};
sender_clone.send(message).unwrap();
});
}
},
None => break,
Expand Down Expand Up @@ -377,7 +372,7 @@ pub trait Service<DummyType>: Send + Sync {
type Client: ClientTrait + Clone + Send + Sync;

fn handle_request(
&mut self,
&self,
client: Self::Client,
message: Self::Request,
) -> Pin<Box<dyn Future<Output = Self::Response> + Send + '_>>;
Expand Down
4 changes: 2 additions & 2 deletions utils/src/services/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl Into<Uuid> for RunId {
// all the trait definitions wouldn't have to be here
#[service(variant = "server", other_side = Worker)]
pub trait WorkerToCoordinator {
async fn ping(&mut self) -> String;
async fn ping(&self) -> String;
}

#[derive(Serialize, Deserialize, Debug, Clone)]
Expand All @@ -75,7 +75,7 @@ pub struct WorkerData {

#[service(variant = "client", other_side = WorkerToCoordinator)]
pub trait Worker {
async fn upload_scenario(&mut self, name: String, content: Vec<u8>);
async fn upload_scenario(&self, name: String, content: Vec<u8>);
async fn ping(&self) -> String;
async fn start(&self, name: String, config: crows_shared::Config) -> Result<(), WorkerError>;
async fn get_data(&self) -> WorkerData;
Expand Down
4 changes: 2 additions & 2 deletions worker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ struct WorkerService {
}

impl Worker for WorkerService {
async fn upload_scenario(&mut self, _: WorkerToCoordinatorClient, name: String, content: Vec<u8>) {
async fn upload_scenario(&self, _: WorkerToCoordinatorClient, name: String, content: Vec<u8>) {
self.scenarios.write().await.insert(name, content);
}

Expand Down Expand Up @@ -103,7 +103,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
};

println!("Connecting to {coordinator_address}");
let mut client = connect_to_worker_to_coordinator(coordinator_address, service)
let client = connect_to_worker_to_coordinator(coordinator_address, service)
.await
.unwrap();

Expand Down

0 comments on commit ce14aa8

Please sign in to comment.