From 6da3aa179705c9c5a7567319f88e90a612734826 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Tue, 17 Nov 2020 23:27:30 +0300 Subject: [PATCH] Limit messages weight in batch (#496) * limit messages in the batch by weight/count * fixed components compilation * reverted obsolete parts of #469 * implement generated_messages_weights * actually use computed weight in message proof * fmt and clippy * fixed TODO * clippy * Update relays/messages-relay/src/message_race_loop.rs Co-authored-by: Hernando Castano * add issue reference * add assert message * grumbles * fmt * reexport weight from bp-message-lane Co-authored-by: Hernando Castano Co-authored-by: Hernando Castano --- Cargo.lock | 3 - bin/millau/node/src/service.rs | 1 - bin/millau/runtime/src/lib.rs | 19 +- bin/rialto/node/src/service.rs | 1 - bin/rialto/runtime/src/lib.rs | 19 +- modules/message-lane/rpc/Cargo.toml | 2 - modules/message-lane/rpc/src/lib.rs | 43 +- primitives/message-lane/src/lib.rs | 14 +- relays/messages-relay/Cargo.toml | 1 - relays/messages-relay/src/message_lane.rs | 16 - .../messages-relay/src/message_lane_loop.rs | 131 ++++-- .../src/message_race_delivery.rs | 337 ++++++++++++--- .../messages-relay/src/message_race_loop.rs | 90 ++-- .../src/message_race_receiving.rs | 70 +++- .../src/message_race_strategy.rs | 389 +++++++++++------- relays/messages-relay/src/metrics.rs | 17 +- relays/substrate-client/src/client.rs | 7 +- relays/substrate-client/src/error.rs | 3 + relays/substrate-client/src/rpc.rs | 3 +- relays/substrate/src/messages_source.rs | 56 ++- relays/substrate/src/messages_target.rs | 15 +- .../src/millau_messages_to_rialto.rs | 8 +- 22 files changed, 848 insertions(+), 397 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 06ac73659bfc3..47d7c47c7bbbe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3564,7 +3564,6 @@ dependencies = [ "futures 0.3.7", "hex", "log", - "num-traits", "parking_lot 0.11.0", "relay-utils", ] @@ -4203,13 +4202,11 @@ dependencies = [ "bp-message-lane", "bp-runtime", "derive_more", - "frame-support", "futures 0.3.7", "jsonrpc-core 15.1.0", "jsonrpc-core-client", "jsonrpc-derive", "sc-client-api", - "sp-api", "sp-blockchain", "sp-core", "sp-runtime", diff --git a/bin/millau/node/src/service.rs b/bin/millau/node/src/service.rs index 086ef7609cc1e..04ad152b37b07 100644 --- a/bin/millau/node/src/service.rs +++ b/bin/millau/node/src/service.rs @@ -221,7 +221,6 @@ pub fn new_full(config: Configuration) -> Result { finality_proof_provider.clone(), ))); io.extend_with(MessageLaneApi::to_delegate(MessageLaneRpcHandler::new( - client.clone(), backend.clone(), Arc::new(MillauMessageLaneKeys), ))); diff --git a/bin/millau/runtime/src/lib.rs b/bin/millau/runtime/src/lib.rs index e3154d382a294..7435788acc666 100644 --- a/bin/millau/runtime/src/lib.rs +++ b/bin/millau/runtime/src/lib.rs @@ -542,12 +542,19 @@ impl_runtime_apis! { // TODO: runtime should support several chains (https://github.com/paritytech/parity-bridges-common/issues/457) impl bp_message_lane::OutboundLaneApi for Runtime { - fn messages_dispatch_weight(lane: bp_message_lane::LaneId, begin: bp_message_lane::MessageNonce, end: bp_message_lane::MessageNonce) -> Weight { - (begin..=end) - .filter_map(|nonce| BridgeRialtoMessageLane::outbound_message_payload(lane, nonce)) - .filter_map(|encoded_payload| rialto_messages::ToRialtoMessagePayload::decode(&mut &encoded_payload[..]).ok()) - .map(|decoded_payload| decoded_payload.weight) - .fold(0, |sum, weight| sum.saturating_add(weight)) + fn messages_dispatch_weight( + lane: bp_message_lane::LaneId, + begin: bp_message_lane::MessageNonce, + end: bp_message_lane::MessageNonce, + ) -> Vec<(bp_message_lane::MessageNonce, Weight)> { + (begin..=end).filter_map(|nonce| { + let encoded_payload = BridgeRialtoMessageLane::outbound_message_payload(lane, nonce)?; + let decoded_payload = rialto_messages::ToRialtoMessagePayload::decode( + &mut &encoded_payload[..] + ).ok()?; + Some((nonce, decoded_payload.weight)) + }) + .collect() } fn latest_received_nonce(lane: bp_message_lane::LaneId) -> bp_message_lane::MessageNonce { diff --git a/bin/rialto/node/src/service.rs b/bin/rialto/node/src/service.rs index 1d7dfdffefbd3..1bb6e3b310fed 100644 --- a/bin/rialto/node/src/service.rs +++ b/bin/rialto/node/src/service.rs @@ -220,7 +220,6 @@ pub fn new_full(config: Configuration) -> Result { finality_proof_provider.clone(), ))); io.extend_with(MessageLaneApi::to_delegate(MessageLaneRpcHandler::new( - client.clone(), backend.clone(), Arc::new(RialtoMessageLaneKeys), ))); diff --git a/bin/rialto/runtime/src/lib.rs b/bin/rialto/runtime/src/lib.rs index 5953f054c1b17..4a3855f69e6d6 100644 --- a/bin/rialto/runtime/src/lib.rs +++ b/bin/rialto/runtime/src/lib.rs @@ -706,12 +706,19 @@ impl_runtime_apis! { // TODO: runtime should support several chains (https://github.com/paritytech/parity-bridges-common/issues/457) impl bp_message_lane::OutboundLaneApi for Runtime { - fn messages_dispatch_weight(lane: bp_message_lane::LaneId, begin: bp_message_lane::MessageNonce, end: bp_message_lane::MessageNonce) -> Weight { - (begin..=end) - .filter_map(|nonce| BridgeMillauMessageLane::outbound_message_payload(lane, nonce)) - .filter_map(|encoded_payload| millau_messages::ToMillauMessagePayload::decode(&mut &encoded_payload[..]).ok()) - .map(|decoded_payload| decoded_payload.weight) - .fold(0, |sum, weight| sum.saturating_add(weight)) + fn messages_dispatch_weight( + lane: bp_message_lane::LaneId, + begin: bp_message_lane::MessageNonce, + end: bp_message_lane::MessageNonce, + ) -> Vec<(bp_message_lane::MessageNonce, Weight)> { + (begin..=end).filter_map(|nonce| { + let encoded_payload = BridgeMillauMessageLane::outbound_message_payload(lane, nonce)?; + let decoded_payload = millau_messages::ToMillauMessagePayload::decode( + &mut &encoded_payload[..] + ).ok()?; + Some((nonce, decoded_payload.weight)) + }) + .collect() } fn latest_received_nonce(lane: bp_message_lane::LaneId) -> bp_message_lane::MessageNonce { diff --git a/modules/message-lane/rpc/Cargo.toml b/modules/message-lane/rpc/Cargo.toml index cc179f5d366c7..e9ef6c1cc6b2a 100644 --- a/modules/message-lane/rpc/Cargo.toml +++ b/modules/message-lane/rpc/Cargo.toml @@ -20,9 +20,7 @@ bp-message-lane = { path = "../../../primitives/message-lane" } # Substrate Dependencies -frame-support = "2.0" sc-client-api = "2.0" -sp-api = "2.0" sp-blockchain = "2.0" sp-core = "2.0" sp-runtime = "2.0" diff --git a/modules/message-lane/rpc/src/lib.rs b/modules/message-lane/rpc/src/lib.rs index bcc786f3866fc..54ac5eba1e51b 100644 --- a/modules/message-lane/rpc/src/lib.rs +++ b/modules/message-lane/rpc/src/lib.rs @@ -18,14 +18,12 @@ use crate::error::{Error, FutureResult}; -use bp_message_lane::{LaneId, MessageNonce, OutboundLaneApi}; +use bp_message_lane::{LaneId, MessageNonce}; use bp_runtime::InstanceId; -use frame_support::weights::Weight; use futures::{FutureExt, TryFutureExt}; use jsonrpc_core::futures::Future as _; use jsonrpc_derive::rpc; use sc_client_api::Backend as BackendT; -use sp_api::ProvideRuntimeApi; use sp_blockchain::{Error as BlockchainError, HeaderBackend}; use sp_core::{storage::StorageKey, Bytes}; use sp_runtime::{codec::Encode, generic::BlockId, traits::Block as BlockT}; @@ -56,8 +54,8 @@ pub trait Runtime: Send + Sync + 'static { /// Provides RPC methods for interacting with message-lane pallet. #[rpc] pub trait MessageLaneApi { - /// Returns cumulative dispatch weight of messages in given inclusive range and their storage proof. - /// The state of outbound lane is included in the proof if `include_outbound_lane_state` is true. + /// Returns storage proof of messages in given inclusive range. The state of outbound + /// lane is included in the proof if `include_outbound_lane_state` is true. #[rpc(name = "messageLane_proveMessages")] fn prove_messages( &self, @@ -67,7 +65,7 @@ pub trait MessageLaneApi { end: MessageNonce, include_outbound_lane_state: bool, block: Option, - ) -> FutureResult<(Weight, MessagesProof)>; + ) -> FutureResult; /// Returns proof-of-message(s) delivery. #[rpc(name = "messageLane_proveMessagesDelivery")] @@ -80,18 +78,16 @@ pub trait MessageLaneApi { } /// Implements the MessageLaneApi trait for interacting with message lanes. -pub struct MessageLaneRpcHandler { - client: Arc, +pub struct MessageLaneRpcHandler { backend: Arc, runtime: Arc, _phantom: std::marker::PhantomData, } -impl MessageLaneRpcHandler { +impl MessageLaneRpcHandler { /// Creates new mesage lane RPC handler. - pub fn new(client: Arc, backend: Arc, runtime: Arc) -> Self { + pub fn new(backend: Arc, runtime: Arc) -> Self { Self { - client, backend, runtime, _phantom: Default::default(), @@ -99,11 +95,9 @@ impl MessageLaneRpcHandler } } -impl MessageLaneApi for MessageLaneRpcHandler +impl MessageLaneApi for MessageLaneRpcHandler where Block: BlockT, - Client: ProvideRuntimeApi + Send + Sync + 'static, - Client::Api: OutboundLaneApi, Backend: BackendT + 'static, R: Runtime, { @@ -115,22 +109,7 @@ where end: MessageNonce, include_outbound_lane_state: bool, block: Option, - ) -> FutureResult<(Weight, MessagesProof)> { - let block = unwrap_or_best(&*self.backend, block); - - let messages_dispatch_weight_result = - self.client - .runtime_api() - .messages_dispatch_weight(&BlockId::Hash(block), lane, begin, end); - let messages_dispatch_weight = match messages_dispatch_weight_result { - Ok(messages_dispatch_weight) => messages_dispatch_weight, - Err(error) => { - return Box::new(jsonrpc_core::futures::future::err( - blockchain_err(BlockchainError::Execution(Box::new(format!("{:?}", error)))).into(), - )) - } - }; - + ) -> FutureResult { let runtime = self.runtime.clone(); let outbound_lane_data_key = if include_outbound_lane_state { Some(runtime.inbound_lane_data_key(&instance, &lane)) @@ -140,14 +119,14 @@ where Box::new( prove_keys_read( self.backend.clone(), - Some(block), + block, (begin..=end) .map(move |nonce| runtime.message_key(&instance, &lane, nonce)) .chain(outbound_lane_data_key.into_iter()), ) .boxed() .compat() - .map(move |proof| (messages_dispatch_weight, serialize_storage_proof(proof))) + .map(serialize_storage_proof) .map_err(Into::into), ) } diff --git a/primitives/message-lane/src/lib.rs b/primitives/message-lane/src/lib.rs index e7ae16135b6ed..760333a8cfcb8 100644 --- a/primitives/message-lane/src/lib.rs +++ b/primitives/message-lane/src/lib.rs @@ -23,13 +23,16 @@ #![allow(clippy::unnecessary_mut_passed)] use codec::{Decode, Encode}; -use frame_support::{weights::Weight, RuntimeDebug}; +use frame_support::RuntimeDebug; use sp_api::decl_runtime_apis; use sp_std::{collections::vec_deque::VecDeque, prelude::*}; pub mod source_chain; pub mod target_chain; +// Weight is reexported to avoid additional frame-support dependencies in message-lane related crates. +pub use frame_support::weights::Weight; + /// Lane identifier. pub type LaneId = [u8; 4]; @@ -127,7 +130,14 @@ decl_runtime_apis! { /// Outbound message lane API. pub trait OutboundLaneApi { /// Returns dispatch weight of all messages in given inclusive range. - fn messages_dispatch_weight(lane: LaneId, begin: MessageNonce, end: MessageNonce) -> Weight; + /// + /// If some (or all) messages are missing from the storage, they'll also will + /// be missing from the resulting vector. The vector is ordered by the nonce. + fn messages_dispatch_weight( + lane: LaneId, + begin: MessageNonce, + end: MessageNonce, + ) -> Vec<(MessageNonce, Weight)>; /// Returns nonce of the latest message, received by bridged chain. fn latest_received_nonce(lane: LaneId) -> MessageNonce; /// Returns nonce of the latest message, generated by given lane. diff --git a/relays/messages-relay/Cargo.toml b/relays/messages-relay/Cargo.toml index aa76679351514..9c2daefdb4271 100644 --- a/relays/messages-relay/Cargo.toml +++ b/relays/messages-relay/Cargo.toml @@ -11,7 +11,6 @@ async-trait = "0.1.40" futures = "0.3.5" hex = "0.4" log = "0.4.11" -num-traits = "0.2" parking_lot = "0.11.0" # Bridge Dependencies diff --git a/relays/messages-relay/src/message_lane.rs b/relays/messages-relay/src/message_lane.rs index 89840a6e8f0dd..5b4a6a97cc858 100644 --- a/relays/messages-relay/src/message_lane.rs +++ b/relays/messages-relay/src/message_lane.rs @@ -21,7 +21,6 @@ use relay_utils::HeaderId; -use num_traits::{CheckedSub, One, Zero}; use std::fmt::Debug; /// One-way message lane. @@ -31,21 +30,6 @@ pub trait MessageLane: Clone + Send + Sync { /// Name of the messages target. const TARGET_NAME: &'static str; - /// Message nonce type. - type MessageNonce: Clone - + Send - + Sync - + Copy - + Debug - + Default - + From - + Into - + Ord - + CheckedSub - + std::ops::Add - + One - + Zero; - /// Messages proof. type MessagesProof: Clone + Send + Sync; /// Messages receiving proof. diff --git a/relays/messages-relay/src/message_lane_loop.rs b/relays/messages-relay/src/message_lane_loop.rs index a9333509500e6..d0c4b46d8e226 100644 --- a/relays/messages-relay/src/message_lane_loop.rs +++ b/relays/messages-relay/src/message_lane_loop.rs @@ -30,18 +30,18 @@ use crate::message_race_receiving::run as run_message_receiving_race; use crate::metrics::MessageLaneLoopMetrics; use async_trait::async_trait; -use bp_message_lane::LaneId; +use bp_message_lane::{LaneId, MessageNonce, Weight}; use futures::{channel::mpsc::unbounded, future::FutureExt, stream::StreamExt}; use relay_utils::{ interval, metrics::{start as metrics_start, GlobalMetrics, MetricsParams}, process_future_result, retry_backoff, FailedClient, MaybeConnectionError, }; -use std::{fmt::Debug, future::Future, ops::RangeInclusive, time::Duration}; +use std::{collections::BTreeMap, fmt::Debug, future::Future, ops::RangeInclusive, time::Duration}; /// Message lane loop configuration params. #[derive(Debug, Clone)] -pub struct Params { +pub struct Params { /// Id of lane this loop is servicing. pub lane: LaneId, /// Interval at which we ask target node about its updates. @@ -52,10 +52,31 @@ pub struct Params { pub reconnect_delay: Duration, /// The loop will auto-restart if there has been no updates during this period. pub stall_timeout: Duration, + /// Message delivery race parameters. + pub delivery_params: MessageDeliveryParams, +} + +/// Message delivery race parameters. +#[derive(Debug, Clone)] +pub struct MessageDeliveryParams { /// Message delivery race will stop delivering messages if there are `max_unconfirmed_nonces_at_target` /// unconfirmed nonces on the target node. The race would continue once they're confirmed by the /// receiving race. pub max_unconfirmed_nonces_at_target: MessageNonce, + /// Maximal cumulative dispatch weight of relayed messages in single delivery transaction. + pub max_messages_weight_in_single_batch: Weight, +} + +/// Messages weights map. +pub type MessageWeightsMap = BTreeMap; + +/// Message delivery race proof parameters. +#[derive(Debug, PartialEq)] +pub struct MessageProofParameters { + /// Include outbound lane state proof? + pub outbound_state_proof_required: bool, + /// Cumulative dispatch weight of messages that we're building proof for. + pub dispatch_weight: Weight, } /// Source client trait. @@ -74,20 +95,27 @@ pub trait SourceClient: Clone + Send + Sync { async fn latest_generated_nonce( &self, id: SourceHeaderIdOf

, - ) -> Result<(SourceHeaderIdOf

, P::MessageNonce), Self::Error>; + ) -> Result<(SourceHeaderIdOf

, MessageNonce), Self::Error>; /// Get nonce of the latest message, which receiving has been confirmed by the target chain. async fn latest_confirmed_received_nonce( &self, id: SourceHeaderIdOf

