Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make GRPC and WS urls for cosmos optional #513

Merged
merged 1 commit into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions hyperspace/cosmos/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,8 @@ where

async fn finality_notifications(
&self,
) -> Result<
Pin<Box<dyn Stream<Item = <Self as IbcProvider>::FinalityEvent> + Send + Sync>>,
Error,
> {
let ws_client = self.rpc_client.clone();
) -> Result<Pin<Box<dyn Stream<Item = <Self as IbcProvider>::FinalityEvent> + Send + Sync>>, Error> {
let ws_client = self.rpc_ws_client().clone();
let subscription = ws_client
.subscribe(Query::from(EventType::NewBlock))
.await
Expand Down Expand Up @@ -148,7 +145,7 @@ where
// .and_eq("update_client.header", hex::encode(&update.header.unwrap_or_default()))
use tendermint::abci::Event as AbciEvent;

let mut client = ServiceClient::connect(self.grpc_url.to_string())
let mut client = ServiceClient::connect(self.grpc_url().to_string())
.await
.map_err(|e| Error::from(e.to_string()))?;
let mut resp = client
Expand Down Expand Up @@ -261,11 +258,12 @@ where
}

async fn reconnect(&mut self) -> anyhow::Result<()> {
let (rpc_client, ws_driver) = WebSocketClient::new(self.websocket_url.clone())
// TODO: don't reconnect if the url is not presented
let (rpc_client, ws_driver) = WebSocketClient::new(self.websocket_url().clone())
.await
.map_err(|e| Error::RpcError(format!("{e:?}")))?;
self.join_handles.lock().await.push(tokio::spawn(ws_driver.run()));
self.rpc_client = rpc_client;
self.rpc_ws_client = Some(rpc_client);
log::info!(target: "hyperspace_cosmos", "Reconnected to cosmos chain");
Ok(())
}
Expand Down
78 changes: 55 additions & 23 deletions hyperspace/cosmos/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,18 +129,18 @@ pub struct MnemonicEntry {
pub struct CosmosClient<H> {
/// Chain name
pub name: String,
/// Chain rpc client
pub rpc_client: WebSocketClient,
/// Chain websocket rpc client
pub rpc_ws_client: Option<WebSocketClient>,
/// Chain http rpc client
pub rpc_http_client: HttpClient,
/// Reusable GRPC client
pub grpc_client: tonic::transport::Channel,
pub grpc_client: Option<tonic::transport::Channel>,
/// Chain rpc address
pub rpc_url: Url,
/// Chain grpc address
pub grpc_url: Url,
pub grpc_url: Option<Url>,
/// Websocket chain ws client
pub websocket_url: Url,
pub websocket_url: Option<Url>,
/// Chain Id
pub chain_id: ChainId,
/// Light client id on counterparty chain
Expand Down Expand Up @@ -186,9 +186,9 @@ pub struct CosmosClientConfig {
/// rpc url for cosmos
pub rpc_url: Url,
/// grpc url for cosmos
pub grpc_url: Url,
pub grpc_url: Option<Url>,
/// websocket url for cosmos
pub websocket_url: Url,
pub websocket_url: Option<Url>,
/// Cosmos chain Id
pub chain_id: String,
/// Light client id on counterparty chain
Expand Down Expand Up @@ -251,17 +251,32 @@ where
{
/// Initializes a [`CosmosClient`] given a [`CosmosClientConfig`]
pub async fn new(config: CosmosClientConfig) -> Result<Self, Error> {
let (rpc_client, rpc_driver) = WebSocketClient::new(config.websocket_url.clone())
.await
.map_err(|e| Error::RpcError(format!("failed to connect to Websocket {:?}", e)))?;
let mut rpc_client = None;

let mut join_handles = vec![];
if let Some(websocket_url) = &config.websocket_url {
let rpc_driver;
(rpc_client, rpc_driver) = WebSocketClient::new(websocket_url.clone())
.await
.map(|(x, y)| (Some(x), y))
.map_err(|e| Error::RpcError(format!("failed to connect to Websocket {:?}", e)))?;
join_handles.push(tokio::spawn(rpc_driver.run()));
} else {
log::warn!(target: "hyperspace_cosmos", "No websocket url provided for cosmos chain");
}
let rpc_http_client = HttpClient::new(config.rpc_url.clone())
.map_err(|e| Error::RpcError(format!("failed to connect to RPC {:?}", e)))?;
let ws_driver_jh = tokio::spawn(rpc_driver.run());
let grpc_client = tonic::transport::Endpoint::new(config.grpc_url.to_string())
.map_err(|e| Error::RpcError(format!("failed to create a GRPC endpoint {:?}", e)))?
.connect()
.await
.map_err(|e| Error::RpcError(format!("failed to connect to GRPC {:?}", e)))?;
let mut grpc_client = None;
if let Some(grpc_url) = &config.grpc_url {
grpc_client = tonic::transport::Endpoint::new(grpc_url.to_string())
.map_err(|e| Error::RpcError(format!("failed to connect to RPC {:?}", e)))?
.connect()
.await
.map(Some)
.map_err(|e| Error::RpcError(format!("failed to connect to RPC {:?}", e)))?;
} else {
log::warn!(target: "hyperspace_cosmos", "No grpc url provided for cosmos chain");
}

let chain_id = ChainId::from(config.chain_id);
let light_client =
Expand All @@ -279,7 +294,7 @@ where
Ok(Self {
name: config.name,
chain_id,
rpc_client,
rpc_ws_client: rpc_client,
rpc_http_client,
grpc_client,
rpc_url: config.rpc_url,
Expand Down Expand Up @@ -308,10 +323,26 @@ where
max_packets_to_process: config.common.max_packets_to_process as usize,
skip_tokens_list: config.skip_tokens_list.unwrap_or_default(),
},
join_handles: Arc::new(TokioMutex::new(vec![ws_driver_jh])),
join_handles: Arc::new(TokioMutex::new(join_handles)),
})
}

pub fn grpc_url(&self) -> Url {
self.grpc_url.clone().expect("grpc url is not set")
}

pub fn websocket_url(&self) -> Url {
self.websocket_url.clone().expect("rpc url is not set")
}

pub fn grpc_client(&self) -> &tonic::transport::Channel {
self.grpc_client.as_ref().expect("grpc client is not set")
}

pub fn rpc_ws_client(&self) -> WebSocketClient {
self.rpc_ws_client.as_ref().expect("rpc client is not set").clone()
}

pub fn client_id(&self) -> ClientId {
self.client_id
.lock()
Expand Down Expand Up @@ -366,16 +397,17 @@ where
)?;

// Simulate transaction
let res = simulate_tx(self.grpc_url.clone(), tx, tx_bytes.clone()).await?;
let res = simulate_tx(self.grpc_url(), tx, tx_bytes.clone()).await?;
res.result
.map(|r| log::debug!(target: "hyperspace_cosmos", "Simulated transaction: events: {:?}\nlogs: {}", r.events, r.log));

// Broadcast transaction
let hash = broadcast_tx(&self.rpc_client, tx_bytes).await?;
log::debug!(target: "hyperspace_cosmos", "🤝 Transaction sent with hash: {:?}", hash);
let client = &self.rpc_ws_client();
let hash = broadcast_tx(client, tx_bytes).await?;
log::info!(target: "hyperspace_cosmos", "🤝 Transaction sent with hash: {:?}", hash);

// wait for confirmation
confirm_tx(&self.rpc_client, hash).await
confirm_tx(client, hash).await
}

pub async fn fetch_light_block_with_cache(
Expand Down Expand Up @@ -460,7 +492,7 @@ where

/// Uses the GRPC client to retrieve the account sequence
pub async fn query_account(&self) -> Result<BaseAccount, Error> {
let mut client = QueryClient::connect(self.grpc_url.clone().to_string())
let mut client = QueryClient::connect(self.grpc_url().to_string())
.await
.map_err(|e| Error::from(format!("GRPC client error: {:?}", e)))?;

Expand Down
34 changes: 17 additions & 17 deletions hyperspace/cosmos/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ where
// necessary height field, as `height` is removed from `Attribute` from ibc-rs v0.22.0
async fn ibc_events(&self) -> Pin<Box<dyn Stream<Item = IbcEvent> + Send + 'static>> {
// Create websocket client. Like what `EventMonitor::subscribe()` does in `hermes`
let ws_client = self.rpc_client.clone();
let ws_client = self.rpc_ws_client();

let query_all = vec![
Query::from(EventType::NewBlock),
Expand Down Expand Up @@ -466,7 +466,7 @@ where
// Instead, we need to pull block height via `/abci_info` and then fetch block
// metadata at the given height via `/blockchain` endpoint.
let abci_info = self
.rpc_client
.rpc_http_client
.abci_info()
.await
.map_err(|e| Error::RpcError(format!("{e:?}")))?;
Expand All @@ -476,7 +476,7 @@ where
// TODO: Replace this query with `/header`, once it's available.
// https://github.com/informalsystems/tendermint-rs/pull/1101
let blocks = self
.rpc_client
.rpc_http_client
.blockchain(abci_info.last_block_height, abci_info.last_block_height)
.await
.map_err(|e| {
Expand Down Expand Up @@ -505,7 +505,7 @@ where
) -> Result<Vec<u64>, Self::Error> {
let mut grpc_client =
ibc_proto::ibc::core::channel::v1::query_client::QueryClient::connect(
self.grpc_url.clone().to_string(),
self.grpc_url().to_string(),
)
.await
.map_err(|e| Error::from(e.to_string()))?;
Expand Down Expand Up @@ -541,7 +541,7 @@ where
);
let mut grpc_client =
ibc_proto::ibc::core::channel::v1::query_client::QueryClient::connect(
self.grpc_url.clone().to_string(),
self.grpc_url().to_string(),
)
.await
.map_err(|e| Error::from(e.to_string()))?;
Expand Down Expand Up @@ -574,7 +574,7 @@ where
) -> Result<Vec<u64>, Self::Error> {
let mut grpc_client =
ibc_proto::ibc::core::channel::v1::query_client::QueryClient::connect(
self.grpc_url.clone().to_string(),
self.grpc_url().to_string(),
)
.await
.map_err(|e| Error::from(e.to_string()))?;
Expand Down Expand Up @@ -605,7 +605,7 @@ where
) -> Result<Vec<u64>, Self::Error> {
let mut grpc_client =
ibc_proto::ibc::core::channel::v1::query_client::QueryClient::connect(
self.grpc_url.clone().to_string(),
self.grpc_url().to_string(),
)
.await
.map_err(|e| Error::from(e.to_string()))?;
Expand Down Expand Up @@ -638,7 +638,7 @@ where
) -> Result<QueryChannelsResponse, Self::Error> {
let mut grpc_client =
ibc_proto::ibc::core::channel::v1::query_client::QueryClient::connect(
self.grpc_url.clone().to_string(),
self.grpc_url().to_string(),
)
.await
.map_err(|e| Error::from(format!("{e:?}")))?;
Expand Down Expand Up @@ -889,7 +889,7 @@ where
) -> Result<Vec<PrefixedCoin>, Self::Error> {
let denom = &asset_id;
let mut grpc_client = ibc_proto::cosmos::bank::v1beta1::query_client::QueryClient::connect(
self.grpc_url.clone().to_string(),
self.grpc_url().to_string(),
)
.await
.map_err(|e| Error::from(format!("{e:?}")))?;
Expand Down Expand Up @@ -956,7 +956,7 @@ where
let height = TmHeight::try_from(block_number)
.map_err(|e| Error::from(format!("Invalid block number: {e}")))?;
let response = self
.rpc_client
.rpc_ws_client()
.block(height)
.await
.map_err(|e| Error::RpcError(e.to_string()))?;
Expand All @@ -969,7 +969,7 @@ where
pagination: Some(PageRequest { limit: u32::MAX as _, ..Default::default() }),
});
let grpc_client = ibc_proto::ibc::core::client::v1::query_client::QueryClient::new(
self.grpc_client.clone(),
self.grpc_client().clone(),
);
let response = grpc_client
.clone()
Expand Down Expand Up @@ -998,7 +998,7 @@ where
});
let mut grpc_client =
ibc_proto::ibc::core::channel::v1::query_client::QueryClient::connect(
self.grpc_url.clone().to_string(),
self.grpc_url().to_string(),
)
.await
.map_err(|e| Error::from(format!("{e:?}")))?;
Expand All @@ -1025,7 +1025,7 @@ where
) -> Result<Vec<IdentifiedConnection>, Self::Error> {
let mut grpc_client =
ibc_proto::ibc::core::connection::v1::query_client::QueryClient::connect(
self.grpc_url.clone().to_string(),
self.grpc_url().to_string(),
)
.await
.map_err(|e| Error::from(format!("{e:?}")))?;
Expand Down Expand Up @@ -1098,7 +1098,7 @@ where

let response: Response = loop {
let response = self
.rpc_client
.rpc_ws_client()
.tx_search(
Query::eq("tx.hash", tx_id.hash.to_string()),
false,
Expand Down Expand Up @@ -1165,7 +1165,7 @@ where

let response: Response = loop {
let response = self
.rpc_client
.rpc_ws_client()
.tx_search(
Query::eq("tx.hash", tx_id.hash.to_string()),
false,
Expand Down Expand Up @@ -1233,7 +1233,7 @@ where

let response: Response = loop {
let response = self
.rpc_client
.rpc_ws_client()
.tx_search(
Query::eq("tx.hash", tx_id.hash.to_string()),
false,
Expand Down Expand Up @@ -1318,7 +1318,7 @@ where
}
};
// let resp = MsgClient::connect(
// Endpoint::try_from(self.grpc_url.to_string())
// Endpoint::try_from(self.grpc_url().to_string())
// .map_err(|e| Error::from(format!("Failed to parse grpc url: {:?}", e)))?,
// )
// .await
Expand Down
2 changes: 1 addition & 1 deletion hyperspace/cosmos/src/test_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ where

/// Returns a stream that yields chain Block number
async fn subscribe_blocks(&self) -> Pin<Box<dyn Stream<Item = u64> + Send + Sync>> {
let ws_client = self.rpc_client.clone();
let ws_client = self.rpc_ws_client();

let subscription = ws_client.subscribe(Query::from(EventType::NewBlock)).await.unwrap();
log::info!(target: "hyperspace_cosmos", "🛰️ Subscribed to {} listening to finality notifications", self.name);
Expand Down
Loading