Skip to content

Commit

Permalink
Cosmos fixes and improvements; metric handling fix (#474)
Browse files Browse the repository at this point in the history
* Fix cosmos client next updates and improve some of the queries

* Return in `MetricsHandler::handle_events` if no new height was found
  • Loading branch information
vmarkushin authored Feb 9, 2024
1 parent 5b89480 commit 883cb03
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 54 deletions.
14 changes: 8 additions & 6 deletions hyperspace/cosmos/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,15 +253,15 @@ where
pub async fn new(config: CosmosClientConfig) -> Result<Self, Error> {
let (rpc_client, rpc_driver) = WebSocketClient::new(config.websocket_url.clone())
.await
.map_err(|e| Error::RpcError(format!("{:?}", e)))?;
.map_err(|e| Error::RpcError(format!("failed to connect to Websocket {:?}", e)))?;
let rpc_http_client = HttpClient::new(config.rpc_url.clone())
.map_err(|e| Error::RpcError(format!("{:?}", e)))?;
.map_err(|e| Error::RpcError(format!("failed to connect to RPC {:?}", e)))?;
let ws_driver_jh = tokio::spawn(rpc_driver.run());
let grpc_client = tonic::transport::Endpoint::new(config.grpc_url.to_string())
.map_err(|e| Error::RpcError(format!("{:?}", e)))?
.map_err(|e| Error::RpcError(format!("failed to create a GRPC endpoint {:?}", e)))?
.connect()
.await
.map_err(|e| Error::RpcError(format!("{:?}", e)))?;
.map_err(|e| Error::RpcError(format!("failed to connect to GRPC {:?}", e)))?;

let chain_id = ChainId::from(config.chain_id);
let light_client =
Expand Down Expand Up @@ -401,15 +401,17 @@ where
to: TmHeight,
trusted_height: Height,
) -> Result<Vec<(Header, UpdateType)>, Error> {
let from = from.increment();
let mut xs = Vec::new();
let heightss = (from.value()..=to.value()).collect::<Vec<_>>();
let client = Arc::new(self.clone());
let to = self.rpc_call_delay().as_millis();
let delay_to = self.rpc_call_delay().as_millis();
for heights in heightss.chunks(5) {
let mut join_set = JoinSet::<Result<Result<_, Error>, Elapsed>>::new();
for height in heights.to_owned() {
let client = client.clone();
let duration = Duration::from_millis(rand::thread_rng().gen_range(0..to) as u64);
let duration =
Duration::from_millis(rand::thread_rng().gen_range(0..delay_to) as u64);
let fut = async move {
log::trace!(target: "hyperspace_cosmos", "Fetching header at height {:?}", height);
let latest_light_block =
Expand Down
116 changes: 68 additions & 48 deletions hyperspace/cosmos/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ use ibc::{
identifier::{ChainId, ChannelId, ClientId, ConnectionId, PortId},
path::{
AcksPath, ChannelEndsPath, ClientConsensusStatePath, ClientStatePath,
CommitmentsPath, ConnectionsPath, Path, ReceiptsPath, SeqRecvsPath,
CommitmentsPath, ConnectionsPath, Path, ReceiptsPath, SeqRecvsPath, SeqSendsPath,
},
},
},
events::IbcEvent,
protobuf::Protobuf,
signer::Signer,
timestamp::Timestamp,
tx_msg::Msg,
Height,
Expand Down Expand Up @@ -82,7 +83,9 @@ use tendermint_rpc::{
};
use tokio::{task::JoinSet, time::sleep};

pub const NUMBER_OF_BLOCKS_TO_PROCESS_PER_ITER: u64 = 250;
// At least one *mandatory* update should happen during that period
// TODO: make it configurable
pub const NUMBER_OF_BLOCKS_TO_PROCESS_PER_ITER: u64 = 500;

#[derive(Clone, Debug)]
pub enum FinalityEvent {
Expand Down Expand Up @@ -141,7 +144,6 @@ where
let update_headers =
self.msg_update_client_header(from, to, client_state.latest_height).await?;
let mut block_events = Vec::new();
block_events.push((0, Vec::new()));
let mut join_set: JoinSet<Result<_, anyhow::Error>> = JoinSet::new();
let range = (from.value()..to.value()).collect::<Vec<_>>();
let to = self.rpc_call_delay().as_millis();
Expand Down Expand Up @@ -177,9 +179,15 @@ where
block_events.sort_by_key(|(height, _)| *height);

let mut updates = Vec::new();
for (events, (update_header, update_type)) in
block_events.into_iter().map(|(_, events)| events).zip(update_headers)
for (i, (events, (update_header, mut update_type))) in block_events
.into_iter()
.map(|(_, events)| events)
.zip(update_headers)
.enumerate()
{
if i == NUMBER_OF_BLOCKS_TO_PROCESS_PER_ITER as usize - 1 {
update_type = UpdateType::Mandatory;
}
let height = update_header.height();
let update_client_header = {
let msg = MsgUpdateAnyClient::<LocalClientTypes> {
Expand Down Expand Up @@ -231,17 +239,17 @@ where
let Event { data, events: _, query } = event.unwrap();
match data {
EventData::NewBlock { block, .. }
if query == Query::from(EventType::NewBlock).to_string() =>
{
let height = Height::new(
ChainId::chain_version(chain_id.to_string().as_str()),
u64::from(block.as_ref().ok_or("tx.height").unwrap().header.height),
);
events_with_height.push(IbcEventWithHeight::new(
ClientEvents::NewBlock::new(height).into(),
height,
));
},
if query == Query::from(EventType::NewBlock).to_string() =>
{
let height = Height::new(
ChainId::chain_version(chain_id.to_string().as_str()),
u64::from(block.as_ref().ok_or("tx.height").unwrap().header.height),
);
events_with_height.push(IbcEventWithHeight::new(
ClientEvents::NewBlock::new(height).into(),
height,
));
},
EventData::Tx { tx_result } => {
let height = Height::new(
ChainId::chain_version(chain_id.to_string().as_str()),
Expand All @@ -265,10 +273,10 @@ where
events_with_height
.push(IbcEventWithHeight::new(ibc_event, height));
} else {
log::debug!(target: "hyperspace_cosmos", "The event is unknown");
log::debug!(target: "hyperspace_cosmos", "the event is unknown");
}
} else {
log::debug!(target: "hyperspace_cosmos", "Failed to parse event {:?}", abci_event);
log::debug!(target: "hyperspace_cosmos", "Event wasn't parsed {:?}", abci_event);
}
}
},
Expand Down Expand Up @@ -313,6 +321,9 @@ where
Path::ClientState(ClientStatePath(client_id.clone())).to_string().into_bytes();
let (q, proof) = self.query_path(path_bytes.clone(), at, true).await?;
let client_state = Any::decode(&*q.value)?;
if client_state.type_url.is_empty() || client_state.value.is_empty() {
return Err(Error::Custom(format!("empty client state for height {at}")))
}
Ok(QueryClientStateResponse {
client_state: Some(client_state),
proof,
Expand Down Expand Up @@ -513,7 +524,6 @@ where

let commitment_sequences: Vec<u64> =
response.commitments.into_iter().map(|v| v.sequence).collect();

Ok(commitment_sequences)
}

Expand Down Expand Up @@ -807,52 +817,62 @@ where

fn expected_block_time(&self) -> Duration {
// cosmos chain block time is roughly 6-7 seconds
Duration::from_secs(7)
Duration::from_secs(5)
}

async fn query_client_update_time_and_height(
&self,
client_id: ClientId,
client_height: Height,
) -> Result<(Height, Timestamp), Self::Error> {
log::trace!(
log::debug!(
target: "hyperspace_cosmos",
"Querying client update time and height for client {:?} at height {:?}",
client_id,
client_height
);
let query_str = Query::eq("update_client.client_id", client_id.to_string())
let query_update = Query::eq("update_client.client_id", client_id.to_string())
.and_eq("update_client.consensus_height", client_height.to_string());
let query_create = Query::eq("create_client.client_id", client_id.to_string())
.and_eq("create_client.consensus_height", client_height.to_string());
for query_str in [query_update, query_create] {
let response = self
.rpc_http_client
.tx_search(
query_str,
true,
1,
1, // get only the first Tx matching the query
Order::Ascending,
)
.await
.map_err(|e| Error::RpcError(format!("{e:?}")))?;

let response = self
.rpc_http_client
.tx_search(
query_str,
true,
1,
1, // get only the first Tx matching the query
Order::Ascending,
)
.await
.map_err(|e| Error::RpcError(format!("{e:?}")))?;

for tx in response.txs {
for ev in &tx.tx_result.events {
let height = tx.height.value();
let ev =
ibc_event_try_from_abci_event(ev, Height::new(self.id().version(), height));
let timestamp = self.query_timestamp_at(height).await?;
match ev {
Ok(IbcEvent::UpdateClient(e)) if e.client_id() == &client_id =>
return Ok((
Height::new(self.chain_id.version(), height),
Timestamp::from_nanoseconds(timestamp)?,
)),
_ => (),
for tx in response.txs {
for ev in &tx.tx_result.events {
let height = tx.height.value();
let ev =
ibc_event_try_from_abci_event(ev, Height::new(self.id().version(), height));
let timestamp = self
.query_timestamp_at(height)
.await
.map_err(|e| Error::RpcError(format!("{e:?}")))?;
match ev {
Ok(IbcEvent::UpdateClient(e)) if e.client_id() == &client_id =>
return Ok((
Height::new(self.chain_id.version(), height),
Timestamp::from_nanoseconds(timestamp)?,
)),
Ok(IbcEvent::CreateClient(e)) if e.client_id() == &client_id =>
return Ok((
Height::new(self.chain_id.version(), height),
Timestamp::from_nanoseconds(timestamp)?,
)),
_ => (),
}
}
}
}

Err(Error::from("not found".to_string()))
}

Expand Down
3 changes: 3 additions & 0 deletions hyperspace/metrics/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ impl MetricsHandler {
_ => (),
}
}
if new_latest_processed_height == 0 {
return Ok(())
}
self.metrics.update_latest_processed_height(new_latest_processed_height)?;
Ok(())
}
Expand Down

0 comments on commit 883cb03

Please sign in to comment.