, - ) -> Result<(SourceHeaderIdOf

, P::MessageNonce), Self::Error>; + ) -> Result<(SourceHeaderIdOf

, MessageNonce), Self::Error>; + + /// Returns mapping of message nonces, generated on this client, to their weights. + async fn generated_messages_weights( + &self, + id: SourceHeaderIdOf

, + nonces: RangeInclusive, + ) -> Result; /// Prove messages in inclusive range [begin; end]. async fn prove_messages( &self, id: SourceHeaderIdOf

, - nonces: RangeInclusive, - include_outbound_lane_state: bool, - ) -> Result<(SourceHeaderIdOf

, RangeInclusive, P::MessagesProof), Self::Error>; + nonces: RangeInclusive, + proof_parameters: MessageProofParameters, + ) -> Result<(SourceHeaderIdOf

, RangeInclusive, P::MessagesProof), Self::Error>; /// Submit messages receiving proof. async fn submit_messages_receiving_proof( @@ -113,13 +141,13 @@ pub trait TargetClient: Clone + Send + Sync { async fn latest_received_nonce( &self, id: TargetHeaderIdOf

, - ) -> Result<(TargetHeaderIdOf

, P::MessageNonce), Self::Error>; + ) -> Result<(TargetHeaderIdOf

, MessageNonce), Self::Error>; /// Get nonce of latest confirmed message. async fn latest_confirmed_received_nonce( &self, id: TargetHeaderIdOf

, - ) -> Result<(TargetHeaderIdOf

, P::MessageNonce), Self::Error>; + ) -> Result<(TargetHeaderIdOf

, MessageNonce), Self::Error>; /// Prove messages receiving at given block. async fn prove_messages_receiving( @@ -131,9 +159,9 @@ pub trait TargetClient: Clone + Send + Sync { async fn submit_messages_proof( &self, generated_at_header: SourceHeaderIdOf

, - nonces: RangeInclusive, + nonces: RangeInclusive, proof: P::MessagesProof, - ) -> Result, Self::Error>; + ) -> Result, Self::Error>; } /// State of the client. @@ -162,7 +190,7 @@ pub struct ClientsState { /// Run message lane service loop. pub fn run( - params: Params, + params: Params, mut source_client: impl SourceClient

, mut target_client: impl TargetClient

