diff --git a/bridges/relays/client-substrate/src/client.rs b/bridges/relays/client-substrate/src/client.rs index f1ddeeeb0f039..53ebe81e8604d 100644 --- a/bridges/relays/client-substrate/src/client.rs +++ b/bridges/relays/client-substrate/src/client.rs @@ -545,6 +545,18 @@ impl Client { .await } + /// Execute runtime call at given block, provided the input and output types. + /// It also performs the input encode and output decode. + pub async fn typed_state_call( + &self, + method_name: String, + input: Input, + at_block: Option, + ) -> Result { + let encoded_output = self.state_call(method_name, Bytes(input.encode()), at_block).await?; + Output::decode(&mut &encoded_output.0[..]).map_err(Error::ResponseParseFailed) + } + /// Execute runtime call at given block. pub async fn state_call( &self, diff --git a/bridges/relays/lib-substrate-relay/src/messages_source.rs b/bridges/relays/lib-substrate-relay/src/messages_source.rs index 95a3feab0c11a..f032ec3c0c082 100644 --- a/bridges/relays/lib-substrate-relay/src/messages_source.rs +++ b/bridges/relays/lib-substrate-relay/src/messages_source.rs @@ -31,8 +31,8 @@ use async_std::sync::Arc; use async_trait::async_trait; use bp_messages::{ storage_keys::{operating_mode_key, outbound_lane_data_key}, - InboundMessageDetails, LaneId, MessageData, MessageNonce, MessagesOperatingMode, - OutboundLaneData, OutboundMessageDetails, UnrewardedRelayersState, + InboundMessageDetails, LaneId, MessageData, MessageNonce, MessagePayload, + MessagesOperatingMode, OutboundLaneData, OutboundMessageDetails, UnrewardedRelayersState, }; use bp_runtime::{messages::DispatchFeePayment, BasicOperatingMode, HeaderIdProvider}; use bridge_runtime_common::messages::{ @@ -56,12 +56,13 @@ use relay_substrate_client::{ use relay_utils::{relay_loop::Client as RelayClient, HeaderId}; use sp_core::{Bytes, Pair}; use sp_runtime::{traits::Header as HeaderT, DeserializeOwned}; -use std::{collections::HashMap, ops::RangeInclusive}; +use std::ops::RangeInclusive; /// Intermediate message proof returned by the source Substrate node. Includes everything /// required to submit to the target node: cumulative dispatch weight of bundled messages and /// the proof itself. pub type SubstrateMessagesProof = (Weight, FromBridgedChainMessagesProof>); +type MessagesToRefine<'a, Balance> = Vec<(MessagePayload, &'a mut OutboundMessageDetails)>; /// Substrate client as Substrate messages source. pub struct SubstrateMessagesSource { @@ -192,110 +193,97 @@ where &self, id: SourceHeaderIdOf>, nonces: RangeInclusive, - ) -> Result< - MessageDetailsMap< as MessageLane>::SourceChainBalance>, - SubstrateError, - > { - let encoded_response = self + ) -> Result>, SubstrateError> { + let mut out_msgs_details = self .source_client - .state_call( + .typed_state_call::<_, Vec<_>>( P::TargetChain::TO_CHAIN_MESSAGE_DETAILS_METHOD.into(), - Bytes((self.lane_id, nonces.start(), nonces.end()).encode()), + (self.lane_id, *nonces.start(), *nonces.end()), Some(id.1), ) .await?; - - let mut messages = make_message_details_map::( - Decode::decode(&mut &encoded_response.0[..]) - .map_err(SubstrateError::ResponseParseFailed)?, - nonces, - )?; + validate_out_msgs_details::(&out_msgs_details, nonces)?; // prepare arguments of the inbound message details call (if we need it) - let mut messages_to_refine = HashMap::new(); - for (message_nonce, message) in &messages { - if message.dispatch_fee_payment != DispatchFeePayment::AtTargetChain { + let mut msgs_to_refine = vec![]; + for out_msg_details in out_msgs_details.iter_mut() { + if out_msg_details.dispatch_fee_payment != DispatchFeePayment::AtTargetChain { continue } // for pay-at-target messages we may want to ask target chain for // refined dispatch weight - let message_key = bp_messages::storage_keys::message_key( + let msg_key = bp_messages::storage_keys::message_key( P::TargetChain::WITH_CHAIN_MESSAGES_PALLET_NAME, &self.lane_id, - *message_nonce, - ); - let message_data: MessageData> = - self.source_client.storage_value(message_key, Some(id.1)).await?.ok_or_else( - || { - SubstrateError::Custom(format!( - "Message to {} {:?}/{} is missing from runtime the storage of {} at {:?}", - P::TargetChain::NAME, - self.lane_id, - message_nonce, - P::SourceChain::NAME, - id, - )) - }, - )?; - let message_payload = message_data.payload; - messages_to_refine.insert( - *message_nonce, - ( - message_payload, - OutboundMessageDetails { - nonce: *message_nonce, - dispatch_weight: message.dispatch_weight, - size: message.size, - delivery_and_dispatch_fee: message.reward, - dispatch_fee_payment: DispatchFeePayment::AtTargetChain, - }, - ), + out_msg_details.nonce, ); + let msg_data: MessageData> = + self.source_client.storage_value(msg_key, Some(id.1)).await?.ok_or_else(|| { + SubstrateError::Custom(format!( + "Message to {} {:?}/{} is missing from runtime the storage of {} at {:?}", + P::TargetChain::NAME, + self.lane_id, + out_msg_details.nonce, + P::SourceChain::NAME, + id, + )) + })?; + + msgs_to_refine.push((msg_data.payload, out_msg_details)); } - // request inbound message details from the target client - if !messages_to_refine.is_empty() { - let refined_messages_encoded = self + for mut msgs_to_refine_batch in + split_msgs_to_refine::(self.lane_id, msgs_to_refine)? + { + let in_msgs_details = self .target_client - .state_call( + .typed_state_call::<_, Vec>( P::SourceChain::FROM_CHAIN_MESSAGE_DETAILS_METHOD.into(), - Bytes((self.lane_id, messages_to_refine.values().collect::>()).encode()), + (self.lane_id, &msgs_to_refine_batch), None, ) .await?; - let refined_messages = - Vec::::decode(&mut &refined_messages_encoded.0[..]) - .map_err(SubstrateError::ResponseParseFailed)?; - if refined_messages.len() != messages_to_refine.len() { + if in_msgs_details.len() != msgs_to_refine_batch.len() { return Err(SubstrateError::Custom(format!( "Call of {} at {} has returned {} entries instead of expected {}", P::SourceChain::FROM_CHAIN_MESSAGE_DETAILS_METHOD, P::TargetChain::NAME, - refined_messages.len(), - messages_to_refine.len(), + in_msgs_details.len(), + msgs_to_refine_batch.len(), ))) } - - for (nonce, refined_message) in messages_to_refine.keys().zip(refined_messages) { - let message = messages - .get_mut(nonce) - .expect("`messages_to_refine` is a subset of `messages`; qed"); + for ((_, out_msg_details), in_msg_details) in + msgs_to_refine_batch.iter_mut().zip(in_msgs_details) + { log::trace!( target: "bridge", "Refined weight of {}->{} message {:?}/{}: at-source: {}, at-target: {}", P::SourceChain::NAME, P::TargetChain::NAME, self.lane_id, - nonce, - message.dispatch_weight, - refined_message.dispatch_weight, + out_msg_details.nonce, + out_msg_details.dispatch_weight, + in_msg_details.dispatch_weight, ); - message.dispatch_weight = refined_message.dispatch_weight; + out_msg_details.dispatch_weight = in_msg_details.dispatch_weight; } } - Ok(messages) + let mut msgs_details_map = MessageDetailsMap::new(); + for out_msg_details in out_msgs_details { + msgs_details_map.insert( + out_msg_details.nonce, + MessageDetails { + dispatch_weight: out_msg_details.dispatch_weight, + size: out_msg_details.size as _, + reward: out_msg_details.delivery_and_dispatch_fee, + dispatch_fee_payment: out_msg_details.dispatch_fee_payment, + }, + ); + } + + Ok(msgs_details_map) } async fn prove_messages( @@ -571,10 +559,10 @@ where .unwrap_or(Err(SubstrateError::BridgePalletIsNotInitialized)) } -fn make_message_details_map( - weights: Vec>, +fn validate_out_msgs_details( + out_msgs_details: &[OutboundMessageDetails], nonces: RangeInclusive, -) -> Result, SubstrateError> { +) -> Result<(), SubstrateError> { let make_missing_nonce_error = |expected_nonce| { Err(SubstrateError::Custom(format!( "Missing nonce {} in message_details call result. Expected all nonces from {:?}", @@ -582,73 +570,88 @@ fn make_message_details_map( ))) }; - let mut weights_map = MessageDetailsMap::new(); - - // this is actually prevented by external logic - if nonces.is_empty() { - return Ok(weights_map) + if out_msgs_details.len() > nonces.clone().count() { + return Err(SubstrateError::Custom( + "More messages than requested returned by the message_details call.".into(), + )) } - // check if last nonce is missing - loop below is not checking this - let last_nonce_is_missing = - weights.last().map(|details| details.nonce != *nonces.end()).unwrap_or(true); - if last_nonce_is_missing { + // Check if last nonce is missing. The loop below is not checking this. + if out_msgs_details.is_empty() && !nonces.is_empty() { return make_missing_nonce_error(*nonces.end()) } - let mut expected_nonce = *nonces.start(); - let mut is_at_head = true; - - for details in weights { - match (details.nonce == expected_nonce, is_at_head) { - (true, _) => (), - (false, true) => { - // this may happen if some messages were already pruned from the source node - // - // this is not critical error and will be auto-resolved by messages lane (and target - // node) - log::info!( - target: "bridge", - "Some messages are missing from the {} node: {:?}. Target node may be out of sync?", - C::NAME, - expected_nonce..details.nonce, - ); - }, - (false, false) => { - // some nonces are missing from the middle/tail of the range - // - // this is critical error, because we can't miss any nonces - return make_missing_nonce_error(expected_nonce) - }, + let mut nonces_iter = nonces.clone().rev().peekable(); + let mut out_msgs_details_iter = out_msgs_details.iter().rev(); + while let Some((out_msg_details, &nonce)) = out_msgs_details_iter.next().zip(nonces_iter.peek()) + { + nonces_iter.next(); + if out_msg_details.nonce != nonce { + // Some nonces are missing from the middle/tail of the range. This is critical error. + return make_missing_nonce_error(nonce) } + } - weights_map.insert( - details.nonce, - MessageDetails { - dispatch_weight: details.dispatch_weight, - size: details.size as _, - reward: details.delivery_and_dispatch_fee, - dispatch_fee_payment: details.dispatch_fee_payment, - }, + // Check if some nonces from the beginning of the range are missing. This may happen if + // some messages were already pruned from the source node. This is not a critical error + // and will be auto-resolved by messages lane (and target node). + if nonces_iter.peek().is_some() { + log::info!( + target: "bridge", + "Some messages are missing from the {} node: {:?}. Target node may be out of sync?", + C::NAME, + nonces_iter.rev().collect::>(), ); - expected_nonce = details.nonce + 1; - is_at_head = false; } - Ok(weights_map) + Ok(()) +} + +fn split_msgs_to_refine( + lane_id: LaneId, + msgs_to_refine: MessagesToRefine, +) -> Result>, SubstrateError> { + let max_batch_size = Target::max_extrinsic_size() as usize; + let mut batches = vec![]; + + let mut current_msgs_batch = msgs_to_refine; + while !current_msgs_batch.is_empty() { + let mut next_msgs_batch = vec![]; + while (lane_id, ¤t_msgs_batch).encoded_size() > max_batch_size { + if current_msgs_batch.len() <= 1 { + return Err(SubstrateError::Custom(format!( + "Call of {} at {} can't be executed even if only one message is supplied. \ + max_extrinsic_size(): {}", + Source::FROM_CHAIN_MESSAGE_DETAILS_METHOD, + Target::NAME, + Target::max_extrinsic_size(), + ))) + } + + if let Some(msg) = current_msgs_batch.pop() { + next_msgs_batch.insert(0, msg); + } + } + + batches.push(current_msgs_batch); + current_msgs_batch = next_msgs_batch; + } + + Ok(batches) } #[cfg(test)] mod tests { use super::*; - use bp_runtime::messages::DispatchFeePayment; + use bp_runtime::{messages::DispatchFeePayment, Chain as ChainBase}; use codec::MaxEncodedLen; + use relay_rialto_client::Rialto; use relay_rococo_client::Rococo; use relay_wococo_client::Wococo; fn message_details_from_rpc( nonces: RangeInclusive, - ) -> Vec> { + ) -> Vec> { nonces .into_iter() .map(|nonce| bp_messages::OutboundMessageDetails { @@ -662,94 +665,49 @@ mod tests { } #[test] - fn make_message_details_map_succeeds_if_no_messages_are_missing() { - assert_eq!( - make_message_details_map::(message_details_from_rpc(1..=3), 1..=3,).unwrap(), - vec![ - ( - 1, - MessageDetails { - dispatch_weight: 0, - size: 0, - reward: 0, - dispatch_fee_payment: DispatchFeePayment::AtSourceChain, - } - ), - ( - 2, - MessageDetails { - dispatch_weight: 0, - size: 0, - reward: 0, - dispatch_fee_payment: DispatchFeePayment::AtSourceChain, - } - ), - ( - 3, - MessageDetails { - dispatch_weight: 0, - size: 0, - reward: 0, - dispatch_fee_payment: DispatchFeePayment::AtSourceChain, - } - ), - ] - .into_iter() - .collect(), + fn validate_out_msgs_details_succeeds_if_no_messages_are_missing() { + assert!( + validate_out_msgs_details::(&message_details_from_rpc(1..=3), 1..=3,).is_ok() ); } #[test] - fn make_message_details_map_succeeds_if_head_messages_are_missing() { - assert_eq!( - make_message_details_map::(message_details_from_rpc(2..=3), 1..=3,).unwrap(), - vec![ - ( - 2, - MessageDetails { - dispatch_weight: 0, - size: 0, - reward: 0, - dispatch_fee_payment: DispatchFeePayment::AtSourceChain, - } - ), - ( - 3, - MessageDetails { - dispatch_weight: 0, - size: 0, - reward: 0, - dispatch_fee_payment: DispatchFeePayment::AtSourceChain, - } - ), - ] - .into_iter() - .collect(), - ); + fn validate_out_msgs_details_succeeds_if_head_messages_are_missing() { + assert!( + validate_out_msgs_details::(&message_details_from_rpc(2..=3), 1..=3,).is_ok() + ) } #[test] - fn make_message_details_map_fails_if_mid_messages_are_missing() { + fn validate_out_msgs_details_fails_if_mid_messages_are_missing() { let mut message_details_from_rpc = message_details_from_rpc(1..=3); message_details_from_rpc.remove(1); assert!(matches!( - make_message_details_map::(message_details_from_rpc, 1..=3,), + validate_out_msgs_details::(&message_details_from_rpc, 1..=3,), + Err(SubstrateError::Custom(_)) + )); + } + + #[test] + fn validate_out_msgs_details_map_fails_if_tail_messages_are_missing() { + assert!(matches!( + validate_out_msgs_details::(&message_details_from_rpc(1..=2), 1..=3,), Err(SubstrateError::Custom(_)) )); } #[test] - fn make_message_details_map_fails_if_tail_messages_are_missing() { + fn validate_out_msgs_details_fails_if_all_messages_are_missing() { assert!(matches!( - make_message_details_map::(message_details_from_rpc(1..=2), 1..=3,), + validate_out_msgs_details::(&[], 1..=3), Err(SubstrateError::Custom(_)) )); } #[test] - fn make_message_details_map_fails_if_all_messages_are_missing() { + fn validate_out_msgs_details_fails_if_more_messages_than_nonces() { assert!(matches!( - make_message_details_map::(vec![], 1..=3), + validate_out_msgs_details::(&message_details_from_rpc(1..=5), 2..=5,), Err(SubstrateError::Custom(_)) )); } @@ -766,4 +724,101 @@ mod tests { dummy_proof.1.encode().len(), ); } + + fn check_split_msgs_to_refine( + payload_sizes: Vec, + expected_batches: Result, ()>, + ) { + let mut out_msgs_details = vec![]; + for (idx, _) in payload_sizes.iter().enumerate() { + out_msgs_details.push(OutboundMessageDetails::> { + nonce: idx as MessageNonce, + dispatch_weight: 0, + size: 0, + delivery_and_dispatch_fee: 0, + dispatch_fee_payment: DispatchFeePayment::AtTargetChain, + }); + } + + let mut msgs_to_refine = vec![]; + for (&payload_size, out_msg_details) in + payload_sizes.iter().zip(out_msgs_details.iter_mut()) + { + let payload = vec![1u8; payload_size]; + msgs_to_refine.push((payload, out_msg_details)); + } + + let maybe_batches = split_msgs_to_refine::([0, 0, 0, 0], msgs_to_refine); + match expected_batches { + Ok(expected_batches) => { + let batches = maybe_batches.unwrap(); + let mut idx = 0; + assert_eq!(batches.len(), expected_batches.len()); + for (batch, &expected_batch_size) in batches.iter().zip(expected_batches.iter()) { + assert_eq!(batch.len(), expected_batch_size); + for msg_to_refine in batch { + assert_eq!(msg_to_refine.0.len(), payload_sizes[idx]); + idx += 1; + } + } + }, + Err(_) => { + matches!(maybe_batches, Err(SubstrateError::Custom(_))); + }, + } + } + + #[test] + fn test_split_msgs_to_refine() { + let max_extrinsic_size = Rococo::max_extrinsic_size() as usize; + + // Check that an error is returned when one of the messages is too big. + check_split_msgs_to_refine(vec![max_extrinsic_size], Err(())); + check_split_msgs_to_refine(vec![50, 100, max_extrinsic_size, 200], Err(())); + + // Otherwise check that the split is valid. + check_split_msgs_to_refine(vec![100, 200, 300, 400], Ok(vec![4])); + check_split_msgs_to_refine( + vec![ + 50, + 100, + max_extrinsic_size - 500, + 500, + 1000, + 1500, + max_extrinsic_size - 3500, + 5000, + 10000, + ], + Ok(vec![3, 4, 2]), + ); + check_split_msgs_to_refine( + vec![ + 50, + 100, + max_extrinsic_size - 150, + 500, + 1000, + 1500, + max_extrinsic_size - 3000, + 5000, + 10000, + ], + Ok(vec![2, 1, 3, 1, 2]), + ); + check_split_msgs_to_refine( + vec![ + 5000, + 10000, + max_extrinsic_size - 3500, + 500, + 1000, + 1500, + max_extrinsic_size - 500, + 50, + 100, + ], + Ok(vec![2, 4, 3]), + ); + } }