Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LightClient: Unsubscribe from subscriptions #1408

Merged
merged 6 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 82 additions & 29 deletions lightclient/src/background.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ pub enum FromSubxt {
Subscription {
/// The method of the request.
method: String,
/// The method to unsubscribe.
unsubscribe_method: String,
/// The parameters of the request.
params: String,
/// Channel used to send back the subscription ID if successful.
Expand All @@ -69,18 +71,40 @@ pub struct BackgroundTask<TPlatform: PlatformRef, TChain> {
/// The RPC method request is made in the background and the response should
/// not be sent back to the user.
/// Map the request ID of a RPC method to the frontend `Sender`.
id_to_subscription: HashMap<
(usize, smoldot_light::ChainId),
(
oneshot::Sender<MethodResponse>,
mpsc::UnboundedSender<Box<RawValue>>,
),
>,
id_to_subscription: HashMap<(usize, smoldot_light::ChainId), PendingSubscription>,
/// Map the subscription ID to the frontend `Sender`.
///
/// The subscription ID is entirely generated by the node (smoldot). Therefore, it is
/// possible for two distinct subscriptions of different chains to have the same subscription ID.
subscriptions: HashMap<(usize, smoldot_light::ChainId), mpsc::UnboundedSender<Box<RawValue>>>,
subscriptions: HashMap<(usize, smoldot_light::ChainId), ActiveSubscription>,
}

/// The state needed to resolve the subscription ID and send
/// back the response to frontend.
struct PendingSubscription {
/// Send the method response ID back to the user.
///
/// It contains the subscription ID if successful, or an JSON RPC error object.
sub_id_sender: oneshot::Sender<MethodResponse>,
/// The subscription state that is added to the `subscriptions` map only
/// if the subscription ID is successfully sent back to the user.
subscription_state: ActiveSubscription,
}

impl PendingSubscription {
/// Transforms the pending subscription into an active subscription.
fn into_parts(self) -> (oneshot::Sender<MethodResponse>, ActiveSubscription) {
(self.sub_id_sender, self.subscription_state)
}
}

/// The state of the subscription.
struct ActiveSubscription {
/// Channel to send the subscription notifications back to frontend.
sender: mpsc::UnboundedSender<Box<RawValue>>,
/// The unsubscribe method to call when the user drops the receiver
/// part of the channel.
unsubscribe_method: String,
}

impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
Expand Down Expand Up @@ -152,6 +176,7 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
}
FromSubxt::Subscription {
method,
unsubscribe_method,
params,
sub_id,
sender,
Expand All @@ -166,8 +191,15 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
);

tracing::trace!(target: LOG_TARGET, "Tracking subscription request id={id} chain={chain_id:?}");
let subscription_id_state = PendingSubscription {
sub_id_sender: sub_id,
subscription_state: ActiveSubscription {
sender,
unsubscribe_method,
},
};
self.id_to_subscription
.insert((id, chain_id), (sub_id, sender));
.insert((id, chain_id), subscription_id_state);

let result = self.client.json_rpc_request(request, chain_id);
if let Err(err) = result {
Expand All @@ -176,13 +208,14 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
"Cannot send RPC request to lightclient {:?}",
err.to_string()
);
let (sub_id, _) = self
let subscription_id_state = self
.id_to_subscription
.remove(&(id, chain_id))
.expect("Channels are inserted above; qed");

// Send the error back to frontend.
if sub_id
if subscription_id_state
.sub_id_sender
.send(Err(LightClientRpcError::Request(err.to_string())))
.is_err()
{
Expand Down Expand Up @@ -219,10 +252,11 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
"Cannot send method response to id={id} chain={chain_id:?}",
);
}
} else if let Some((sub_id_sender, _)) =
} else if let Some(subscription_id_state) =
self.id_to_subscription.remove(&(id, chain_id))
{
if sub_id_sender
if subscription_id_state
.sub_id_sender
.send(Err(LightClientRpcError::Request(error.to_string())))
.is_err()
{
Expand All @@ -247,7 +281,7 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
"Cannot send method response to id={id} chain={chain_id:?}",
);
}
} else if let Some((sub_id_sender, sender)) =
} else if let Some(pending_subscription) =
self.id_to_subscription.remove(&(id, chain_id))
jsdw marked this conversation as resolved.
Show resolved Hide resolved
{
let Ok(sub_id) = result
Expand All @@ -265,15 +299,19 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {

tracing::trace!(target: LOG_TARGET, "Received subscription id={sub_id} chain={chain_id:?}");

let (sub_id_sender, active_subscription) = pending_subscription.into_parts();
if sub_id_sender.send(Ok(result)).is_err() {
tracing::warn!(
target: LOG_TARGET,
"Cannot send method response to id={id} chain={chain_id:?}",
);
} else {
// Track this subscription ID if send is successful.
self.subscriptions.insert((sub_id, chain_id), sender);

return;
}

// Track this subscription ID if send is successful.
self.subscriptions
.insert((sub_id, chain_id), active_subscription);
} else {
tracing::warn!(
target: LOG_TARGET,
Expand All @@ -287,22 +325,37 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
return;
};

if let Some(sender) = self.subscriptions.get_mut(&(id, chain_id)) {
// Send the current notification response.
if sender.send(result).is_err() {
tracing::warn!(
target: LOG_TARGET,
"Cannot send notification to subscription id={id} chain={chain_id:?} method={method}",
);
let Some(subscription_state) = self.subscriptions.get_mut(&(id, chain_id)) else {
tracing::warn!(
target: LOG_TARGET,
"Subscription response id={id} chain={chain_id:?} method={method} is not tracked",
);
return;
};
if subscription_state.sender.send(result).is_ok() {
// Nothing else to do, user is informed about the notification.
return;
}

// Remove the sender if the subscription dropped the receiver.
self.subscriptions.remove(&(id, chain_id));
}
} else {
// User dropped the receiver, unsubscribe from the method and remove internal tracking.
let Some(subscription_state) = self.subscriptions.remove(&(id, chain_id)) else {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd have to check; is the only kind of error that can be returned from the channel above one that indicates it was closed by the receiver?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, taken from https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.UnboundedSender.html#method.send:

If the receive half of the channel is closed, either due to close being called or the UnboundedReceiver having been dropped, this function returns an error. The error includes the value passed to send.

// State is checked to be some above, so this should never happen.
return;
};
// Make a call to unsubscribe from this method.
let unsub_id = self.next_id(chain_id);
let request = format!(
r#"{{"jsonrpc":"2.0","id":"{}", "method":"{}","params":["{}"]}}"#,
unsub_id, subscription_state.unsubscribe_method, id
);

if let Err(err) = self.client.json_rpc_request(request, chain_id) {
tracing::warn!(
target: LOG_TARGET,
"Subscription response id={id} chain={chain_id:?} is not tracked",
"Failed to unsubscribe id={id:?} chain={chain_id:?} method={:?} err={err:?}", subscription_state.unsubscribe_method
);
} else {
tracing::debug!(target: LOG_TARGET,"Unsubscribe id={id:?} chain={chain_id:?} method={:?}", subscription_state.unsubscribe_method);
}
}
Err(err) => {
Expand Down
2 changes: 2 additions & 0 deletions lightclient/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ impl LightClientRpc {
&self,
method: String,
params: String,
unsubscribe_method: String,
) -> Result<
(
oneshot::Receiver<MethodResponse>,
Expand All @@ -191,6 +192,7 @@ impl LightClientRpc {

self.to_backend.send(FromSubxt::Subscription {
method,
unsubscribe_method,
params,
sub_id,
sender,
Expand Down
4 changes: 2 additions & 2 deletions subxt/src/client/light_client/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl RpcClientT for LightClientRpc {
&'a self,
sub: &'a str,
params: Option<Box<RawValue>>,
_unsub: &'a str,
unsub: &'a str,
) -> RawRpcFuture<'a, RawRpcSubscription> {
let client = self.clone();
let chain_id = self.chain_id();
Expand All @@ -130,7 +130,7 @@ impl RpcClientT for LightClientRpc {
// Fails if the background is closed.
let (sub_id, notif) = client
.0
.subscription_request(sub.to_string(), params)
.subscription_request(sub.to_string(), params, unsub.to_string())
.map_err(|_| RpcError::ClientError(Box::new(LightClientError::BackgroundClosed)))?;

// Fails if the background is closed.
Expand Down
Loading