Skip to content

Commit

Permalink
Only keep close_receiver behind a lock
Browse files Browse the repository at this point in the history
When creating ClientInner I moved all of the attributes from Client
there, but senders can be cloned without a an Arc<RwLock<...>>
  • Loading branch information
drogus committed Mar 20, 2024
1 parent 7257e03 commit 727081e
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 22 deletions.
1 change: 1 addition & 0 deletions coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ impl Coordinator for CoordinatorService {
let scenarios = self.scenarios.lock().await;
let scenario = scenarios.get(&name).ok_or(CoordinatorError::NoSuchModule(name.clone()))?.to_owned();
drop(scenarios);

let runtime = crows_wasm::Runtime::new(&scenario).map_err(|err| CoordinatorError::FailedToCreateRuntime(err.to_string()))?;
let (instance, _, mut store) = Instance::new(&runtime.environment, &runtime.module).await.map_err(|_| CoordinatorError::FailedToCompileModule)?;
let config = fetch_config(instance, &mut store).await.map_err(|err| CoordinatorError::CouldNotFetchConfig(err.to_string()))?.split(workers_number);
Expand Down
33 changes: 11 additions & 22 deletions utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,11 @@ enum InternalMessage {
#[derive(Clone)]
pub struct Client {
inner: Arc<RwLock<ClientInner>>,
sender: UnboundedSender<Message>,
internal_sender: UnboundedSender<InternalMessage>,
}

struct ClientInner {
sender: UnboundedSender<Message>,
internal_sender: UnboundedSender<InternalMessage>,
close_receiver: Option<oneshot::Receiver<()>>,
}

Expand All @@ -187,7 +187,8 @@ impl Client {
respond_to: tx,
message_id: message.id,
};
self.send_internal(InternalMessage::RegisterListener(register_listener)).await?;
self.send_internal(InternalMessage::RegisterListener(register_listener))
.await?;
self.send(message).await?;

// TODO: rewrite to map
Expand All @@ -197,22 +198,12 @@ impl Client {
}
}

async fn send(
&self,
message: Message,
) -> anyhow::Result<()> {
let inner = self.inner.read().await;

Ok(inner.sender.send(message)?)
async fn send(&self, message: Message) -> anyhow::Result<()> {
Ok(self.sender.send(message)?)
}

async fn send_internal(
&self,
message: InternalMessage,
) -> anyhow::Result<()> {
let inner = self.inner.read().await;

Ok(inner.internal_sender.send(message)?)
async fn send_internal(&self, message: InternalMessage) -> anyhow::Result<()> {
Ok(self.internal_sender.send(message)?)
}

pub fn new<T, DummyType>(
Expand All @@ -229,11 +220,9 @@ impl Client {
{
let (internal_sender, mut internal_receiver) = unbounded_channel();
let client = T::Client::new(Self {
inner: Arc::new(RwLock::new(ClientInner {
sender: sender.clone(),
internal_sender,
close_receiver,
})),
inner: Arc::new(RwLock::new(ClientInner { close_receiver })),
sender: sender.clone(),
internal_sender,
});

let client_clone = client.clone();
Expand Down

0 comments on commit 727081e

Please sign in to comment.