From 91b82efbf68719ecec76a4b8c337686116a2fec0 Mon Sep 17 00:00:00 2001 From: Somtochi Onyekwere Date: Wed, 18 Sep 2024 08:52:04 +0100 Subject: [PATCH] add info and list command for subs --- Cargo.lock | 1 + crates/corro-admin/Cargo.toml | 3 +- crates/corro-admin/src/lib.rs | 67 ++++++++++++++++++++++++++++++++ crates/corro-types/src/pubsub.rs | 38 ++++++++++++++++++ crates/corro-types/src/sync.rs | 1 - crates/corrosion/src/main.rs | 30 ++++++++++++++ 6 files changed, 138 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7126aa9c..8646ab94 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -741,6 +741,7 @@ dependencies = [ "tokio-util", "tracing", "tripwire", + "uuid", ] [[package]] diff --git a/crates/corro-admin/Cargo.toml b/crates/corro-admin/Cargo.toml index 19b5e72f..3d83ff8a 100644 --- a/crates/corro-admin/Cargo.toml +++ b/crates/corro-admin/Cargo.toml @@ -19,4 +19,5 @@ tokio-serde = { workspace = true } tokio-util = { workspace = true } tracing = { workspace = true } tripwire = { path = "../tripwire" } -rangemap = { workspace = true } \ No newline at end of file +rangemap = { workspace = true } +uuid = { workspace = true } diff --git a/crates/corro-admin/src/lib.rs b/crates/corro-admin/src/lib.rs index 855f8652..f9cff2f8 100644 --- a/crates/corro-admin/src/lib.rs +++ b/crates/corro-admin/src/lib.rs @@ -27,6 +27,7 @@ use tokio_serde::{formats::Json, Framed}; use tokio_util::codec::LengthDelimitedCodec; use tracing::{debug, error, info, warn}; use tripwire::Tripwire; +use uuid::Uuid; #[derive(Debug, thiserror::Error)] pub enum AdminError { @@ -96,6 +97,7 @@ pub enum Command { Locks { top: usize }, Cluster(ClusterCommand), Actor(ActorCommand), + Subs(SubsCommand), } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -104,6 +106,15 @@ pub enum SyncCommand { ReconcileGaps, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum SubsCommand { + Info { + hash: Option, + id: Option, + }, + List, +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub enum ClusterCommand { Rejoin, @@ -521,6 +532,62 @@ async fn handle_conn( send_success(&mut stream).await; } + Command::Subs(SubsCommand::List) => { + let handles = agent.subs_manager().get_handles(); + let uuid_to_hash = handles + .iter() + .map(|(k, v)| { + json!({ + "id": k, + "hash": v.hash(), + "sql": v.sql().lines().map(|c| c.trim()).collect::>().join(" "), + }) + }) + .collect::>(); + + send(&mut stream, Response::Json(serde_json::json!(uuid_to_hash))).await; + send_success(&mut stream).await; + } + Command::Subs(SubsCommand::Info { hash, id }) => { + let matcher_handle = match (hash, id) { + (Some(hash), _) => agent.subs_manager().get_by_hash(&hash), + (None, Some(id)) => agent.subs_manager().get(&id), + (None, None) => { + send_error(&mut stream, "specify hash or id for subscription").await; + continue; + } + }; + match matcher_handle { + Some(matcher) => { + let statements = matcher + .cached_stmts() + .iter() + .map(|(table, stmts)| { + json!({ + table: stmts.new_query(), + }) + }) + .collect::>(); + send( + &mut stream, + Response::Json(serde_json::json!({ + "id": matcher.id(), + "hash": matcher.hash(), + "path": matcher.subs_path(), + "last_change_id": matcher.last_change_id_sent(), + "original_query": matcher.sql().lines().map(|c| c.trim()).collect::>().join(" "), + "statements": statements, + })), + ) + .await; + send_success(&mut stream).await; + } + None => { + send_error(&mut stream, "unknown subscription hash or id").await; + continue; + } + }; + } }, Ok(None) => { debug!("done with admin conn"); diff --git a/crates/corro-types/src/pubsub.rs b/crates/corro-types/src/pubsub.rs index bd582a37..b34e7d91 100644 --- a/crates/corro-types/src/pubsub.rs +++ b/crates/corro-types/src/pubsub.rs @@ -74,6 +74,14 @@ impl SubsManager { self.0.read().get_by_query(sql) } + pub fn get_by_hash(&self, hash: &str) -> Option { + self.0.read().get_by_hash(hash) + } + + pub fn get_handles(&self) -> BTreeMap { + self.0.read().handles.clone() + } + pub fn get_or_insert( &self, sql: &str, @@ -328,6 +336,13 @@ impl InnerSubsManager { .and_then(|id| self.handles.get(id).cloned()) } + pub fn get_by_hash(&self, hash: &str) -> Option { + self.handles + .values() + .find(|x| x.inner.hash == hash) + .cloned() + } + fn remove(&mut self, id: &Uuid) -> Option { let handle = self.handles.remove(id)?; self.queries.remove(&handle.inner.sql); @@ -364,6 +379,9 @@ struct InnerMatcherHandle { cancel: CancellationToken, changes_tx: mpsc::Sender<(MatchCandidates, CrsqlDbVersion)>, last_change_rx: watch::Receiver, + // some state from the matcher so we can take a look later + subs_path: String, + cached_statements: HashMap, } type MatchCandidates = IndexMap>>; @@ -373,6 +391,10 @@ impl MatcherHandle { self.inner.id } + pub fn sql(&self) -> &String { + &self.inner.sql + } + pub fn hash(&self) -> &str { &self.inner.hash } @@ -385,6 +407,14 @@ impl MatcherHandle { &self.inner.col_names } + pub fn subs_path(&self) -> &String { + &self.inner.subs_path + } + + pub fn cached_stmts(&self) -> &HashMap { + &self.inner.cached_statements + } + pub async fn cleanup(self) { self.inner.cancel.cancel(); info!(sub_id = %self.inner.id, "Canceled subscription"); @@ -593,6 +623,12 @@ pub struct MatcherStmt { temp_query: String, } +impl MatcherStmt { + pub fn new_query(self: &Self) -> &String { + return &self.new_query; + } +} + const CHANGE_ID_COL: &str = "id"; const CHANGE_TYPE_COL: &str = "type"; @@ -819,6 +855,8 @@ impl Matcher { cancel: cancel.clone(), last_change_rx, changes_tx, + cached_statements: statements.clone(), + subs_path: sub_path.to_string(), }), state: state.clone(), }; diff --git a/crates/corro-types/src/sync.rs b/crates/corro-types/src/sync.rs index 303dec61..5aa546f3 100644 --- a/crates/corro-types/src/sync.rs +++ b/crates/corro-types/src/sync.rs @@ -279,7 +279,6 @@ impl From for SyncMessage { } } - // generates a `SyncMessage` to tell another node what versions we're missing #[tracing::instrument(skip_all, level = "debug")] pub async fn generate_sync(bookie: &Bookie, self_actor_id: ActorId) -> SyncStateV1 { diff --git a/crates/corrosion/src/main.rs b/crates/corrosion/src/main.rs index 8079f3ce..a17105c8 100644 --- a/crates/corrosion/src/main.rs +++ b/crates/corrosion/src/main.rs @@ -522,6 +522,19 @@ async fn process_cli(cli: Cli) -> eyre::Result<()> { info!("Exited with code: {:?}", exit.code()); std::process::exit(exit.code().unwrap_or(1)); } + Command::Subs(SubsCommand::Info { hash, id }) => { + let mut conn = AdminConn::connect(cli.admin_path()).await?; + conn.send_command(corro_admin::Command::Subs(corro_admin::SubsCommand::Info { + hash: hash.clone(), + id: *id, + })) + .await?; + } + Command::Subs(SubsCommand::List) => { + let mut conn = AdminConn::connect(cli.admin_path()).await?; + conn.send_command(corro_admin::Command::Subs(corro_admin::SubsCommand::List)) + .await?; + } } Ok(()) @@ -686,6 +699,10 @@ enum Command { /// DB-related commands #[command(subcommand)] Db(DbCommand), + + /// Subscription related commands + #[command(subcommand)] + Subs(SubsCommand), } #[derive(Subcommand)] @@ -769,3 +786,16 @@ enum DbCommand { /// Acquires the lock on the DB Lock { cmd: String }, } + +#[derive(Subcommand)] +enum SubsCommand { + /// List all subscriptions on a node + List, + /// Get information on a subscription + Info { + #[arg(long)] + hash: Option, + #[arg(long)] + id: Option, + }, +}