diff --git a/hyperspace/cosmos/src/chain.rs b/hyperspace/cosmos/src/chain.rs index 34e2498f9..cc804b0ac 100644 --- a/hyperspace/cosmos/src/chain.rs +++ b/hyperspace/cosmos/src/chain.rs @@ -92,11 +92,8 @@ where async fn finality_notifications( &self, - ) -> Result< - Pin::FinalityEvent> + Send + Sync>>, - Error, - > { - let ws_client = self.rpc_client.clone(); + ) -> Result::FinalityEvent> + Send + Sync>>, Error> { + let ws_client = self.rpc_ws_client().clone(); let subscription = ws_client .subscribe(Query::from(EventType::NewBlock)) .await @@ -148,7 +145,7 @@ where // .and_eq("update_client.header", hex::encode(&update.header.unwrap_or_default())) use tendermint::abci::Event as AbciEvent; - let mut client = ServiceClient::connect(self.grpc_url.to_string()) + let mut client = ServiceClient::connect(self.grpc_url().to_string()) .await .map_err(|e| Error::from(e.to_string()))?; let mut resp = client @@ -261,11 +258,12 @@ where } async fn reconnect(&mut self) -> anyhow::Result<()> { - let (rpc_client, ws_driver) = WebSocketClient::new(self.websocket_url.clone()) + // TODO: don't reconnect if the url is not presented + let (rpc_client, ws_driver) = WebSocketClient::new(self.websocket_url().clone()) .await .map_err(|e| Error::RpcError(format!("{e:?}")))?; self.join_handles.lock().await.push(tokio::spawn(ws_driver.run())); - self.rpc_client = rpc_client; + self.rpc_ws_client = Some(rpc_client); log::info!(target: "hyperspace_cosmos", "Reconnected to cosmos chain"); Ok(()) } diff --git a/hyperspace/cosmos/src/client.rs b/hyperspace/cosmos/src/client.rs index 3c0344bbe..bfb0f5a52 100644 --- a/hyperspace/cosmos/src/client.rs +++ b/hyperspace/cosmos/src/client.rs @@ -129,18 +129,18 @@ pub struct MnemonicEntry { pub struct CosmosClient { /// Chain name pub name: String, - /// Chain rpc client - pub rpc_client: WebSocketClient, + /// Chain websocket rpc client + pub rpc_ws_client: Option, /// Chain http rpc client pub rpc_http_client: HttpClient, /// Reusable GRPC client - pub grpc_client: tonic::transport::Channel, + pub grpc_client: Option, /// Chain rpc address pub rpc_url: Url, /// Chain grpc address - pub grpc_url: Url, + pub grpc_url: Option, /// Websocket chain ws client - pub websocket_url: Url, + pub websocket_url: Option, /// Chain Id pub chain_id: ChainId, /// Light client id on counterparty chain @@ -186,9 +186,9 @@ pub struct CosmosClientConfig { /// rpc url for cosmos pub rpc_url: Url, /// grpc url for cosmos - pub grpc_url: Url, + pub grpc_url: Option, /// websocket url for cosmos - pub websocket_url: Url, + pub websocket_url: Option, /// Cosmos chain Id pub chain_id: String, /// Light client id on counterparty chain @@ -251,17 +251,32 @@ where { /// Initializes a [`CosmosClient`] given a [`CosmosClientConfig`] pub async fn new(config: CosmosClientConfig) -> Result { - let (rpc_client, rpc_driver) = WebSocketClient::new(config.websocket_url.clone()) - .await - .map_err(|e| Error::RpcError(format!("failed to connect to Websocket {:?}", e)))?; + let mut rpc_client = None; + + let mut join_handles = vec![]; + if let Some(websocket_url) = &config.websocket_url { + let rpc_driver; + (rpc_client, rpc_driver) = WebSocketClient::new(websocket_url.clone()) + .await + .map(|(x, y)| (Some(x), y)) + .map_err(|e| Error::RpcError(format!("failed to connect to Websocket {:?}", e)))?; + join_handles.push(tokio::spawn(rpc_driver.run())); + } else { + log::warn!(target: "hyperspace_cosmos", "No websocket url provided for cosmos chain"); + } let rpc_http_client = HttpClient::new(config.rpc_url.clone()) .map_err(|e| Error::RpcError(format!("failed to connect to RPC {:?}", e)))?; - let ws_driver_jh = tokio::spawn(rpc_driver.run()); - let grpc_client = tonic::transport::Endpoint::new(config.grpc_url.to_string()) - .map_err(|e| Error::RpcError(format!("failed to create a GRPC endpoint {:?}", e)))? - .connect() - .await - .map_err(|e| Error::RpcError(format!("failed to connect to GRPC {:?}", e)))?; + let mut grpc_client = None; + if let Some(grpc_url) = &config.grpc_url { + grpc_client = tonic::transport::Endpoint::new(grpc_url.to_string()) + .map_err(|e| Error::RpcError(format!("failed to connect to RPC {:?}", e)))? + .connect() + .await + .map(Some) + .map_err(|e| Error::RpcError(format!("failed to connect to RPC {:?}", e)))?; + } else { + log::warn!(target: "hyperspace_cosmos", "No grpc url provided for cosmos chain"); + } let chain_id = ChainId::from(config.chain_id); let light_client = @@ -279,7 +294,7 @@ where Ok(Self { name: config.name, chain_id, - rpc_client, + rpc_ws_client: rpc_client, rpc_http_client, grpc_client, rpc_url: config.rpc_url, @@ -308,10 +323,26 @@ where max_packets_to_process: config.common.max_packets_to_process as usize, skip_tokens_list: config.skip_tokens_list.unwrap_or_default(), }, - join_handles: Arc::new(TokioMutex::new(vec![ws_driver_jh])), + join_handles: Arc::new(TokioMutex::new(join_handles)), }) } + pub fn grpc_url(&self) -> Url { + self.grpc_url.clone().expect("grpc url is not set") + } + + pub fn websocket_url(&self) -> Url { + self.websocket_url.clone().expect("rpc url is not set") + } + + pub fn grpc_client(&self) -> &tonic::transport::Channel { + self.grpc_client.as_ref().expect("grpc client is not set") + } + + pub fn rpc_ws_client(&self) -> WebSocketClient { + self.rpc_ws_client.as_ref().expect("rpc client is not set").clone() + } + pub fn client_id(&self) -> ClientId { self.client_id .lock() @@ -366,16 +397,17 @@ where )?; // Simulate transaction - let res = simulate_tx(self.grpc_url.clone(), tx, tx_bytes.clone()).await?; + let res = simulate_tx(self.grpc_url(), tx, tx_bytes.clone()).await?; res.result .map(|r| log::debug!(target: "hyperspace_cosmos", "Simulated transaction: events: {:?}\nlogs: {}", r.events, r.log)); // Broadcast transaction - let hash = broadcast_tx(&self.rpc_client, tx_bytes).await?; - log::debug!(target: "hyperspace_cosmos", "🤝 Transaction sent with hash: {:?}", hash); + let client = &self.rpc_ws_client(); + let hash = broadcast_tx(client, tx_bytes).await?; + log::info!(target: "hyperspace_cosmos", "🤝 Transaction sent with hash: {:?}", hash); // wait for confirmation - confirm_tx(&self.rpc_client, hash).await + confirm_tx(client, hash).await } pub async fn fetch_light_block_with_cache( @@ -460,7 +492,7 @@ where /// Uses the GRPC client to retrieve the account sequence pub async fn query_account(&self) -> Result { - let mut client = QueryClient::connect(self.grpc_url.clone().to_string()) + let mut client = QueryClient::connect(self.grpc_url().to_string()) .await .map_err(|e| Error::from(format!("GRPC client error: {:?}", e)))?; diff --git a/hyperspace/cosmos/src/provider.rs b/hyperspace/cosmos/src/provider.rs index d3204ebbb..527be2fac 100644 --- a/hyperspace/cosmos/src/provider.rs +++ b/hyperspace/cosmos/src/provider.rs @@ -211,7 +211,7 @@ where // necessary height field, as `height` is removed from `Attribute` from ibc-rs v0.22.0 async fn ibc_events(&self) -> Pin + Send + 'static>> { // Create websocket client. Like what `EventMonitor::subscribe()` does in `hermes` - let ws_client = self.rpc_client.clone(); + let ws_client = self.rpc_ws_client(); let query_all = vec![ Query::from(EventType::NewBlock), @@ -466,7 +466,7 @@ where // Instead, we need to pull block height via `/abci_info` and then fetch block // metadata at the given height via `/blockchain` endpoint. let abci_info = self - .rpc_client + .rpc_http_client .abci_info() .await .map_err(|e| Error::RpcError(format!("{e:?}")))?; @@ -476,7 +476,7 @@ where // TODO: Replace this query with `/header`, once it's available. // https://github.com/informalsystems/tendermint-rs/pull/1101 let blocks = self - .rpc_client + .rpc_http_client .blockchain(abci_info.last_block_height, abci_info.last_block_height) .await .map_err(|e| { @@ -505,7 +505,7 @@ where ) -> Result, Self::Error> { let mut grpc_client = ibc_proto::ibc::core::channel::v1::query_client::QueryClient::connect( - self.grpc_url.clone().to_string(), + self.grpc_url().to_string(), ) .await .map_err(|e| Error::from(e.to_string()))?; @@ -541,7 +541,7 @@ where ); let mut grpc_client = ibc_proto::ibc::core::channel::v1::query_client::QueryClient::connect( - self.grpc_url.clone().to_string(), + self.grpc_url().to_string(), ) .await .map_err(|e| Error::from(e.to_string()))?; @@ -574,7 +574,7 @@ where ) -> Result, Self::Error> { let mut grpc_client = ibc_proto::ibc::core::channel::v1::query_client::QueryClient::connect( - self.grpc_url.clone().to_string(), + self.grpc_url().to_string(), ) .await .map_err(|e| Error::from(e.to_string()))?; @@ -605,7 +605,7 @@ where ) -> Result, Self::Error> { let mut grpc_client = ibc_proto::ibc::core::channel::v1::query_client::QueryClient::connect( - self.grpc_url.clone().to_string(), + self.grpc_url().to_string(), ) .await .map_err(|e| Error::from(e.to_string()))?; @@ -638,7 +638,7 @@ where ) -> Result { let mut grpc_client = ibc_proto::ibc::core::channel::v1::query_client::QueryClient::connect( - self.grpc_url.clone().to_string(), + self.grpc_url().to_string(), ) .await .map_err(|e| Error::from(format!("{e:?}")))?; @@ -889,7 +889,7 @@ where ) -> Result, Self::Error> { let denom = &asset_id; let mut grpc_client = ibc_proto::cosmos::bank::v1beta1::query_client::QueryClient::connect( - self.grpc_url.clone().to_string(), + self.grpc_url().to_string(), ) .await .map_err(|e| Error::from(format!("{e:?}")))?; @@ -956,7 +956,7 @@ where let height = TmHeight::try_from(block_number) .map_err(|e| Error::from(format!("Invalid block number: {e}")))?; let response = self - .rpc_client + .rpc_ws_client() .block(height) .await .map_err(|e| Error::RpcError(e.to_string()))?; @@ -969,7 +969,7 @@ where pagination: Some(PageRequest { limit: u32::MAX as _, ..Default::default() }), }); let grpc_client = ibc_proto::ibc::core::client::v1::query_client::QueryClient::new( - self.grpc_client.clone(), + self.grpc_client().clone(), ); let response = grpc_client .clone() @@ -998,7 +998,7 @@ where }); let mut grpc_client = ibc_proto::ibc::core::channel::v1::query_client::QueryClient::connect( - self.grpc_url.clone().to_string(), + self.grpc_url().to_string(), ) .await .map_err(|e| Error::from(format!("{e:?}")))?; @@ -1025,7 +1025,7 @@ where ) -> Result, Self::Error> { let mut grpc_client = ibc_proto::ibc::core::connection::v1::query_client::QueryClient::connect( - self.grpc_url.clone().to_string(), + self.grpc_url().to_string(), ) .await .map_err(|e| Error::from(format!("{e:?}")))?; @@ -1098,7 +1098,7 @@ where let response: Response = loop { let response = self - .rpc_client + .rpc_ws_client() .tx_search( Query::eq("tx.hash", tx_id.hash.to_string()), false, @@ -1165,7 +1165,7 @@ where let response: Response = loop { let response = self - .rpc_client + .rpc_ws_client() .tx_search( Query::eq("tx.hash", tx_id.hash.to_string()), false, @@ -1233,7 +1233,7 @@ where let response: Response = loop { let response = self - .rpc_client + .rpc_ws_client() .tx_search( Query::eq("tx.hash", tx_id.hash.to_string()), false, @@ -1318,7 +1318,7 @@ where } }; // let resp = MsgClient::connect( - // Endpoint::try_from(self.grpc_url.to_string()) + // Endpoint::try_from(self.grpc_url().to_string()) // .map_err(|e| Error::from(format!("Failed to parse grpc url: {:?}", e)))?, // ) // .await diff --git a/hyperspace/cosmos/src/test_provider.rs b/hyperspace/cosmos/src/test_provider.rs index e2cc6c8a1..035b3c804 100644 --- a/hyperspace/cosmos/src/test_provider.rs +++ b/hyperspace/cosmos/src/test_provider.rs @@ -37,7 +37,7 @@ where /// Returns a stream that yields chain Block number async fn subscribe_blocks(&self) -> Pin + Send + Sync>> { - let ws_client = self.rpc_client.clone(); + let ws_client = self.rpc_ws_client(); let subscription = ws_client.subscribe(Query::from(EventType::NewBlock)).await.unwrap(); log::info!(target: "hyperspace_cosmos", "🛰️ Subscribed to {} listening to finality notifications", self.name);