Skip to content

Commit

Permalink
cln_plugin: Support wildcard subscriptions
Browse files Browse the repository at this point in the history
Adapts `cln_plugin` to make it support wildcard `*`-subscriptions.
  • Loading branch information
ErikDeSmedt authored and endothermicdev committed Apr 30, 2024
1 parent 27eaa1e commit f1dc64b
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 19 deletions.
68 changes: 50 additions & 18 deletions plugins/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ where
option_values: HashMap<String, Option<options::Value>>,
rpcmethods: HashMap<String, RpcMethod<S>>,
subscriptions: HashMap<String, Subscription<S>>,
// Contains a Subscription if the user subscribed to "*"
wildcard_subscription : Option<Subscription<S>>,
notifications: Vec<NotificationTopic>,
custommessages: Vec<u16>,
featurebits: FeatureBits,
Expand All @@ -72,6 +74,7 @@ where
rpcmethods: HashMap<String, AsyncCallback<S>>,
hooks: HashMap<String, AsyncCallback<S>>,
subscriptions: HashMap<String, AsyncNotificationCallback<S>>,
wildcard_subscription : Option<AsyncNotificationCallback<S>>,
#[allow(dead_code)] // unsure why rust thinks this field isn't used
notifications: Vec<NotificationTopic>,
}
Expand All @@ -91,6 +94,7 @@ where
#[allow(dead_code)] // Unused until we fill in the Hook structs.
hooks: HashMap<String, AsyncCallback<S>>,
subscriptions: HashMap<String, AsyncNotificationCallback<S>>,

This comment has been minimized.

wildcard_subscription : Option<AsyncNotificationCallback<S>>

This comment has been minimized.

Copy link
@gissellestarch
}

#[derive(Clone)]
Expand Down Expand Up @@ -123,6 +127,7 @@ where
output: Some(output),
hooks: HashMap::new(),
subscriptions: HashMap::new(),
wildcard_subscription: None,
options: HashMap::new(),
// Should not be configured by user.
// This values are set when parsing the init-call
Expand Down Expand Up @@ -173,12 +178,16 @@ where
C: Fn(Plugin<S>, Request) -> F + 'static,
F: Future<Output = Result<(), Error>> + Send + 'static,
{
self.subscriptions.insert(
topic.to_string(),
Subscription {
callback: Box::new(move |p, r| Box::pin(callback(p, r))),
},
);
let subscription = Subscription {
callback : Box::new(move |p, r| Box::pin(callback(p, r)))
};

if topic == "*" {
self.wildcard_subscription = Some(subscription);
}
else {
self.subscriptions.insert(topic.to_string(), subscription);
};
self
}

Expand Down Expand Up @@ -328,6 +337,7 @@ where

let subscriptions =
HashMap::from_iter(self.subscriptions.drain().map(|(k, v)| (k, v.callback)));
let all_subscription = self.wildcard_subscription.map(|s| s.callback);

// Leave the `init` reply pending, so we can disable based on
// the options if required.
Expand All @@ -339,6 +349,7 @@ where
rpcmethods,
notifications: self.notifications,
subscriptions,
wildcard_subscription: all_subscription,
options: self.options,
option_values: self.option_values,
configuration,
Expand Down Expand Up @@ -378,9 +389,13 @@ where
})
.collect();

let subscriptions = self.subscriptions.keys()
.map(|s| s.clone())
.chain(self.wildcard_subscription.iter().map(|_| String::from("*"))).collect();

messages::GetManifestResponse {
options: self.options.values().cloned().collect(),
subscriptions: self.subscriptions.keys().map(|s| s.clone()).collect(),
subscriptions,
hooks: self.hooks.keys().map(|s| s.clone()).collect(),
rpcmethods,
notifications: self.notifications.clone(),
Expand Down Expand Up @@ -553,6 +568,7 @@ where
rpcmethods: self.rpcmethods,
hooks: self.hooks,
subscriptions: self.subscriptions,
wildcard_subscription : self.wildcard_subscription
};

output
Expand Down Expand Up @@ -724,25 +740,41 @@ where
Ok(())
}
messages::JsonRpc::CustomNotification(request) => {
// This code handles notifications
trace!("Dispatching custom notification {:?}", request);
let method = request
.get("method")
.context("Missing 'method' in request")?
.as_str()
.context("'method' is not a string")?;
let callback = self.subscriptions.get(method).with_context(|| {
anyhow!("No handler for notification '{}' registered", method)
})?;

let params = request
.get("params")
.context("Missing 'params' field in request")?
.clone();

let plugin = plugin.clone();
let call = callback(plugin.clone(), params);

tokio::spawn(async move { call.await.unwrap() });
Ok(())
.context("Missing 'params' field in request")?;

// Send to notification to the wildcard
// subscription "*" it it exists
match &self.wildcard_subscription {
Some(cb) => {
let call = cb(plugin.clone(), params.clone());
tokio::spawn(async move {call.await.unwrap()});}
None => {}
};

// Find the appropriate callback and process it
// We'll log a warning if no handler is defined
match self.subscriptions.get(method) {
Some(cb) => {
let call = cb(plugin.clone(), params.clone());
tokio::spawn(async move {call.await.unwrap()});
},
None => {
if self.wildcard_subscription.is_none() {
log::warn!("No handler for notification '{}' registered", method);
}
}
};
Ok(())
}
}
}
Expand Down
1 change: 0 additions & 1 deletion tests/test_cln_rs.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,6 @@ def test_grpc_decode(node_factory):
print(res)


@pytest.mark.xfail(strict=True)
def test_rust_plugin_subscribe_wildcard(node_factory):
""" Creates a plugin that loads the subscribe_wildcard plugin
"""
Expand Down

0 comments on commit f1dc64b

Please sign in to comment.