Skip to content

Commit

Permalink
Make GRPC and WS urls for cosmos optional
Browse files Browse the repository at this point in the history
  • Loading branch information
vmarkushin authored and Sventimir committed Aug 12, 2024
1 parent 063a708 commit 4624696
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 47 deletions.
9 changes: 5 additions & 4 deletions hyperspace/cosmos/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ where
Pin<Box<dyn Stream<Item = <Self as IbcProvider>::FinalityEvent> + Send + Sync>>,
Error,
> {
let ws_client = self.rpc_client.clone();
let ws_client = self.rpc_ws_client.clone().ok_or("No RPC client!".to_string())?;
let subscription = ws_client
.subscribe(Query::from(EventType::NewBlock))
.await
Expand Down Expand Up @@ -148,7 +148,7 @@ where
// .and_eq("update_client.header", hex::encode(&update.header.unwrap_or_default()))
use tendermint::abci::Event as AbciEvent;

let mut client = ServiceClient::connect(self.grpc_url.to_string())
let mut client = ServiceClient::connect(self.grpc_url().to_string())
.await
.map_err(|e| Error::from(e.to_string()))?;
let mut resp = client
Expand Down Expand Up @@ -261,11 +261,12 @@ where
}

async fn reconnect(&mut self) -> anyhow::Result<()> {
let (rpc_client, ws_driver) = WebSocketClient::new(self.websocket_url.clone())
// TODO: don't reconnect if the url is not presented
let (rpc_client, ws_driver) = WebSocketClient::new(self.websocket_url().clone())
.await
.map_err(|e| Error::RpcError(format!("{e:?}")))?;
self.join_handles.lock().await.push(tokio::spawn(ws_driver.run()));
self.rpc_client = rpc_client;
self.rpc_ws_client = Some(rpc_client);
log::info!(target: "hyperspace_cosmos", "Reconnected to cosmos chain");
Ok(())
}
Expand Down
78 changes: 55 additions & 23 deletions hyperspace/cosmos/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,17 +130,17 @@ pub struct CosmosClient<H> {
/// Chain name
pub name: String,
/// Chain rpc client
pub rpc_client: WebSocketClient,
pub rpc_ws_client: Option<WebSocketClient>,
/// Chain http rpc client
pub rpc_http_client: HttpClient,
/// Reusable GRPC client
pub grpc_client: tonic::transport::Channel,
pub grpc_client: Option<tonic::transport::Channel>,
/// Chain rpc address
pub rpc_url: Url,
/// Chain grpc address
pub grpc_url: Url,
pub grpc_url: Option<Url>,
/// Websocket chain ws client
pub websocket_url: Url,
pub websocket_url: Option<Url>,
/// Chain Id
pub chain_id: ChainId,
/// Light client id on counterparty chain
Expand Down Expand Up @@ -186,9 +186,9 @@ pub struct CosmosClientConfig {
/// rpc url for cosmos
pub rpc_url: Url,
/// grpc url for cosmos
pub grpc_url: Url,
pub grpc_url: Option<Url>,
/// websocket url for cosmos
pub websocket_url: Url,
pub websocket_url: Option<Url>,
/// Cosmos chain Id
pub chain_id: String,
/// Light client id on counterparty chain
Expand Down Expand Up @@ -251,17 +251,32 @@ where
{
/// Initializes a [`CosmosClient`] given a [`CosmosClientConfig`]
pub async fn new(config: CosmosClientConfig) -> Result<Self, Error> {
let (rpc_client, rpc_driver) = WebSocketClient::new(config.websocket_url.clone())
.await
.map_err(|e| Error::RpcError(format!("failed to connect to Websocket {:?}", e)))?;
let mut rpc_client = None;

let mut join_handles = vec![];
if let Some(websocket_url) = &config.websocket_url {
let rpc_driver;
(rpc_client, rpc_driver) = WebSocketClient::new(websocket_url.clone())
.await
.map(|(x, y)| (Some(x), y))
.map_err(|e| Error::RpcError(format!("{:?}", e)))?;
join_handles.push(tokio::spawn(rpc_driver.run()));
} else {
log::warn!(target: "hyperspace_cosmos", "No websocket url provided for cosmos chain");
}
let rpc_http_client = HttpClient::new(config.rpc_url.clone())
.map_err(|e| Error::RpcError(format!("failed to connect to RPC {:?}", e)))?;
let ws_driver_jh = tokio::spawn(rpc_driver.run());
let grpc_client = tonic::transport::Endpoint::new(config.grpc_url.to_string())
.map_err(|e| Error::RpcError(format!("failed to create a GRPC endpoint {:?}", e)))?
.connect()
.await
.map_err(|e| Error::RpcError(format!("failed to connect to GRPC {:?}", e)))?;
.map_err(|e| Error::RpcError(format!("{:?}", e)))?;
let mut grpc_client = None;
if let Some(grpc_url) = &config.grpc_url {
grpc_client = tonic::transport::Endpoint::new(grpc_url.to_string())
.map_err(|e| Error::RpcError(format!("{:?}", e)))?
.connect()
.await
.map(Some)
.map_err(|e| Error::RpcError(format!("{:?}", e)))?;
} else {
log::warn!(target: "hyperspace_cosmos", "No grpc url provided for cosmos chain");
}

let chain_id = ChainId::from(config.chain_id);
let light_client =
Expand All @@ -279,7 +294,7 @@ where
Ok(Self {
name: config.name,
chain_id,
rpc_client,
rpc_ws_client: rpc_client,
rpc_http_client,
grpc_client,
rpc_url: config.rpc_url,
Expand Down Expand Up @@ -308,10 +323,26 @@ where
max_packets_to_process: config.common.max_packets_to_process as usize,
skip_tokens_list: config.skip_tokens_list.unwrap_or_default(),
},
join_handles: Arc::new(TokioMutex::new(vec![ws_driver_jh])),
join_handles: Arc::new(TokioMutex::new(join_handles)),
})
}

pub fn grpc_url(&self) -> Url {
self.grpc_url.clone().expect("grpc url is not set")
}

pub fn websocket_url(&self) -> Url {
self.websocket_url.clone().expect("rpc url is not set")
}

pub fn grpc_client(&self) -> &tonic::transport::Channel {
self.grpc_client.as_ref().expect("grpc client is not set")
}

pub fn rpc_ws_client(&self) -> WebSocketClient {
self.rpc_ws_client.as_ref().expect("rpc client is not set").clone()
}

pub fn client_id(&self) -> ClientId {
self.client_id
.lock()
Expand Down Expand Up @@ -366,16 +397,17 @@ where
)?;

// Simulate transaction
let res = simulate_tx(self.grpc_url.clone(), tx, tx_bytes.clone()).await?;
let res = simulate_tx(self.grpc_url(), tx, tx_bytes.clone()).await?;
res.result
.map(|r| log::debug!(target: "hyperspace_cosmos", "Simulated transaction: events: {:?}\nlogs: {}", r.events, r.log));

// Broadcast transaction
let hash = broadcast_tx(&self.rpc_client, tx_bytes).await?;
log::debug!(target: "hyperspace_cosmos", "🤝 Transaction sent with hash: {:?}", hash);
let client = &self.rpc_ws_client();
let hash = broadcast_tx(client, tx_bytes).await?;
log::info!(target: "hyperspace_cosmos", "🤝 Transaction sent with hash: {:?}", hash);

// wait for confirmation
confirm_tx(&self.rpc_client, hash).await
confirm_tx(client, hash).await
}

pub async fn fetch_light_block_with_cache(
Expand Down Expand Up @@ -460,7 +492,7 @@ where

/// Uses the GRPC client to retrieve the account sequence
pub async fn query_account(&self) -> Result<BaseAccount, Error> {
let mut client = QueryClient::connect(self.grpc_url.clone().to_string())
let mut client = QueryClient::connect(self.grpc_url().to_string())
.await
.map_err(|e| Error::from(format!("GRPC client error: {:?}", e)))?;

Expand Down
34 changes: 17 additions & 17 deletions hyperspace/cosmos/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ where
// necessary height field, as `height` is removed from `Attribute` from ibc-rs v0.22.0
async fn ibc_events(&self) -> Pin<Box<dyn Stream<Item = IbcEvent> + Send + 'static>> {
// Create websocket client. Like what `EventMonitor::subscribe()` does in `hermes`
let ws_client = self.rpc_client.clone();
let ws_client = self.rpc_ws_client();

let query_all = vec![
Query::from(EventType::NewBlock),
Expand Down Expand Up @@ -466,7 +466,7 @@ where
// Instead, we need to pull block height via `/abci_info` and then fetch block
// metadata at the given height via `/blockchain` endpoint.
let abci_info = self
.rpc_client
.rpc_http_client
.abci_info()
.await
.map_err(|e| Error::RpcError(format!("{e:?}")))?;
Expand All @@ -476,7 +476,7 @@ where
// TODO: Replace this query with `/header`, once it's available.
// https://github.com/informalsystems/tendermint-rs/pull/1101
let blocks = self
.rpc_client
.rpc_http_client
.blockchain(abci_info.last_block_height, abci_info.last_block_height)
.await
.map_err(|e| {
Expand Down Expand Up @@ -505,7 +505,7 @@ where
) -> Result<Vec<u64>, Self::Error> {
let mut grpc_client =
ibc_proto::ibc::core::channel::v1::query_client::QueryClient::connect(
self.grpc_url.clone().to_string(),
self.grpc_url().to_string(),
)
.await
.map_err(|e| Error::from(e.to_string()))?;
Expand Down Expand Up @@ -541,7 +541,7 @@ where
);
let mut grpc_client =
ibc_proto::ibc::core::channel::v1::query_client::QueryClient::connect(
self.grpc_url.clone().to_string(),
self.grpc_url().to_string(),
)
.await
.map_err(|e| Error::from(e.to_string()))?;
Expand Down Expand Up @@ -574,7 +574,7 @@ where
) -> Result<Vec<u64>, Self::Error> {
let mut grpc_client =
ibc_proto::ibc::core::channel::v1::query_client::QueryClient::connect(
self.grpc_url.clone().to_string(),
self.grpc_url().to_string(),
)
.await
.map_err(|e| Error::from(e.to_string()))?;
Expand Down Expand Up @@ -605,7 +605,7 @@ where
) -> Result<Vec<u64>, Self::Error> {
let mut grpc_client =
ibc_proto::ibc::core::channel::v1::query_client::QueryClient::connect(
self.grpc_url.clone().to_string(),
self.grpc_url().to_string(),
)
.await
.map_err(|e| Error::from(e.to_string()))?;
Expand Down Expand Up @@ -638,7 +638,7 @@ where
) -> Result<QueryChannelsResponse, Self::Error> {
let mut grpc_client =
ibc_proto::ibc::core::channel::v1::query_client::QueryClient::connect(
self.grpc_url.clone().to_string(),
self.grpc_url().to_string(),
)
.await
.map_err(|e| Error::from(format!("{e:?}")))?;
Expand Down Expand Up @@ -889,7 +889,7 @@ where
) -> Result<Vec<PrefixedCoin>, Self::Error> {
let denom = &asset_id;
let mut grpc_client = ibc_proto::cosmos::bank::v1beta1::query_client::QueryClient::connect(
self.grpc_url.clone().to_string(),
self.grpc_url().to_string(),
)
.await
.map_err(|e| Error::from(format!("{e:?}")))?;
Expand Down Expand Up @@ -956,7 +956,7 @@ where
let height = TmHeight::try_from(block_number)
.map_err(|e| Error::from(format!("Invalid block number: {e}")))?;
let response = self
.rpc_client
.rpc_ws_client()
.block(height)
.await
.map_err(|e| Error::RpcError(e.to_string()))?;
Expand All @@ -969,7 +969,7 @@ where
pagination: Some(PageRequest { limit: u32::MAX as _, ..Default::default() }),
});
let grpc_client = ibc_proto::ibc::core::client::v1::query_client::QueryClient::new(
self.grpc_client.clone(),
self.grpc_client().clone(),
);
let response = grpc_client
.clone()
Expand Down Expand Up @@ -998,7 +998,7 @@ where
});
let mut grpc_client =
ibc_proto::ibc::core::channel::v1::query_client::QueryClient::connect(
self.grpc_url.clone().to_string(),
self.grpc_url().to_string(),
)
.await
.map_err(|e| Error::from(format!("{e:?}")))?;
Expand All @@ -1025,7 +1025,7 @@ where
) -> Result<Vec<IdentifiedConnection>, Self::Error> {
let mut grpc_client =
ibc_proto::ibc::core::connection::v1::query_client::QueryClient::connect(
self.grpc_url.clone().to_string(),
self.grpc_url().to_string(),
)
.await
.map_err(|e| Error::from(format!("{e:?}")))?;
Expand Down Expand Up @@ -1098,7 +1098,7 @@ where

let response: Response = loop {
let response = self
.rpc_client
.rpc_ws_client()
.tx_search(
Query::eq("tx.hash", tx_id.hash.to_string()),
false,
Expand Down Expand Up @@ -1165,7 +1165,7 @@ where

let response: Response = loop {
let response = self
.rpc_client
.rpc_ws_client()
.tx_search(
Query::eq("tx.hash", tx_id.hash.to_string()),
false,
Expand Down Expand Up @@ -1233,7 +1233,7 @@ where

let response: Response = loop {
let response = self
.rpc_client
.rpc_ws_client()
.tx_search(
Query::eq("tx.hash", tx_id.hash.to_string()),
false,
Expand Down Expand Up @@ -1318,7 +1318,7 @@ where
}
};
// let resp = MsgClient::connect(
// Endpoint::try_from(self.grpc_url.to_string())
// Endpoint::try_from(self.grpc_url().to_string())
// .map_err(|e| Error::from(format!("Failed to parse grpc url: {:?}", e)))?,
// )
// .await
Expand Down
2 changes: 1 addition & 1 deletion hyperspace/cosmos/src/test_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ where

/// Returns a stream that yields chain Block number
async fn subscribe_blocks(&self) -> Pin<Box<dyn Stream<Item = u64> + Send + Sync>> {
let ws_client = self.rpc_client.clone();
let ws_client = self.rpc_ws_client();

let subscription = ws_client.subscribe(Query::from(EventType::NewBlock)).await.unwrap();
log::info!(target: "hyperspace_cosmos", "🛰️ Subscribed to {} listening to finality notifications", self.name);
Expand Down
4 changes: 2 additions & 2 deletions hyperspace/testsuite/tests/parachain_cosmos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ async fn setup_clients() -> (AnyChain, AnyChain) {
let mut config_b = CosmosClientConfig {
name: "cosmos".to_string(),
rpc_url: args.chain_b.clone().parse().unwrap(),
grpc_url: args.cosmos_grpc.clone().parse().unwrap(),
websocket_url: args.cosmos_ws.clone().parse().unwrap(),
grpc_url: Some(args.cosmos_grpc.clone().parse().unwrap()),
websocket_url: Some(args.cosmos_ws.clone().parse().unwrap()),
chain_id: "ibcgo-1".to_string(),
client_id: None,
connection_id: None,
Expand Down

0 comments on commit 4624696

Please sign in to comment.