, metrics_params: Option, @@ -257,7 +285,7 @@ pub fn run( /// Run one-way message delivery loop until connection with target or source node is lost, or exit signal is received. async fn run_until_connection_lost, TC: TargetClient

>( - params: Params, + params: Params, source_client: SC, target_client: TC, mut metrics_global: Option<&mut GlobalMetrics>, @@ -289,7 +317,7 @@ async fn run_until_connection_lost, TC: Targ delivery_target_state_receiver, params.stall_timeout, metrics_msg.clone(), - params.max_unconfirmed_nonces_at_target, + params.delivery_params, ) .fuse(); @@ -430,13 +458,15 @@ pub(crate) mod tests { use relay_utils::HeaderId; use std::sync::Arc; - pub fn header_id(number: TestSourceHeaderNumber) -> HeaderId { + pub fn header_id(number: TestSourceHeaderNumber) -> TestSourceHeaderId { HeaderId(number, number) } - pub type TestMessageNonce = u64; - pub type TestMessagesProof = (RangeInclusive, Option); - pub type TestMessagesReceivingProof = TestMessageNonce; + pub type TestSourceHeaderId = HeaderId; + pub type TestTargetHeaderId = HeaderId; + + pub type TestMessagesProof = (RangeInclusive, Option); + pub type TestMessagesReceivingProof = MessageNonce; pub type TestSourceHeaderNumber = u64; pub type TestSourceHeaderHash = u64; @@ -460,8 +490,6 @@ pub(crate) mod tests { const SOURCE_NAME: &'static str = "TestSource"; const TARGET_NAME: &'static str = "TestTarget"; - type MessageNonce = TestMessageNonce; - type MessagesProof = TestMessagesProof; type MessagesReceivingProof = TestMessagesReceivingProof; @@ -477,14 +505,14 @@ pub(crate) mod tests { is_source_fails: bool, is_source_reconnected: bool, source_state: SourceClientState, - source_latest_generated_nonce: TestMessageNonce, - source_latest_confirmed_received_nonce: TestMessageNonce, + source_latest_generated_nonce: MessageNonce, + source_latest_confirmed_received_nonce: MessageNonce, submitted_messages_receiving_proofs: Vec, is_target_fails: bool, is_target_reconnected: bool, target_state: SourceClientState, - target_latest_received_nonce: TestMessageNonce, - target_latest_confirmed_received_nonce: TestMessageNonce, + target_latest_received_nonce: MessageNonce, + target_latest_confirmed_received_nonce: MessageNonce, submitted_messages_proofs: Vec, } @@ -519,7 +547,7 @@ pub(crate) mod tests { async fn latest_generated_nonce( &self, id: SourceHeaderIdOf, - ) -> Result<(SourceHeaderIdOf, TestMessageNonce), Self::Error> { + ) -> Result<(SourceHeaderIdOf, MessageNonce), Self::Error> { let mut data = self.data.lock(); (self.tick)(&mut *data); if data.is_source_fails { @@ -531,21 +559,29 @@ pub(crate) mod tests { async fn latest_confirmed_received_nonce( &self, id: SourceHeaderIdOf, - ) -> Result<(SourceHeaderIdOf, TestMessageNonce), Self::Error> { + ) -> Result<(SourceHeaderIdOf, MessageNonce), Self::Error> { let mut data = self.data.lock(); (self.tick)(&mut *data); Ok((id, data.source_latest_confirmed_received_nonce)) } + async fn generated_messages_weights( + &self, + _id: SourceHeaderIdOf, + nonces: RangeInclusive, + ) -> Result { + Ok(nonces.map(|nonce| (nonce, 1)).collect()) + } + async fn prove_messages( &self, id: SourceHeaderIdOf, - nonces: RangeInclusive, - include_outbound_lane_state: bool, + nonces: RangeInclusive, + proof_parameters: MessageProofParameters, ) -> Result< ( SourceHeaderIdOf, - RangeInclusive, + RangeInclusive, TestMessagesProof, ), Self::Error, @@ -557,7 +593,7 @@ pub(crate) mod tests { nonces.clone(), ( nonces, - if include_outbound_lane_state { + if proof_parameters.outbound_state_proof_required { Some(data.source_latest_confirmed_received_nonce) } else { None @@ -610,7 +646,7 @@ pub(crate) mod tests { async fn latest_received_nonce( &self, id: TargetHeaderIdOf, - ) -> Result<(TargetHeaderIdOf, TestMessageNonce), Self::Error> { + ) -> Result<(TargetHeaderIdOf, MessageNonce), Self::Error> { let mut data = self.data.lock(); (self.tick)(&mut *data); if data.is_target_fails { @@ -622,7 +658,7 @@ pub(crate) mod tests { async fn latest_confirmed_received_nonce( &self, id: TargetHeaderIdOf, - ) -> Result<(TargetHeaderIdOf, TestMessageNonce), Self::Error> { + ) -> Result<(TargetHeaderIdOf, MessageNonce), Self::Error> { let mut data = self.data.lock(); (self.tick)(&mut *data); if data.is_target_fails { @@ -641,9 +677,9 @@ pub(crate) mod tests { async fn submit_messages_proof( &self, _generated_at_header: SourceHeaderIdOf, - nonces: RangeInclusive, + nonces: RangeInclusive, proof: TestMessagesProof, - ) -> Result, Self::Error> { + ) -> Result, Self::Error> { let mut data = self.data.lock(); (self.tick)(&mut *data); if data.is_target_fails { @@ -683,8 +719,11 @@ pub(crate) mod tests { source_tick: Duration::from_millis(100), target_tick: Duration::from_millis(100), reconnect_delay: Duration::from_millis(0), - stall_timeout: Duration::from_millis(60), - max_unconfirmed_nonces_at_target: 100, + stall_timeout: Duration::from_millis(60 * 1000), + delivery_params: MessageDeliveryParams { + max_unconfirmed_nonces_at_target: 4, + max_messages_weight_in_single_batch: 4, + }, }, source_client, target_client, @@ -743,8 +782,6 @@ pub(crate) mod tests { #[test] fn message_lane_loop_works() { - // with this configuration, target client must first sync headers [1; 10] and - // then submit proof-of-messages [0; 10] at once let (exit_sender, exit_receiver) = unbounded(); let result = run_loop_test( TestClientData { @@ -762,18 +799,22 @@ pub(crate) mod tests { }, Arc::new(|_: &mut TestClientData| {}), Arc::new(move |data: &mut TestClientData| { - // syncing source headers -> target chain (by one) + // syncing source headers -> target chain (all at once) if data.target_state.best_peer.0 < data.source_state.best_self.0 { - data.target_state.best_peer = - HeaderId(data.target_state.best_peer.0 + 1, data.target_state.best_peer.0 + 1); + data.target_state.best_peer = data.source_state.best_self; } - // syncing source headers -> target chain (all at once) + // syncing target headers -> source chain (all at once) if data.source_state.best_peer.0 < data.target_state.best_self.0 { data.source_state.best_peer = data.target_state.best_self; } - // if target has received all messages => increase target block so that confirmations may be sent - if data.target_latest_received_nonce == 10 { + // if target has received messages batch => increase blocks so that confirmations may be sent + if data.target_latest_received_nonce == 4 + || data.target_latest_received_nonce == 8 + || data.target_latest_received_nonce == 10 + { data.target_state.best_self = + HeaderId(data.target_state.best_self.0 + 1, data.target_state.best_self.0 + 1); + data.source_state.best_self = HeaderId(data.source_state.best_self.0 + 1, data.source_state.best_self.0 + 1); } // if source has received all messages receiving confirmations => increase source block so that confirmations may be sent diff --git a/relays/messages-relay/src/message_race_delivery.rs b/relays/messages-relay/src/message_race_delivery.rs index 90345f8c61571..370567542cc75 100644 --- a/relays/messages-relay/src/message_race_delivery.rs +++ b/relays/messages-relay/src/message_race_delivery.rs @@ -15,21 +15,21 @@ use crate::message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf}; use crate::message_lane_loop::{ - SourceClient as MessageLaneSourceClient, SourceClientState, TargetClient as MessageLaneTargetClient, - TargetClientState, + MessageDeliveryParams, MessageProofParameters, MessageWeightsMap, SourceClient as MessageLaneSourceClient, + SourceClientState, TargetClient as MessageLaneTargetClient, TargetClientState, +}; +use crate::message_race_loop::{ + MessageRace, NoncesRange, RaceState, RaceStrategy, SourceClient, SourceClientNonces, TargetClient, + TargetClientNonces, }; -use crate::message_race_loop::{ClientNonces, MessageRace, RaceState, RaceStrategy, SourceClient, TargetClient}; use crate::message_race_strategy::BasicStrategy; use crate::metrics::MessageLaneLoopMetrics; use async_trait::async_trait; +use bp_message_lane::{MessageNonce, Weight}; use futures::stream::FusedStream; -use num_traits::CheckedSub; use relay_utils::FailedClient; -use std::{marker::PhantomData, ops::RangeInclusive, time::Duration}; - -/// Maximal number of messages to relay in single transaction. -const MAX_MESSAGES_TO_RELAY_IN_SINGLE_TX: u32 = 4; +use std::{collections::BTreeMap, marker::PhantomData, ops::RangeInclusive, time::Duration}; /// Run message delivery race. pub async fn run( @@ -39,7 +39,7 @@ pub async fn run( target_state_updates: impl FusedStream>, stall_timeout: Duration, metrics_msg: Option, - max_unconfirmed_nonces_at_target: P::MessageNonce, + params: MessageDeliveryParams, ) -> Result<(), FailedClient> { crate::message_race_loop::run( MessageDeliveryRaceSource { @@ -56,10 +56,11 @@ pub async fn run( target_state_updates, stall_timeout, MessageDeliveryStrategy::

{ - max_unconfirmed_nonces_at_target, - source_nonces: None, + max_unconfirmed_nonces_at_target: params.max_unconfirmed_nonces_at_target, + max_messages_weight_in_single_batch: params.max_messages_weight_in_single_batch, + latest_confirmed_nonce_at_source: None, target_nonces: None, - strategy: BasicStrategy::new(MAX_MESSAGES_TO_RELAY_IN_SINGLE_TX.into()), + strategy: BasicStrategy::new(), }, ) .await @@ -72,7 +73,7 @@ impl MessageRace for MessageDeliveryRace

{ type SourceHeaderId = SourceHeaderIdOf

; type TargetHeaderId = TargetHeaderIdOf

; - type MessageNonce = P::MessageNonce; + type MessageNonce = MessageNonce; type Proof = P::MessagesProof; fn source_name() -> String { @@ -98,12 +99,14 @@ where C: MessageLaneSourceClient

, { type Error = C::Error; - type ProofParameters = bool; + type NoncesRange = MessageWeightsMap; + type ProofParameters = MessageProofParameters; async fn nonces( &self, at_block: SourceHeaderIdOf

, - ) -> Result<(SourceHeaderIdOf

, ClientNonces), Self::Error> { + prev_latest_nonce: MessageNonce, + ) -> Result<(SourceHeaderIdOf

, SourceClientNonces), Self::Error> { let (at_block, latest_generated_nonce) = self.client.latest_generated_nonce(at_block).await?; let (at_block, latest_confirmed_nonce) = self.client.latest_confirmed_received_nonce(at_block).await?; @@ -112,10 +115,18 @@ where metrics_msg.update_source_latest_confirmed_nonce::

(latest_confirmed_nonce); } + let new_nonces = if latest_generated_nonce > prev_latest_nonce { + self.client + .generated_messages_weights(at_block.clone(), prev_latest_nonce + 1..=latest_generated_nonce) + .await? + } else { + MessageWeightsMap::new() + }; + Ok(( at_block, - ClientNonces { - latest_nonce: latest_generated_nonce, + SourceClientNonces { + new_nonces, confirmed_nonce: Some(latest_confirmed_nonce), }, )) @@ -124,13 +135,10 @@ where async fn generate_proof( &self, at_block: SourceHeaderIdOf

, - nonces: RangeInclusive, + nonces: RangeInclusive, proof_parameters: Self::ProofParameters, - ) -> Result<(SourceHeaderIdOf

, RangeInclusive, P::MessagesProof), Self::Error> { - let outbound_state_proof_required = proof_parameters; - self.client - .prove_messages(at_block, nonces, outbound_state_proof_required) - .await + ) -> Result<(SourceHeaderIdOf

, RangeInclusive, P::MessagesProof), Self::Error> { + self.client.prove_messages(at_block, nonces, proof_parameters).await } } @@ -152,7 +160,7 @@ where async fn nonces( &self, at_block: TargetHeaderIdOf

, - ) -> Result<(TargetHeaderIdOf

, ClientNonces), Self::Error> { + ) -> Result<(TargetHeaderIdOf

, TargetClientNonces), Self::Error> { let (at_block, latest_received_nonce) = self.client.latest_received_nonce(at_block).await?; let (at_block, latest_confirmed_nonce) = self.client.latest_confirmed_received_nonce(at_block).await?; @@ -163,7 +171,7 @@ where Ok(( at_block, - ClientNonces { + TargetClientNonces { latest_nonce: latest_received_nonce, confirmed_nonce: Some(latest_confirmed_nonce), }, @@ -173,9 +181,9 @@ where async fn submit_proof( &self, generated_at_block: SourceHeaderIdOf

, - nonces: RangeInclusive, + nonces: RangeInclusive, proof: P::MessagesProof, - ) -> Result, Self::Error> { + ) -> Result, Self::Error> { self.client .submit_messages_proof(generated_at_block, nonces, proof) .await @@ -185,11 +193,13 @@ where /// Messages delivery strategy. struct MessageDeliveryStrategy { /// Maximal unconfirmed nonces at target client. - max_unconfirmed_nonces_at_target: P::MessageNonce, - /// Latest nonces from the source client. - source_nonces: Option>, + max_unconfirmed_nonces_at_target: MessageNonce, + /// Maximal cumulative messages weight in the single delivery transaction. + max_messages_weight_in_single_batch: Weight, + /// Latest confirmed nonce at the source client. + latest_confirmed_nonce_at_source: Option, /// Target nonces from the source client. - target_nonces: Option>, + target_nonces: Option, /// Basic delivery strategy. strategy: MessageDeliveryStrategyBase

, } @@ -199,36 +209,41 @@ type MessageDeliveryStrategyBase

= BasicStrategy<

::SourceHeaderHash,

::TargetHeaderNumber,

::TargetHeaderHash, -

::MessageNonce, + MessageWeightsMap,

::MessagesProof, >; -impl RaceStrategy, TargetHeaderIdOf

, P::MessageNonce, P::MessagesProof> +impl RaceStrategy, TargetHeaderIdOf

, P::MessagesProof> for MessageDeliveryStrategy

{ - type ProofParameters = bool; + type SourceNoncesRange = MessageWeightsMap; + type ProofParameters = MessageProofParameters; fn is_empty(&self) -> bool { self.strategy.is_empty() } - fn best_at_source(&self) -> P::MessageNonce { + fn best_at_source(&self) -> MessageNonce { self.strategy.best_at_source() } - fn best_at_target(&self) -> P::MessageNonce { + fn best_at_target(&self) -> MessageNonce { self.strategy.best_at_target() } - fn source_nonces_updated(&mut self, at_block: SourceHeaderIdOf

, nonces: ClientNonces) { - self.source_nonces = Some(nonces.clone()); + fn source_nonces_updated( + &mut self, + at_block: SourceHeaderIdOf

, + nonces: SourceClientNonces, + ) { + self.latest_confirmed_nonce_at_source = nonces.confirmed_nonce; self.strategy.source_nonces_updated(at_block, nonces) } fn target_nonces_updated( &mut self, - nonces: ClientNonces, - race_state: &mut RaceState, TargetHeaderIdOf

, P::MessageNonce, P::MessagesProof>, + nonces: TargetClientNonces, + race_state: &mut RaceState, TargetHeaderIdOf

, P::MessagesProof>, ) { self.target_nonces = Some(nonces.clone()); self.strategy.target_nonces_updated(nonces, race_state) @@ -236,14 +251,14 @@ impl RaceStrategy, TargetHeaderIdOf

, P::M fn select_nonces_to_deliver( &mut self, - race_state: &RaceState, TargetHeaderIdOf

, P::MessageNonce, P::MessagesProof>, - ) -> Option<(RangeInclusive, Self::ProofParameters)> { + race_state: &RaceState, TargetHeaderIdOf

, P::MessagesProof>, + ) -> Option<(RangeInclusive, Self::ProofParameters)> { const CONFIRMED_NONCE_PROOF: &str = "\ ClientNonces are crafted by MessageDeliveryRace(Source|Target);\ MessageDeliveryRace(Source|Target) always fills confirmed_nonce field;\ qed"; - let source_nonces = self.source_nonces.as_ref()?; + let latest_confirmed_nonce_at_source = self.latest_confirmed_nonce_at_source?; let target_nonces = self.target_nonces.as_ref()?; // There's additional condition in the message delivery race: target would reject messages @@ -257,10 +272,9 @@ impl RaceStrategy, TargetHeaderIdOf

, P::M // The receiving race is responsible to deliver confirmations back to the source chain. So if // there's a lot of unconfirmed messages, let's wait until it'll be able to do its job. let latest_received_nonce_at_target = target_nonces.latest_nonce; - let latest_confirmed_nonce_at_source = source_nonces.confirmed_nonce.expect(CONFIRMED_NONCE_PROOF); - let confirmations_missing = latest_received_nonce_at_target.checked_sub(&latest_confirmed_nonce_at_source); + let confirmations_missing = latest_received_nonce_at_target.checked_sub(latest_confirmed_nonce_at_source); match confirmations_missing { - Some(confirmations_missing) if confirmations_missing > self.max_unconfirmed_nonces_at_target => { + Some(confirmations_missing) if confirmations_missing >= self.max_unconfirmed_nonces_at_target => { log::debug!( target: "bridge", "Cannot deliver any more messages from {} to {}. Too many unconfirmed nonces \ @@ -277,11 +291,7 @@ impl RaceStrategy, TargetHeaderIdOf

, P::M _ => (), } - // If we're here, then the confirmations race did it job && sending side now knows that messages - // have been delivered. Now let's select nonces that we want to deliver. - let selected_nonces = self.strategy.select_nonces_to_deliver(race_state)?.0; - - // Ok - we have new nonces to deliver. But target may still reject new messages, because we haven't + // Ok - we may have new nonces to deliver. But target may still reject new messages, because we haven't // notified it that (some) messages have been confirmed. So we may want to include updated // `source.latest_confirmed` in the proof. // @@ -290,11 +300,226 @@ impl RaceStrategy, TargetHeaderIdOf

, P::M let latest_confirmed_nonce_at_target = target_nonces.confirmed_nonce.expect(CONFIRMED_NONCE_PROOF); let outbound_state_proof_required = latest_confirmed_nonce_at_target < latest_confirmed_nonce_at_source; - // https://github.com/paritytech/parity-bridges-common/issues/432 - // https://github.com/paritytech/parity-bridges-common/issues/433 - // TODO: number of messages must be no larger than: - // `max_unconfirmed_nonces_at_target - (latest_received_nonce_at_target - latest_confirmed_nonce_at_target)` + // If we're here, then the confirmations race did its job && sending side now knows that messages + // have been delivered. Now let's select nonces that we want to deliver. + // + // We may deliver at most: + // + // max_unconfirmed_nonces_at_target - (latest_received_nonce_at_target - latest_confirmed_nonce_at_target) + // + // messages in the batch. But since we're including outbound state proof in the batch, then it + // may be increased to: + // + // max_unconfirmed_nonces_at_target - (latest_received_nonce_at_target - latest_confirmed_nonce_at_source) + let future_confirmed_nonce_at_target = if outbound_state_proof_required { + latest_confirmed_nonce_at_source + } else { + latest_confirmed_nonce_at_target + }; + let max_nonces = latest_received_nonce_at_target + .checked_sub(future_confirmed_nonce_at_target) + .and_then(|diff| self.max_unconfirmed_nonces_at_target.checked_sub(diff)) + .unwrap_or_default(); + let max_messages_weight_in_single_batch = self.max_messages_weight_in_single_batch; + let mut selected_weight: Weight = 0; + let mut selected_count: MessageNonce = 0; + + let selected_nonces = self + .strategy + .select_nonces_to_deliver_with_selector(race_state, |range| { + let to_requeue = range + .into_iter() + .skip_while(|(_, weight)| { + // limit messages in the batch by weight + let new_selected_weight = match selected_weight.checked_add(*weight) { + Some(new_selected_weight) if new_selected_weight <= max_messages_weight_in_single_batch => { + new_selected_weight + } + _ => return false, + }; + + // limit number of messages in the batch + let new_selected_count = selected_count + 1; + if new_selected_count > max_nonces { + return false; + } + + selected_weight = new_selected_weight; + selected_count = new_selected_count; + true + }) + .collect::>(); + if to_requeue.is_empty() { + None + } else { + Some(to_requeue) + } + })?; + + Some(( + selected_nonces, + MessageProofParameters { + outbound_state_proof_required, + dispatch_weight: selected_weight, + }, + )) + } +} + +impl NoncesRange for MessageWeightsMap { + fn begin(&self) -> MessageNonce { + self.keys().next().cloned().unwrap_or_default() + } + + fn end(&self) -> MessageNonce { + self.keys().next_back().cloned().unwrap_or_default() + } + + fn greater_than(mut self, nonce: MessageNonce) -> Option { + let gte = self.split_off(&(nonce + 1)); + if gte.is_empty() { + None + } else { + Some(gte) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::message_lane_loop::{ + tests::{header_id, TestMessageLane, TestMessagesProof, TestSourceHeaderId, TestTargetHeaderId}, + ClientState, + }; + + type TestRaceState = RaceState; + type TestStrategy = MessageDeliveryStrategy; + + fn prepare_strategy() -> (TestRaceState, TestStrategy) { + let mut race_state = RaceState { + source_state: Some(ClientState { + best_self: header_id(1), + best_peer: header_id(1), + }), + target_state: Some(ClientState { + best_self: header_id(1), + best_peer: header_id(1), + }), + nonces_to_submit: None, + nonces_submitted: None, + }; + + let mut race_strategy = TestStrategy { + max_unconfirmed_nonces_at_target: 4, + max_messages_weight_in_single_batch: 4, + latest_confirmed_nonce_at_source: Some(19), + target_nonces: Some(TargetClientNonces { + latest_nonce: 19, + confirmed_nonce: Some(19), + }), + strategy: BasicStrategy::new(), + }; + + race_strategy.strategy.source_nonces_updated( + header_id(1), + SourceClientNonces { + new_nonces: vec![(20, 1), (21, 1), (22, 1), (23, 1)].into_iter().collect(), + confirmed_nonce: Some(19), + }, + ); + race_strategy + .strategy + .target_nonces_updated(race_strategy.target_nonces.clone().unwrap(), &mut race_state); + + (race_state, race_strategy) + } + + fn proof_parameters(state_required: bool, weight: Weight) -> MessageProofParameters { + MessageProofParameters { + outbound_state_proof_required: state_required, + dispatch_weight: weight, + } + } + + #[test] + fn weights_map_works_as_nonces_range() { + fn build_map(range: RangeInclusive) -> MessageWeightsMap { + range.map(|idx| (idx, idx)).collect() + } + + let map = build_map(20..=30); + + assert_eq!(map.begin(), 20); + assert_eq!(map.end(), 30); + assert_eq!(map.clone().greater_than(10), Some(build_map(20..=30))); + assert_eq!(map.clone().greater_than(19), Some(build_map(20..=30))); + assert_eq!(map.clone().greater_than(20), Some(build_map(21..=30))); + assert_eq!(map.clone().greater_than(25), Some(build_map(26..=30))); + assert_eq!(map.clone().greater_than(29), Some(build_map(30..=30))); + assert_eq!(map.greater_than(30), None); + } + + #[test] + fn message_delivery_strategy_selects_messages_to_deliver() { + let (state, mut strategy) = prepare_strategy(); + + // both sides are ready to relay new messages + assert_eq!( + strategy.select_nonces_to_deliver(&state), + Some(((20..=23), proof_parameters(false, 4))) + ); + } + + #[test] + fn message_delivery_strategy_selects_nothing_if_too_many_confirmations_missing() { + let (state, mut strategy) = prepare_strategy(); + + // if there are already `max_unconfirmed_nonces_at_target` messages on target, + // we need to wait until confirmations will be delivered by receiving race + strategy.latest_confirmed_nonce_at_source = + Some(strategy.target_nonces.as_ref().unwrap().latest_nonce - strategy.max_unconfirmed_nonces_at_target); + assert_eq!(strategy.select_nonces_to_deliver(&state), None); + } + + #[test] + fn message_delivery_strategy_includes_outbound_state_proof_when_new_nonces_are_available() { + let (state, mut strategy) = prepare_strategy(); + + // if there are new confirmed nonces on source, we want to relay this information + // to target to prune rewards queue + let prev_confirmed_nonce_at_source = strategy.latest_confirmed_nonce_at_source.unwrap(); + strategy.target_nonces.as_mut().unwrap().confirmed_nonce = Some(prev_confirmed_nonce_at_source - 1); + assert_eq!( + strategy.select_nonces_to_deliver(&state), + Some(((20..=23), proof_parameters(true, 4))) + ); + } + + #[test] + fn message_delivery_strategy_limits_batch_by_messages_weight() { + let (state, mut strategy) = prepare_strategy(); + + // not all queued messages may fit in the batch, because batch has max weight + strategy.max_messages_weight_in_single_batch = 3; + assert_eq!( + strategy.select_nonces_to_deliver(&state), + Some(((20..=22), proof_parameters(false, 3))) + ); + } - Some((selected_nonces, outbound_state_proof_required)) + #[test] + fn message_delivery_strategy_limits_batch_by_messages_count() { + let (state, mut strategy) = prepare_strategy(); + + // 1 delivery confirmation from target to source is still missing, so we may only + // relay 3 new messages + let prev_confirmed_nonce_at_source = strategy.latest_confirmed_nonce_at_source.unwrap(); + strategy.latest_confirmed_nonce_at_source = Some(prev_confirmed_nonce_at_source - 1); + strategy.target_nonces.as_mut().unwrap().confirmed_nonce = Some(prev_confirmed_nonce_at_source - 1); + assert_eq!( + strategy.select_nonces_to_deliver(&state), + Some(((20..=22), proof_parameters(false, 3))) + ); } } diff --git a/relays/messages-relay/src/message_race_loop.rs b/relays/messages-relay/src/message_race_loop.rs index 43a2cd9ad275b..8fa8d5ffba36a 100644 --- a/relays/messages-relay/src/message_race_loop.rs +++ b/relays/messages-relay/src/message_race_loop.rs @@ -23,6 +23,7 @@ use crate::message_lane_loop::ClientState; use async_trait::async_trait; +use bp_message_lane::MessageNonce; use futures::{ future::FutureExt, stream::{FusedStream, StreamExt}, @@ -58,10 +59,32 @@ type SourceClientState

= ClientState<

::SourceHeaderId,

= ClientState<

::TargetHeaderId,

::SourceHeaderId>; -/// Nonces on the race client. +/// Inclusive nonces range. +pub trait NoncesRange: Debug + Sized { + /// Get begin of the range. + fn begin(&self) -> MessageNonce; + /// Get end of the range. + fn end(&self) -> MessageNonce; + /// Returns new range with current range nonces that are greater than the passed `nonce`. + /// If there are no such nonces, `None` is returned. + fn greater_than(self, nonce: MessageNonce) -> Option; +} + +/// Nonces on the race source client. #[derive(Debug, Clone)] -pub struct ClientNonces { - /// Latest nonce that is known to the client. +pub struct SourceClientNonces { + /// New nonces range known to the client. `New` here means all nonces generated after + /// `prev_latest_nonce` passed to the `SourceClient::nonces` method. + pub new_nonces: NoncesRange, + /// Latest nonce that is confirmed to the bridged client. This nonce only makes + /// sense in some races. In other races it is `None`. + pub confirmed_nonce: Option, +} + +/// Nonces on the race target client. +#[derive(Debug, Clone)] +pub struct TargetClientNonces { + /// Latest nonce that is known to the target client. pub latest_nonce: MessageNonce, /// Latest nonce that is confirmed to the bridged client. This nonce only makes /// sense in some races. In other races it is `None`. @@ -73,6 +96,8 @@ pub struct ClientNonces { pub trait SourceClient { /// Type of error this clients returns. type Error: std::fmt::Debug + MaybeConnectionError; + /// Type of nonces range returned by the source client. + type NoncesRange: NoncesRange; /// Additional proof parameters required to generate proof. type ProofParameters; @@ -80,14 +105,15 @@ pub trait SourceClient { async fn nonces( &self, at_block: P::SourceHeaderId, - ) -> Result<(P::SourceHeaderId, ClientNonces), Self::Error>; + prev_latest_nonce: MessageNonce, + ) -> Result<(P::SourceHeaderId, SourceClientNonces), Self::Error>; /// Generate proof for delivering to the target client. async fn generate_proof( &self, at_block: P::SourceHeaderId, - nonces: RangeInclusive, + nonces: RangeInclusive, proof_parameters: Self::ProofParameters, - ) -> Result<(P::SourceHeaderId, RangeInclusive, P::Proof), Self::Error>; + ) -> Result<(P::SourceHeaderId, RangeInclusive, P::Proof), Self::Error>; } /// One of message lane clients, which is target client for the race. @@ -97,21 +123,21 @@ pub trait TargetClient { type Error: std::fmt::Debug + MaybeConnectionError; /// Return nonces that are known to the target client. - async fn nonces( - &self, - at_block: P::TargetHeaderId, - ) -> Result<(P::TargetHeaderId, ClientNonces), Self::Error>; + async fn nonces(&self, at_block: P::TargetHeaderId) + -> Result<(P::TargetHeaderId, TargetClientNonces), Self::Error>; /// Submit proof to the target client. async fn submit_proof( &self, generated_at_block: P::SourceHeaderId, - nonces: RangeInclusive, + nonces: RangeInclusive, proof: P::Proof, - ) -> Result, Self::Error>; + ) -> Result, Self::Error>; } /// Race strategy. -pub trait RaceStrategy { +pub trait RaceStrategy { + /// Type of nonces range expected from the source client. + type SourceNoncesRange: NoncesRange; /// Additional proof parameters required to generate proof. type ProofParameters; @@ -123,25 +149,25 @@ pub trait RaceStrategy { fn best_at_target(&self) -> MessageNonce; /// Called when nonces are updated at source node of the race. - fn source_nonces_updated(&mut self, at_block: SourceHeaderId, nonce: ClientNonces); + fn source_nonces_updated(&mut self, at_block: SourceHeaderId, nonces: SourceClientNonces); /// Called when nonces are updated at target node of the race. fn target_nonces_updated( &mut self, - nonces: ClientNonces, - race_state: &mut RaceState, + nonces: TargetClientNonces, + race_state: &mut RaceState, ); /// Should return `Some(nonces)` if we need to deliver proof of `nonces` (and associated /// data) from source to target node. /// Additionally, parameters required to generate proof are returned. fn select_nonces_to_deliver( &mut self, - race_state: &RaceState, + race_state: &RaceState, ) -> Option<(RangeInclusive, Self::ProofParameters)>; } /// State of the race. #[derive(Debug)] -pub struct RaceState { +pub struct RaceState { /// Source state, if known. pub source_state: Option>, /// Target state, if known. @@ -162,8 +188,8 @@ pub async fn run>( mut strategy: impl RaceStrategy< P::SourceHeaderId, P::TargetHeaderId, - P::MessageNonce, P::Proof, + SourceNoncesRange = SC::NoncesRange, ProofParameters = SC::ProofParameters, >, ) -> Result<(), FailedClient> { @@ -337,7 +363,7 @@ pub async fn run>( .expect("source_nonces_required is only true when source_state is Some; qed") .best_self .clone(); - source_nonces.set(race_source.nonces(at_block).fuse()); + source_nonces.set(race_source.nonces(at_block, strategy.best_at_source()).fuse()); } else { source_client_is_online = true; } @@ -375,9 +401,7 @@ pub async fn run>( } } -impl Default - for RaceState -{ +impl Default for RaceState { fn default() -> Self { RaceState { source_state: None, @@ -392,7 +416,7 @@ impl Default fn print_race_progress(prev_time: Instant, strategy: &S) -> Instant where P: MessageRace, - S: RaceStrategy, + S: RaceStrategy, { let now_time = Instant::now(); @@ -414,13 +438,13 @@ where now_time } -fn select_nonces_to_deliver( - race_state: &RaceState, +fn select_nonces_to_deliver( + race_state: &RaceState, strategy: &mut Strategy, ) -> Option<(SourceHeaderId, RangeInclusive, Strategy::ProofParameters)> where SourceHeaderId: Clone, - Strategy: RaceStrategy, + Strategy: RaceStrategy, { race_state.target_state.as_ref().and_then(|target_state| { strategy @@ -442,8 +466,8 @@ mod tests { const BEST_AT_TARGET: u64 = 8; // target node only knows about source' BEST_AT_TARGET block - // source node has BEST_AT_SOURCE > BEST_AT_SOURCE block - let mut race_state = RaceState::<_, _, _, ()> { + // source node has BEST_AT_SOURCE > BEST_AT_TARGET block + let mut race_state = RaceState::<_, _, ()> { source_state: Some(ClientState { best_self: HeaderId(BEST_AT_SOURCE, BEST_AT_SOURCE), best_peer: HeaderId(0, 0), @@ -457,16 +481,16 @@ mod tests { }; // we have some nonces to deliver and they're generated at GENERATED_AT < BEST_AT_SOURCE - let mut strategy = BasicStrategy::new(100); + let mut strategy = BasicStrategy::new(); strategy.source_nonces_updated( HeaderId(GENERATED_AT, GENERATED_AT), - ClientNonces { - latest_nonce: 10u64, + SourceClientNonces { + new_nonces: 0..=10, confirmed_nonce: None, }, ); strategy.target_nonces_updated( - ClientNonces { + TargetClientNonces { latest_nonce: 5u64, confirmed_nonce: None, }, diff --git a/relays/messages-relay/src/message_race_receiving.rs b/relays/messages-relay/src/message_race_receiving.rs index bf9975717c319..852b2893f1a3b 100644 --- a/relays/messages-relay/src/message_race_receiving.rs +++ b/relays/messages-relay/src/message_race_receiving.rs @@ -18,11 +18,14 @@ use crate::message_lane_loop::{ SourceClient as MessageLaneSourceClient, SourceClientState, TargetClient as MessageLaneTargetClient, TargetClientState, }; -use crate::message_race_loop::{ClientNonces, MessageRace, SourceClient, TargetClient}; +use crate::message_race_loop::{ + MessageRace, NoncesRange, SourceClient, SourceClientNonces, TargetClient, TargetClientNonces, +}; use crate::message_race_strategy::BasicStrategy; use crate::metrics::MessageLaneLoopMetrics; use async_trait::async_trait; +use bp_message_lane::MessageNonce; use futures::stream::FusedStream; use relay_utils::FailedClient; use std::{marker::PhantomData, ops::RangeInclusive, time::Duration}; @@ -33,7 +36,7 @@ type ReceivingConfirmationsBasicStrategy

= BasicStrategy<

::TargetHeaderHash,

::SourceHeaderNumber,

::SourceHeaderHash, -

::MessageNonce, + RangeInclusive,

::MessagesReceivingProof, >; @@ -60,7 +63,7 @@ pub async fn run( }, source_state_updates, stall_timeout, - ReceivingConfirmationsBasicStrategy::

::new(std::u32::MAX.into()), + ReceivingConfirmationsBasicStrategy::

::new(), ) .await } @@ -72,7 +75,7 @@ impl MessageRace for ReceivingConfirmationsRace

{ type SourceHeaderId = TargetHeaderIdOf

; type TargetHeaderId = SourceHeaderIdOf

; - type MessageNonce = P::MessageNonce; + type MessageNonce = MessageNonce; type Proof = P::MessagesReceivingProof; fn source_name() -> String { @@ -98,20 +101,22 @@ where C: MessageLaneTargetClient

, { type Error = C::Error; + type NoncesRange = RangeInclusive; type ProofParameters = (); async fn nonces( &self, at_block: TargetHeaderIdOf

, - ) -> Result<(TargetHeaderIdOf

, ClientNonces), Self::Error> { + prev_latest_nonce: MessageNonce, + ) -> Result<(TargetHeaderIdOf

, SourceClientNonces), Self::Error> { let (at_block, latest_received_nonce) = self.client.latest_received_nonce(at_block).await?; if let Some(metrics_msg) = self.metrics_msg.as_ref() { metrics_msg.update_target_latest_received_nonce::

(latest_received_nonce); } Ok(( at_block, - ClientNonces { - latest_nonce: latest_received_nonce, + SourceClientNonces { + new_nonces: prev_latest_nonce + 1..=latest_received_nonce, confirmed_nonce: None, }, )) @@ -121,12 +126,12 @@ where async fn generate_proof( &self, at_block: TargetHeaderIdOf

, - nonces: RangeInclusive, + nonces: RangeInclusive, _proof_parameters: Self::ProofParameters, ) -> Result< ( TargetHeaderIdOf

, - RangeInclusive, + RangeInclusive, P::MessagesReceivingProof, ), Self::Error, @@ -156,14 +161,14 @@ where async fn nonces( &self, at_block: SourceHeaderIdOf

, - ) -> Result<(SourceHeaderIdOf

, ClientNonces), Self::Error> { + ) -> Result<(SourceHeaderIdOf

, TargetClientNonces), Self::Error> { let (at_block, latest_confirmed_nonce) = self.client.latest_confirmed_received_nonce(at_block).await?; if let Some(metrics_msg) = self.metrics_msg.as_ref() { metrics_msg.update_source_latest_confirmed_nonce::

(latest_confirmed_nonce); } Ok(( at_block, - ClientNonces { + TargetClientNonces { latest_nonce: latest_confirmed_nonce, confirmed_nonce: None, }, @@ -173,12 +178,51 @@ where async fn submit_proof( &self, generated_at_block: TargetHeaderIdOf

, - nonces: RangeInclusive, + nonces: RangeInclusive, proof: P::MessagesReceivingProof, - ) -> Result, Self::Error> { + ) -> Result, Self::Error> { self.client .submit_messages_receiving_proof(generated_at_block, proof) .await?; Ok(nonces) } } + +impl NoncesRange for RangeInclusive { + fn begin(&self) -> MessageNonce { + *RangeInclusive::::start(self) + } + + fn end(&self) -> MessageNonce { + *RangeInclusive::::end(self) + } + + fn greater_than(self, nonce: MessageNonce) -> Option { + let next_nonce = nonce + 1; + let end = *self.end(); + if next_nonce > end { + None + } else { + Some(std::cmp::max(self.begin(), next_nonce)..=end) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn range_inclusive_works_as_nonces_range() { + let range = 20..=30; + + assert_eq!(NoncesRange::begin(&range), 20); + assert_eq!(NoncesRange::end(&range), 30); + assert_eq!(range.clone().greater_than(10), Some(20..=30)); + assert_eq!(range.clone().greater_than(19), Some(20..=30)); + assert_eq!(range.clone().greater_than(20), Some(21..=30)); + assert_eq!(range.clone().greater_than(25), Some(26..=30)); + assert_eq!(range.clone().greater_than(29), Some(30..=30)); + assert_eq!(range.greater_than(30), None); + } +} diff --git a/relays/messages-relay/src/message_race_strategy.rs b/relays/messages-relay/src/message_race_strategy.rs index 4aabf6c200afd..83b139f0f3e2e 100644 --- a/relays/messages-relay/src/message_race_strategy.rs +++ b/relays/messages-relay/src/message_race_strategy.rs @@ -17,95 +17,172 @@ //! 2) new nonces may be proved to target node (i.e. they have appeared at the //! block, which is known to the target node). -use crate::message_race_loop::{ClientNonces, RaceState, RaceStrategy}; +use crate::message_race_loop::{NoncesRange, RaceState, RaceStrategy, SourceClientNonces, TargetClientNonces}; -use num_traits::{One, Zero}; +use bp_message_lane::MessageNonce; use relay_utils::HeaderId; use std::{collections::VecDeque, marker::PhantomData, ops::RangeInclusive}; /// Nonces delivery strategy. #[derive(Debug)] -pub struct BasicStrategy { +pub struct BasicStrategy< + SourceHeaderNumber, + SourceHeaderHash, + TargetHeaderNumber, + TargetHeaderHash, + SourceNoncesRange, + Proof, +> { /// All queued nonces. - source_queue: VecDeque<(HeaderId, Nonce)>, + source_queue: VecDeque<(HeaderId, SourceNoncesRange)>, /// Best nonce known to target node. - target_nonce: Nonce, - /// Max nonces to relay in single transaction. - max_nonces_to_relay_in_single_tx: Nonce, + target_nonce: MessageNonce, /// Unused generic types dump. _phantom: PhantomData<(TargetHeaderNumber, TargetHeaderHash, Proof)>, } -impl - BasicStrategy +impl + BasicStrategy +where + SourceHeaderHash: Clone, + SourceHeaderNumber: Clone + Ord, + SourceNoncesRange: NoncesRange, { /// Create new delivery strategy. - pub fn new(max_nonces_to_relay_in_single_tx: Nonce) -> Self { + pub fn new() -> Self { BasicStrategy { source_queue: VecDeque::new(), target_nonce: Default::default(), - max_nonces_to_relay_in_single_tx, _phantom: Default::default(), } } + + /// Should return `Some(nonces)` if we need to deliver proof of `nonces` (and associated + /// data) from source to target node. + /// + /// The `selector` function receives range of nonces and should return `None` if the whole + /// range needs to be delivered. If there are some nonces in the range that can't be delivered + /// right now, it should return `Some` with 'undeliverable' nonces. Please keep in mind that + /// this should be the sub-range that the passed range ends with, because nonces are always + /// delivered in-order. Otherwise the function will panic. + pub fn select_nonces_to_deliver_with_selector( + &mut self, + race_state: &RaceState< + HeaderId, + HeaderId, + Proof, + >, + mut selector: impl FnMut(SourceNoncesRange) -> Option, + ) -> Option> { + // if we have already selected nonces that we want to submit, do nothing + if race_state.nonces_to_submit.is_some() { + return None; + } + + // if we already submitted some nonces, do nothing + if race_state.nonces_submitted.is_some() { + return None; + } + + // 1) we want to deliver all nonces, starting from `target_nonce + 1` + // 2) we can't deliver new nonce until header, that has emitted this nonce, is finalized + // by target client + // 3) selector is used for more complicated logic + let best_header_at_target = &race_state.target_state.as_ref()?.best_peer; + let mut nonces_end = None; + + while let Some((queued_at, queued_range)) = self.source_queue.pop_front() { + // select (sub) range to deliver + let queued_range_begin = queued_range.begin(); + let queued_range_end = queued_range.end(); + let range_to_requeue = if queued_at.0 > best_header_at_target.0 { + // if header that has queued the range is not yet finalized at bridged chain, + // we can't prove anything + Some(queued_range) + } else { + // selector returns `Some(range)` if this `range` needs to be requeued + selector(queued_range) + }; + + // requeue (sub) range and update range to deliver + match range_to_requeue { + Some(range_to_requeue) => { + assert!( + range_to_requeue.begin() <= range_to_requeue.end() + && range_to_requeue.begin() >= queued_range_begin + && range_to_requeue.end() == queued_range_end, + "Incorrect implementation of internal `selector` function. Expected original\ + range {:?} to end with returned range {:?}", + queued_range_begin..=queued_range_end, + range_to_requeue, + ); + + if range_to_requeue.begin() != queued_range_begin { + nonces_end = Some(range_to_requeue.begin() - 1); + } + self.source_queue.push_front((queued_at, range_to_requeue)); + break; + } + None => { + nonces_end = Some(queued_range_end); + } + } + } + + nonces_end.map(|nonces_end| RangeInclusive::new(self.target_nonce + 1, nonces_end)) + } } -impl - RaceStrategy< - HeaderId, - HeaderId, - Nonce, - Proof, - > for BasicStrategy +impl + RaceStrategy, HeaderId, Proof> + for BasicStrategy where SourceHeaderHash: Clone, SourceHeaderNumber: Clone + Ord, - Nonce: Clone + Copy + From + Ord + std::ops::Add + One + Zero, + SourceNoncesRange: NoncesRange, { + type SourceNoncesRange = SourceNoncesRange; type ProofParameters = (); fn is_empty(&self) -> bool { self.source_queue.is_empty() } - fn best_at_source(&self) -> Nonce { - self.source_queue - .back() - .map(|(_, nonce)| *nonce) - .unwrap_or_else(Zero::zero) + fn best_at_source(&self) -> MessageNonce { + std::cmp::max( + self.source_queue + .back() + .map(|(_, range)| range.end()) + .unwrap_or(self.target_nonce), + self.target_nonce, + ) } - fn best_at_target(&self) -> Nonce { + fn best_at_target(&self) -> MessageNonce { self.target_nonce } fn source_nonces_updated( &mut self, at_block: HeaderId, - nonces: ClientNonces, + nonces: SourceClientNonces, ) { - let nonce = nonces.latest_nonce; - - if nonce <= self.target_nonce { - return; - } - - match self.source_queue.back() { - Some((_, prev_nonce)) if *prev_nonce < nonce => (), - Some(_) => return, - None => (), - } - - self.source_queue.push_back((at_block, nonce)) + let prev_best_at_source = self.best_at_source(); + self.source_queue.extend( + nonces + .new_nonces + .greater_than(prev_best_at_source) + .into_iter() + .map(move |range| (at_block.clone(), range)), + ) } fn target_nonces_updated( &mut self, - nonces: ClientNonces, + nonces: TargetClientNonces, race_state: &mut RaceState< HeaderId, HeaderId, - Nonce, Proof, >, ) { @@ -115,12 +192,15 @@ where return; } - while let Some(true) = self - .source_queue - .front() - .map(|(_, source_nonce)| *source_nonce <= nonce) - { - self.source_queue.pop_front(); + while let Some(true) = self.source_queue.front().map(|(_, range)| range.begin() <= nonce) { + let maybe_subrange = self + .source_queue + .pop_front() + .and_then(|(at_block, range)| range.greater_than(nonce).map(|subrange| (at_block, subrange))); + if let Some((at_block, subrange)) = maybe_subrange { + self.source_queue.push_front((at_block, subrange)); + break; + } } let need_to_select_new_nonces = race_state @@ -149,60 +229,11 @@ where race_state: &RaceState< HeaderId, HeaderId, - Nonce, Proof, >, - ) -> Option<(RangeInclusive, Self::ProofParameters)> { - // if we have already selected nonces that we want to submit, do nothing - if race_state.nonces_to_submit.is_some() { - return None; - } - - // if we already submitted some nonces, do nothing - if race_state.nonces_submitted.is_some() { - return None; - } - - // 1) we want to deliver all nonces, starting from `target_nonce + 1` - // 2) we want to deliver at most `self.max_nonces_to_relay_in_single_tx` nonces in this batch - // 3) we can't deliver new nonce until header, that has emitted this nonce, is finalized - // by target client - let nonces_begin = self.target_nonce + 1.into(); - let best_header_at_target = &race_state.target_state.as_ref()?.best_peer; - let mut nonces_end = None; - let mut i = Zero::zero(); - - // https://github.com/paritytech/parity-bridges-common/issues/433 - // TODO: instead of limiting number of messages by number, provide custom limit callback here. - // In delivery race it'll be weight-based callback. In receiving race it'll be unlimited callback. - - while i < self.max_nonces_to_relay_in_single_tx { - let nonce = nonces_begin + i; - - // if queue is empty, we don't need to prove anything - let (first_queued_at, first_queued_nonce) = match self.source_queue.front() { - Some((first_queued_at, first_queued_nonce)) => ((*first_queued_at).clone(), *first_queued_nonce), - None => break, - }; - - // if header that has queued the message is not yet finalized at bridged chain, - // we can't prove anything - if first_queued_at.0 > best_header_at_target.0 { - break; - } - - // ok, we may deliver this nonce - nonces_end = Some(nonce); - - // probably remove it from the queue? - if nonce == first_queued_nonce { - self.source_queue.pop_front(); - } - - i = i + One::one(); - } - - nonces_end.map(|nonces_end| (RangeInclusive::new(nonces_begin, nonces_end), ())) + ) -> Option<(RangeInclusive, Self::ProofParameters)> { + self.select_nonces_to_deliver_with_selector(race_state, |_| None) + .map(|range| (range, ())) } } @@ -211,21 +242,30 @@ mod tests { use super::*; use crate::message_lane::MessageLane; use crate::message_lane_loop::{ - tests::{header_id, TestMessageLane, TestMessageNonce, TestMessagesProof}, + tests::{header_id, TestMessageLane, TestMessagesProof}, ClientState, }; + type SourceNoncesRange = RangeInclusive; + type BasicStrategy

= super::BasicStrategy<

::SourceHeaderNumber,

::SourceHeaderHash,

::TargetHeaderNumber,

::TargetHeaderHash, -

::MessageNonce, + SourceNoncesRange,

::MessagesProof, >; - fn nonces(latest_nonce: TestMessageNonce) -> ClientNonces { - ClientNonces { + fn source_nonces(new_nonces: SourceNoncesRange) -> SourceClientNonces { + SourceClientNonces { + new_nonces, + confirmed_nonce: None, + } + } + + fn target_nonces(latest_nonce: MessageNonce) -> TargetClientNonces { + TargetClientNonces { latest_nonce, confirmed_nonce: None, } @@ -233,105 +273,116 @@ mod tests { #[test] fn strategy_is_empty_works() { - let mut strategy = BasicStrategy::::new(4); + let mut strategy = BasicStrategy::::new(); assert_eq!(strategy.is_empty(), true); - strategy.source_nonces_updated(header_id(1), nonces(1)); + strategy.source_nonces_updated(header_id(1), source_nonces(1..=1)); assert_eq!(strategy.is_empty(), false); } + #[test] + fn best_at_source_is_never_lower_than_target_nonce() { + let mut strategy = BasicStrategy::::new(); + assert_eq!(strategy.best_at_source(), 0); + strategy.source_nonces_updated(header_id(1), source_nonces(1..=5)); + assert_eq!(strategy.best_at_source(), 5); + strategy.target_nonces_updated(target_nonces(10), &mut Default::default()); + assert_eq!(strategy.source_queue, vec![]); + assert_eq!(strategy.best_at_source(), 10); + } + #[test] fn source_nonce_is_never_lower_than_known_target_nonce() { - let mut strategy = BasicStrategy::::new(4); - strategy.target_nonces_updated(nonces(10), &mut Default::default()); - strategy.source_nonces_updated(header_id(1), nonces(5)); + let mut strategy = BasicStrategy::::new(); + strategy.target_nonces_updated(target_nonces(10), &mut Default::default()); + strategy.source_nonces_updated(header_id(1), source_nonces(1..=5)); assert_eq!(strategy.source_queue, vec![]); } #[test] fn source_nonce_is_never_lower_than_latest_known_source_nonce() { - let mut strategy = BasicStrategy::::new(4); - strategy.source_nonces_updated(header_id(1), nonces(5)); - strategy.source_nonces_updated(header_id(2), nonces(3)); - strategy.source_nonces_updated(header_id(2), nonces(5)); - assert_eq!(strategy.source_queue, vec![(header_id(1), 5)]); + let mut strategy = BasicStrategy::::new(); + strategy.source_nonces_updated(header_id(1), source_nonces(1..=5)); + strategy.source_nonces_updated(header_id(2), source_nonces(1..=3)); + strategy.source_nonces_updated(header_id(2), source_nonces(1..=5)); + assert_eq!(strategy.source_queue, vec![(header_id(1), 1..=5)]); } #[test] fn target_nonce_is_never_lower_than_latest_known_target_nonce() { - let mut strategy = BasicStrategy::::new(4); - strategy.target_nonces_updated(nonces(10), &mut Default::default()); - strategy.target_nonces_updated(nonces(5), &mut Default::default()); + let mut strategy = BasicStrategy::::new(); + strategy.target_nonces_updated(target_nonces(10), &mut Default::default()); + strategy.target_nonces_updated(target_nonces(5), &mut Default::default()); assert_eq!(strategy.target_nonce, 10); } #[test] fn updated_target_nonce_removes_queued_entries() { - let mut strategy = BasicStrategy::::new(4); - strategy.source_nonces_updated(header_id(1), nonces(5)); - strategy.source_nonces_updated(header_id(2), nonces(10)); - strategy.source_nonces_updated(header_id(3), nonces(15)); - strategy.source_nonces_updated(header_id(4), nonces(20)); - strategy.target_nonces_updated(nonces(15), &mut Default::default()); - assert_eq!(strategy.source_queue, vec![(header_id(4), 20)]); + let mut strategy = BasicStrategy::::new(); + strategy.source_nonces_updated(header_id(1), source_nonces(1..=5)); + strategy.source_nonces_updated(header_id(2), source_nonces(6..=10)); + strategy.source_nonces_updated(header_id(3), source_nonces(11..=15)); + strategy.source_nonces_updated(header_id(4), source_nonces(16..=20)); + strategy.target_nonces_updated(target_nonces(15), &mut Default::default()); + assert_eq!(strategy.source_queue, vec![(header_id(4), 16..=20)]); + strategy.target_nonces_updated(target_nonces(17), &mut Default::default()); + assert_eq!(strategy.source_queue, vec![(header_id(4), 18..=20)]); } #[test] fn selected_nonces_are_dropped_on_target_nonce_update() { let mut state = RaceState::default(); - let mut strategy = BasicStrategy::::new(4); + let mut strategy = BasicStrategy::::new(); state.nonces_to_submit = Some((header_id(1), 5..=10, (5..=10, None))); - strategy.target_nonces_updated(nonces(7), &mut state); + strategy.target_nonces_updated(target_nonces(7), &mut state); assert!(state.nonces_to_submit.is_some()); - strategy.target_nonces_updated(nonces(10), &mut state); + strategy.target_nonces_updated(target_nonces(10), &mut state); assert!(state.nonces_to_submit.is_none()); } #[test] fn submitted_nonces_are_dropped_on_target_nonce_update() { let mut state = RaceState::default(); - let mut strategy = BasicStrategy::::new(4); + let mut strategy = BasicStrategy::::new(); state.nonces_submitted = Some(5..=10); - strategy.target_nonces_updated(nonces(7), &mut state); + strategy.target_nonces_updated(target_nonces(7), &mut state); assert!(state.nonces_submitted.is_some()); - strategy.target_nonces_updated(nonces(10), &mut state); + strategy.target_nonces_updated(target_nonces(10), &mut state); assert!(state.nonces_submitted.is_none()); } #[test] fn nothing_is_selected_if_something_is_already_selected() { let mut state = RaceState::default(); - let mut strategy = BasicStrategy::::new(4); + let mut strategy = BasicStrategy::::new(); state.nonces_to_submit = Some((header_id(1), 1..=10, (1..=10, None))); - strategy.source_nonces_updated(header_id(1), nonces(10)); + strategy.source_nonces_updated(header_id(1), source_nonces(1..=10)); assert_eq!(strategy.select_nonces_to_deliver(&state), None); } #[test] fn nothing_is_selected_if_something_is_already_submitted() { let mut state = RaceState::default(); - let mut strategy = BasicStrategy::::new(4); + let mut strategy = BasicStrategy::::new(); state.nonces_submitted = Some(1..=10); - strategy.source_nonces_updated(header_id(1), nonces(10)); + strategy.source_nonces_updated(header_id(1), source_nonces(1..=10)); assert_eq!(strategy.select_nonces_to_deliver(&state), None); } #[test] fn select_nonces_to_deliver_works() { - let mut state = RaceState::<_, _, TestMessageNonce, TestMessagesProof>::default(); - let mut strategy = BasicStrategy::::new(4); - strategy.source_nonces_updated(header_id(1), nonces(1)); - strategy.source_nonces_updated(header_id(2), nonces(2)); - strategy.source_nonces_updated(header_id(3), nonces(6)); - strategy.source_nonces_updated(header_id(5), nonces(8)); + let mut state = RaceState::<_, _, TestMessagesProof>::default(); + let mut strategy = BasicStrategy::::new(); + strategy.source_nonces_updated(header_id(1), source_nonces(1..=1)); + strategy.source_nonces_updated(header_id(2), source_nonces(2..=2)); + strategy.source_nonces_updated(header_id(3), source_nonces(6..=6)); + strategy.source_nonces_updated(header_id(5), source_nonces(8..=8)); state.target_state = Some(ClientState { best_self: header_id(0), best_peer: header_id(4), }); - assert_eq!(strategy.select_nonces_to_deliver(&state), Some((1..=4, ()))); - strategy.target_nonces_updated(nonces(4), &mut state); - assert_eq!(strategy.select_nonces_to_deliver(&state), Some((5..=6, ()))); - strategy.target_nonces_updated(nonces(6), &mut state); + assert_eq!(strategy.select_nonces_to_deliver(&state), Some((1..=6, ()))); + strategy.target_nonces_updated(target_nonces(6), &mut state); assert_eq!(strategy.select_nonces_to_deliver(&state), None); state.target_state = Some(ClientState { @@ -339,7 +390,57 @@ mod tests { best_peer: header_id(5), }); assert_eq!(strategy.select_nonces_to_deliver(&state), Some((7..=8, ()))); - strategy.target_nonces_updated(nonces(8), &mut state); + strategy.target_nonces_updated(target_nonces(8), &mut state); assert_eq!(strategy.select_nonces_to_deliver(&state), None); } + + #[test] + fn select_nonces_to_deliver_able_to_split_ranges_with_selector() { + let mut state = RaceState::<_, _, TestMessagesProof>::default(); + let mut strategy = BasicStrategy::::new(); + strategy.source_nonces_updated(header_id(1), source_nonces(1..=100)); + + state.target_state = Some(ClientState { + best_self: header_id(0), + best_peer: header_id(1), + }); + assert_eq!( + strategy.select_nonces_to_deliver_with_selector(&state, |_| Some(50..=100)), + Some(1..=49), + ); + } + + fn run_panic_test_for_incorrect_selector( + invalid_selector: impl Fn(SourceNoncesRange) -> Option, + ) { + let mut state = RaceState::<_, _, TestMessagesProof>::default(); + let mut strategy = BasicStrategy::::new(); + strategy.source_nonces_updated(header_id(1), source_nonces(1..=100)); + strategy.target_nonces_updated(target_nonces(50), &mut state); + state.target_state = Some(ClientState { + best_self: header_id(0), + best_peer: header_id(1), + }); + + strategy.select_nonces_to_deliver_with_selector(&state, invalid_selector); + } + + #[test] + #[should_panic] + fn select_nonces_to_deliver_panics_if_selector_returns_empty_range() { + #[allow(clippy::reversed_empty_ranges)] + run_panic_test_for_incorrect_selector(|_| Some(2..=1)) + } + + #[test] + #[should_panic] + fn select_nonces_to_deliver_panics_if_selector_returns_range_that_starts_before_passed_range() { + run_panic_test_for_incorrect_selector(|range| Some(range.begin() - 1..=*range.end())) + } + + #[test] + #[should_panic] + fn select_nonces_to_deliver_panics_if_selector_returns_range_with_mismatched_end() { + run_panic_test_for_incorrect_selector(|range| Some(range.begin()..=*range.end() + 1)) + } } diff --git a/relays/messages-relay/src/metrics.rs b/relays/messages-relay/src/metrics.rs index 93f4259c106ea..462317028c15d 100644 --- a/relays/messages-relay/src/metrics.rs +++ b/relays/messages-relay/src/metrics.rs @@ -19,6 +19,7 @@ use crate::message_lane::MessageLane; use crate::message_lane_loop::{SourceClientState, TargetClientState}; +use bp_message_lane::MessageNonce; use relay_utils::metrics::{register, GaugeVec, Metrics, Opts, Registry, U64}; /// Message lane relay metrics. @@ -77,30 +78,30 @@ impl MessageLaneLoopMetrics { } /// Update latest generated nonce at source. - pub fn update_source_latest_generated_nonce(&self, source_latest_generated_nonce: P::MessageNonce) { + pub fn update_source_latest_generated_nonce(&self, source_latest_generated_nonce: MessageNonce) { self.lane_state_nonces .with_label_values(&["source_latest_generated"]) - .set(source_latest_generated_nonce.into()); + .set(source_latest_generated_nonce); } /// Update latest confirmed nonce at source. - pub fn update_source_latest_confirmed_nonce(&self, source_latest_confirmed_nonce: P::MessageNonce) { + pub fn update_source_latest_confirmed_nonce(&self, source_latest_confirmed_nonce: MessageNonce) { self.lane_state_nonces .with_label_values(&["source_latest_confirmed"]) - .set(source_latest_confirmed_nonce.into()); + .set(source_latest_confirmed_nonce); } /// Update latest received nonce at target. - pub fn update_target_latest_received_nonce(&self, target_latest_generated_nonce: P::MessageNonce) { + pub fn update_target_latest_received_nonce(&self, target_latest_generated_nonce: MessageNonce) { self.lane_state_nonces .with_label_values(&["target_latest_received"]) - .set(target_latest_generated_nonce.into()); + .set(target_latest_generated_nonce); } /// Update latest confirmed nonce at target. - pub fn update_target_latest_confirmed_nonce(&self, target_latest_confirmed_nonce: P::MessageNonce) { + pub fn update_target_latest_confirmed_nonce(&self, target_latest_confirmed_nonce: MessageNonce) { self.lane_state_nonces .with_label_values(&["target_latest_confirmed"]) - .set(target_latest_confirmed_nonce.into()); + .set(target_latest_confirmed_nonce); } } diff --git a/relays/substrate-client/src/client.rs b/relays/substrate-client/src/client.rs index 1e0075f228c43..e52af57cdd59e 100644 --- a/relays/substrate-client/src/client.rs +++ b/relays/substrate-client/src/client.rs @@ -23,7 +23,6 @@ use crate::{ConnectionParams, Error, Result}; use bp_message_lane::{LaneId, MessageNonce}; use bp_runtime::InstanceId; use codec::Decode; -use frame_support::weights::Weight; use frame_system::AccountInfo; use jsonrpsee::common::DeserializeOwned; use jsonrpsee::raw::RawClient; @@ -215,8 +214,8 @@ impl Client { range: RangeInclusive, include_outbound_lane_state: bool, at_block: C::Hash, - ) -> Result<(Weight, StorageProof)> { - let (dispatch_weight, encoded_trie_nodes) = SubstrateMessageLane::::prove_messages( + ) -> Result { + let encoded_trie_nodes = SubstrateMessageLane::::prove_messages( &self.client, instance, lane, @@ -229,7 +228,7 @@ impl Client { .map_err(Error::Request)?; let decoded_trie_nodes: Vec> = Decode::decode(&mut &encoded_trie_nodes[..]).map_err(Error::ResponseParseFailed)?; - Ok((dispatch_weight, StorageProof::new(decoded_trie_nodes))) + Ok(StorageProof::new(decoded_trie_nodes)) } /// Returns proof-of-message(s) delivery. diff --git a/relays/substrate-client/src/error.rs b/relays/substrate-client/src/error.rs index 5f41af9aacc33..bafcb1a1fec73 100644 --- a/relays/substrate-client/src/error.rs +++ b/relays/substrate-client/src/error.rs @@ -36,6 +36,8 @@ pub enum Error { ResponseParseFailed(codec::Error), /// Account does not exist on the chain. AccountDoesNotExist, + /// Custom logic error. + Custom(String), } impl From for Error { @@ -69,6 +71,7 @@ impl ToString for Error { Self::Request(e) => e.to_string(), Self::ResponseParseFailed(e) => e.what().to_string(), Self::AccountDoesNotExist => "Account does not exist on the chain".into(), + Self::Custom(e) => e.clone(), } } } diff --git a/relays/substrate-client/src/rpc.rs b/relays/substrate-client/src/rpc.rs index 89ed36695abcc..9c76f593e99b8 100644 --- a/relays/substrate-client/src/rpc.rs +++ b/relays/substrate-client/src/rpc.rs @@ -25,7 +25,6 @@ use crate::chain::Chain; use bp_message_lane::{LaneId, MessageNonce}; use bp_runtime::InstanceId; -use frame_support::weights::Weight; use sp_core::{ storage::{StorageData, StorageKey}, Bytes, @@ -63,7 +62,7 @@ jsonrpsee::rpc_api! { end: MessageNonce, include_outbound_lane_state: bool, block: Option, - ) -> (Weight, Bytes); + ) -> Bytes; #[rpc(method = "messageLane_proveMessagesDelivery", positional_params)] fn prove_messages_delivery( diff --git a/relays/substrate/src/messages_source.rs b/relays/substrate/src/messages_source.rs index 46e990e101682..c648289fee147 100644 --- a/relays/substrate/src/messages_source.rs +++ b/relays/substrate/src/messages_source.rs @@ -25,7 +25,7 @@ use codec::{Decode, Encode}; use frame_support::weights::Weight; use messages_relay::{ message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf}, - message_lane_loop::{ClientState, SourceClient, SourceClientState}, + message_lane_loop::{ClientState, MessageProofParameters, MessageWeightsMap, SourceClient, SourceClientState}, }; use relay_substrate_client::{Chain, Client, Error as SubstrateError, HashOf, HeaderIdOf}; use relay_utils::HeaderId; @@ -95,7 +95,6 @@ where C::Index: DeserializeOwned, ::Number: Into, P: MessageLane< - MessageNonce = MessageNonce, MessagesProof = SubstrateMessagesProof, SourceHeaderNumber = ::Number, SourceHeaderHash = ::Hash, @@ -119,7 +118,7 @@ where async fn latest_generated_nonce( &self, id: SourceHeaderIdOf

, - ) -> Result<(SourceHeaderIdOf

, P::MessageNonce), Self::Error> { + ) -> Result<(SourceHeaderIdOf

, MessageNonce), Self::Error> { let encoded_response = self .client .state_call( @@ -129,7 +128,7 @@ where Some(id.1), ) .await?; - let latest_generated_nonce: P::MessageNonce = + let latest_generated_nonce: MessageNonce = Decode::decode(&mut &encoded_response.0[..]).map_err(SubstrateError::ResponseParseFailed)?; Ok((id, latest_generated_nonce)) } @@ -137,7 +136,7 @@ where async fn latest_confirmed_received_nonce( &self, id: SourceHeaderIdOf

, - ) -> Result<(SourceHeaderIdOf

, P::MessageNonce), Self::Error> { + ) -> Result<(SourceHeaderIdOf

, MessageNonce), Self::Error> { let encoded_response = self .client .state_call( @@ -147,29 +146,62 @@ where Some(id.1), ) .await?; - let latest_received_nonce: P::MessageNonce = + let latest_received_nonce: MessageNonce = Decode::decode(&mut &encoded_response.0[..]).map_err(SubstrateError::ResponseParseFailed)?; Ok((id, latest_received_nonce)) } + async fn generated_messages_weights( + &self, + id: SourceHeaderIdOf

, + nonces: RangeInclusive, + ) -> Result { + let encoded_response = self + .client + .state_call( + // TODO: https://github.com/paritytech/parity-bridges-common/issues/457 + "OutboundLaneApi_messages_dispatch_weight".into(), + Bytes((self.lane, nonces.start(), nonces.end()).encode()), + Some(id.1), + ) + .await?; + let weights: Vec<(MessageNonce, Weight)> = + Decode::decode(&mut &encoded_response.0[..]).map_err(SubstrateError::ResponseParseFailed)?; + + let mut expected_nonce = *nonces.start(); + let mut weights_map = MessageWeightsMap::new(); + for (nonce, weight) in weights { + if nonce != expected_nonce { + return Err(SubstrateError::Custom(format!( + "Unexpected nonce in messages_dispatch_weight call result. Expected {}, got {}", + expected_nonce, nonce + ))); + } + + weights_map.insert(nonce, weight); + expected_nonce += 1; + } + Ok(weights_map) + } + async fn prove_messages( &self, id: SourceHeaderIdOf

, - nonces: RangeInclusive, - include_outbound_lane_state: bool, - ) -> Result<(SourceHeaderIdOf

, RangeInclusive, P::MessagesProof), Self::Error> { - let (weight, proof) = self + nonces: RangeInclusive, + proof_parameters: MessageProofParameters, + ) -> Result<(SourceHeaderIdOf

, RangeInclusive, P::MessagesProof), Self::Error> { + let proof = self .client .prove_messages( self.instance, self.lane, nonces.clone(), - include_outbound_lane_state, + proof_parameters.outbound_state_proof_required, id.1, ) .await?; let proof = (id.1, proof, self.lane, *nonces.start(), *nonces.end()); - Ok((id, nonces, (weight, proof))) + Ok((id, nonces, (proof_parameters.dispatch_weight, proof))) } async fn submit_messages_receiving_proof( diff --git a/relays/substrate/src/messages_target.rs b/relays/substrate/src/messages_target.rs index 541b4dc0f2d85..2569b5e2c9ae5 100644 --- a/relays/substrate/src/messages_target.rs +++ b/relays/substrate/src/messages_target.rs @@ -53,7 +53,7 @@ pub trait SubstrateTransactionMaker: Clone + Send + Sy async fn make_messages_delivery_transaction( &self, generated_at_header: SourceHeaderIdOf

, - nonces: RangeInclusive, + nonces: RangeInclusive, proof: P::MessagesProof, ) -> Result; } @@ -91,7 +91,6 @@ where C::Index: DeserializeOwned, ::Number: Into, P: MessageLane< - MessageNonce = MessageNonce, MessagesReceivingProof = (HashOf, StorageProof, LaneId), TargetHeaderNumber = ::Number, TargetHeaderHash = ::Hash, @@ -115,7 +114,7 @@ where async fn latest_received_nonce( &self, id: TargetHeaderIdOf

, - ) -> Result<(TargetHeaderIdOf

, P::MessageNonce), Self::Error> { + ) -> Result<(TargetHeaderIdOf

, MessageNonce), Self::Error> { let encoded_response = self .client .state_call( @@ -125,7 +124,7 @@ where Some(id.1), ) .await?; - let latest_received_nonce: P::MessageNonce = + let latest_received_nonce: MessageNonce = Decode::decode(&mut &encoded_response.0[..]).map_err(SubstrateError::ResponseParseFailed)?; Ok((id, latest_received_nonce)) } @@ -133,7 +132,7 @@ where async fn latest_confirmed_received_nonce( &self, id: TargetHeaderIdOf

, - ) -> Result<(TargetHeaderIdOf

, P::MessageNonce), Self::Error> { + ) -> Result<(TargetHeaderIdOf

, MessageNonce), Self::Error> { let encoded_response = self .client .state_call( @@ -143,7 +142,7 @@ where Some(id.1), ) .await?; - let latest_received_nonce: P::MessageNonce = + let latest_received_nonce: MessageNonce = Decode::decode(&mut &encoded_response.0[..]).map_err(SubstrateError::ResponseParseFailed)?; Ok((id, latest_received_nonce)) } @@ -163,9 +162,9 @@ where async fn submit_messages_proof( &self, generated_at_header: SourceHeaderIdOf

, - nonces: RangeInclusive, + nonces: RangeInclusive, proof: P::MessagesProof, - ) -> Result, Self::Error> { + ) -> Result, Self::Error> { let tx = self .tx_maker .make_messages_delivery_transaction(generated_at_header, nonces.clone(), proof) diff --git a/relays/substrate/src/millau_messages_to_rialto.rs b/relays/substrate/src/millau_messages_to_rialto.rs index 47a5158b552ef..81017f6745fbd 100644 --- a/relays/substrate/src/millau_messages_to_rialto.rs +++ b/relays/substrate/src/millau_messages_to_rialto.rs @@ -52,7 +52,6 @@ impl MessageLane for MillauMessagesToRialto { const SOURCE_NAME: &'static str = "Millau"; const TARGET_NAME: &'static str = "Rialto"; - type MessageNonce = MessageNonce; type MessagesProof = FromMillauMessagesProof; type MessagesReceivingProof = FromRialtoMessagesReceivingProof; @@ -144,7 +143,12 @@ pub fn run( target_tick: rialto_tick, reconnect_delay, stall_timeout, - max_unconfirmed_nonces_at_target: bp_rialto::MAX_UNCONFIRMED_MESSAGES_AT_INBOUND_LANE, + delivery_params: messages_relay::message_lane_loop::MessageDeliveryParams { + max_unconfirmed_nonces_at_target: bp_rialto::MAX_UNCONFIRMED_MESSAGES_AT_INBOUND_LANE, + // TODO: subtract base weight of delivery from this when it'll be known + // https://github.com/paritytech/parity-bridges-common/issues/78 + max_messages_weight_in_single_batch: bp_rialto::MAXIMUM_EXTRINSIC_WEIGHT, + }, }, MillauSourceClient::new( millau_client.clone(),