Skip to content

Commit

Permalink
Pre shutdown hooks into GatewayClient
Browse files Browse the repository at this point in the history
  • Loading branch information
durch committed Jan 22, 2025
1 parent 8670693 commit 8a455d7
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 27 deletions.
32 changes: 13 additions & 19 deletions common/client-core/src/client/mix_traffic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ impl MixTrafficController {
)
}

pub fn forget_me(&self) -> &ForgetMe {
&self.forget_me
}

async fn on_messages(&mut self, mut mix_packets: Vec<MixPacket>) {
debug_assert!(!mix_packets.is_empty());

Expand Down Expand Up @@ -98,6 +102,15 @@ impl MixTrafficController {
}

pub fn start_with_shutdown(mut self, mut shutdown: nym_task::TaskClient) {
if self.forget_me.any() {
log::debug!("Setting pre-shutdown forget me request");
self.gateway_transceiver
.set_pre_shutdown_client_request(ClientRequest::ForgetMe {
client: self.forget_me().client(),
stats: self.forget_me().stats(),
});
}

spawn_future(async move {
debug!("Started MixTrafficController with graceful shutdown support");

Expand All @@ -120,25 +133,6 @@ impl MixTrafficController {
}
shutdown.recv_timeout().await;

if self.forget_me.any() {
log::info!("Sending forget me request to the gateway");
match self
.gateway_transceiver
.send_client_request(ClientRequest::ForgetMe {
client: self.forget_me.client(),
stats: self.forget_me.stats(),
})
.await
{
Ok(_) => {
log::info!("Successfully sent forget me request to the gateway");
}
Err(err) => {
log::error!("Failed to send forget me request to the gateway: {err}");
}
}
}

log::debug!("MixTrafficController: Exiting");
});
}
Expand Down
26 changes: 18 additions & 8 deletions common/client-core/src/client/mix_traffic/transceiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub trait GatewayTransceiver: GatewaySender + GatewayReceiver {
&mut self,
message: ClientRequest,
) -> Result<(), GatewayClientError>;
fn set_pre_shutdown_client_request(&mut self, request: ClientRequest);
}

/// This trait defines the functionality of sending `MixPacket` into the mixnet,
Expand Down Expand Up @@ -88,6 +89,10 @@ impl<G: GatewayTransceiver + ?Sized + Send> GatewayTransceiver for Box<G> {
) -> Result<(), GatewayClientError> {
(**self).send_client_request(message).await
}

fn set_pre_shutdown_client_request(&mut self, request: ClientRequest) {
(**self).set_pre_shutdown_client_request(request);
}
}

#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
Expand Down Expand Up @@ -143,14 +148,11 @@ where
&mut self,
message: ClientRequest,
) -> Result<(), GatewayClientError> {
if let Some(shared_key) = self.gateway_client.shared_key() {
self.gateway_client
.send_websocket_message(message.encrypt(&*shared_key)?)
.await?;
Ok(())
} else {
Err(GatewayClientError::ConnectionInInvalidState)
}
self.gateway_client.send_client_request(message).await
}

fn set_pre_shutdown_client_request(&mut self, request: ClientRequest) {
self.gateway_client.set_pre_shutdown_client_request(request);
}
}

Expand Down Expand Up @@ -239,6 +241,10 @@ mod nonwasm_sealed {
) -> Result<(), GatewayClientError> {
Ok(())
}

fn set_pre_shutdown_client_request(&mut self, _request: ClientRequest) {
// no-op
}
}

#[async_trait]
Expand Down Expand Up @@ -321,4 +327,8 @@ impl GatewayTransceiver for MockGateway {
) -> Result<(), GatewayClientError> {
Ok(())
}

fn set_pre_shutdown_client_request(&mut self, _request: ClientRequest) {
// no-op
}
}
31 changes: 31 additions & 0 deletions common/client-libs/gateway-client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ pub struct GatewayClient<C, St = EphemeralCredentialStorage> {

/// Listen to shutdown messages and send notifications back to the task manager
task_client: TaskClient,
pre_shutdown_client_request: Option<ClientRequest>,
}

impl<C, St> GatewayClient<C, St> {
Expand Down Expand Up @@ -139,9 +140,22 @@ impl<C, St> GatewayClient<C, St> {
#[cfg(unix)]
connection_fd_callback,
task_client,
pre_shutdown_client_request: None,
}
}

pub fn set_pre_shutdown_client_request(&mut self, request: ClientRequest) {
self.pre_shutdown_client_request = Some(request);
}

async fn send_pre_shutdown_client_request(&mut self) -> Result<(), GatewayClientError> {
log::debug!("GatewayClient: Sending pre-shutdown request");
if let Some(request) = self.pre_shutdown_client_request.take() {
self.send_client_request(request).await?;
}
Ok(())
}

pub fn gateway_identity(&self) -> identity::PublicKey {
self.gateway_identity
}
Expand Down Expand Up @@ -271,6 +285,19 @@ impl<C, St> GatewayClient<C, St> {
}
}

pub async fn send_client_request(
&mut self,
message: ClientRequest,
) -> Result<(), GatewayClientError> {
if let Some(shared_key) = self.shared_key() {
let encrypted = message.encrypt(&*shared_key)?;
Box::pin(self.send_websocket_message(encrypted)).await?;
Ok(())
} else {
Err(GatewayClientError::ConnectionInInvalidState)
}
}

async fn read_control_response(&mut self) -> Result<ServerResponse, GatewayClientError> {
// we use the fact that all request responses are Message::Text and only pushed
// sphinx packets are Message::Binary
Expand All @@ -288,9 +315,11 @@ impl<C, St> GatewayClient<C, St> {
_ = self.task_client.recv() => {
log::trace!("GatewayClient control response: Received shutdown");
log::debug!("GatewayClient control response: Exiting");
self.send_pre_shutdown_client_request().await?;
break Err(GatewayClientError::ConnectionClosedGatewayShutdown);
}
_ = &mut timeout => {
self.send_pre_shutdown_client_request().await?;
break Err(GatewayClientError::Timeout);
}
msg = conn.next() => {
Expand Down Expand Up @@ -1050,6 +1079,7 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
#[cfg(unix)]
connection_fd_callback: None,
task_client,
pre_shutdown_client_request: None,
}
}

Expand Down Expand Up @@ -1082,6 +1112,7 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
#[cfg(unix)]
connection_fd_callback: self.connection_fd_callback,
task_client,
pre_shutdown_client_request: self.pre_shutdown_client_request,
}
}
}

0 comments on commit 8a455d7

Please sign in to comment.