Skip to content

Commit

Permalink
lp-gateway: Add queue for outbound messages
Browse files Browse the repository at this point in the history
  • Loading branch information
cdamian committed Jan 17, 2024
1 parent ca18a90 commit 3b7c6f7
Show file tree
Hide file tree
Showing 13 changed files with 205 additions and 129 deletions.
4 changes: 2 additions & 2 deletions libs/mocks/src/liquidity_pools.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use cfg_traits::liquidity_pools::Codec;
use parity_scale_codec::{Decode, Encode, Error, Input};
use parity_scale_codec::{Decode, Encode, Error, Input, MaxEncodedLen};
use scale_info::TypeInfo;

#[derive(Debug, Eq, PartialEq, Clone, Encode, Decode, TypeInfo)]
#[derive(Debug, Eq, PartialEq, Clone, Encode, Decode, TypeInfo, MaxEncodedLen)]
pub enum MessageMock {
First,
Second,
Expand Down
15 changes: 10 additions & 5 deletions libs/mocks/src/liquidity_pools_gateway_routers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ pub mod pallet {
register_call!(move |()| f());
}

pub fn mock_send(f: impl Fn(T::AccountId, MessageMock) -> DispatchResult + 'static) {
pub fn mock_send(
f: impl Fn(T::AccountId, MessageMock) -> DispatchResultWithPostInfo + 'static,
) {
register_call!(move |(sender, message)| f(sender, message));
}
}
Expand All @@ -41,7 +43,7 @@ pub mod pallet {
execute_call!(())
}

fn send(sender: Self::Sender, message: MessageMock) -> DispatchResult {
fn send(sender: Self::Sender, message: MessageMock) -> DispatchResultWithPostInfo {
execute_call!((sender, message))
}
}
Expand All @@ -68,7 +70,10 @@ impl<T: pallet::Config> RouterMock<T> {
pallet::Pallet::<T>::mock_init(f)
}

pub fn mock_send(&self, f: impl Fn(T::AccountId, MessageMock) -> DispatchResult + 'static) {
pub fn mock_send(
&self,
f: impl Fn(T::AccountId, MessageMock) -> DispatchResultWithPostInfo + 'static,
) {
pallet::Pallet::<T>::mock_send(f)
}
}
Expand All @@ -83,7 +88,7 @@ impl<T: pallet::Config> Router for RouterMock<T> {
pallet::Pallet::<T>::init()
}

fn send(&self, sender: Self::Sender, message: Self::Message) -> DispatchResult {
fn send(&self, sender: Self::Sender, message: Self::Message) -> DispatchResultWithPostInfo {
pallet::Pallet::<T>::send(sender, message)
}
}
Expand All @@ -105,5 +110,5 @@ trait MockedRouter {
fn init() -> DispatchResult;

/// Send the message to the router's destination.
fn send(sender: Self::Sender, message: Self::Message) -> DispatchResult;
fn send(sender: Self::Sender, message: Self::Message) -> DispatchResultWithPostInfo;
}
4 changes: 2 additions & 2 deletions libs/traits/src/liquidity_pools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

use frame_support::dispatch::DispatchResult;
use frame_support::dispatch::{DispatchResult, DispatchResultWithPostInfo};
use parity_scale_codec::Input;
use sp_std::vec::Vec;

Expand All @@ -34,7 +34,7 @@ pub trait Router {
fn init(&self) -> DispatchResult;

/// Send the message to the router's destination.
fn send(&self, sender: Self::Sender, message: Self::Message) -> DispatchResult;
fn send(&self, sender: Self::Sender, message: Self::Message) -> DispatchResultWithPostInfo;
}

/// The trait required for processing outbound messages.
Expand Down
21 changes: 10 additions & 11 deletions pallets/liquidity-pools-gateway/routers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ pub const GAS_TO_WEIGHT_MULTIPLIER: u64 = 25_000;

use cfg_traits::{ethereum::EthereumTransactor, liquidity_pools::Router};
use frame_support::{
dispatch::{DispatchError, DispatchResult, Weight},
dispatch::{
DispatchError, DispatchResult, DispatchResultWithPostInfo, PostDispatchInfo, Weight,
},
ensure,
traits::OriginTrait,
};
Expand Down Expand Up @@ -113,7 +115,7 @@ where
}
}

fn send(&self, sender: Self::Sender, message: Self::Message) -> DispatchResult {
fn send(&self, sender: Self::Sender, message: Self::Message) -> DispatchResultWithPostInfo {
match self {
DomainRouter::EthereumXCM(r) => r.do_send(sender, message),
DomainRouter::AxelarEVM(r) => r.do_send(sender, message),
Expand Down Expand Up @@ -159,12 +161,9 @@ where
/// pallet, this EVM address will be converted back into a substrate account
/// which will be charged for the transaction. This converted substrate
/// account is not the same as the original account.
pub fn do_send(&self, sender: T::AccountId, msg: Vec<u8>) -> DispatchResult {
pub fn do_send(&self, sender: T::AccountId, msg: Vec<u8>) -> DispatchResultWithPostInfo {
let sender_evm_address = H160::from_slice(&sender.as_ref()[0..20]);

// TODO(cdamian): This returns a `DispatchResultWithPostInfo`. Should we
// propagate that to another layer that will eventually charge for the
// weight in the PostDispatchInfo?
<pallet_ethereum_transaction::Pallet<T> as EthereumTransactor>::call(
sender_evm_address,
self.evm_domain.target_contract_address,
Expand All @@ -173,9 +172,6 @@ where
self.evm_domain.fee_values.gas_price,
self.evm_domain.fee_values.gas_limit,
)
.map_err(|e| e.error)?;

Ok(())
}
}

Expand Down Expand Up @@ -231,7 +227,7 @@ where

/// Encodes the message to the required format and executes the
/// call via the XCM transactor pallet.
pub fn do_send(&self, sender: T::AccountId, msg: Vec<u8>) -> DispatchResult {
pub fn do_send(&self, sender: T::AccountId, msg: Vec<u8>) -> DispatchResultWithPostInfo {
let ethereum_xcm_call = get_encoded_ethereum_xcm_call::<T>(self.xcm_domain.clone(), msg)
.map_err(|_| DispatchError::Other("encoded ethereum xcm call retrieval"))?;

Expand All @@ -257,7 +253,10 @@ where
true,
)?;

Ok(())
Ok(PostDispatchInfo {
actual_weight: Some(self.xcm_domain.overall_weight),
pays_fee: Default::default(),
})
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// GNU General Public License for more details.
use cfg_traits::liquidity_pools::Codec;
use ethabi::{Contract, Function, Param, ParamType, Token};
use frame_support::dispatch::{DispatchError, DispatchResult};
use frame_support::dispatch::{DispatchError, DispatchResult, DispatchResultWithPostInfo};
use frame_system::pallet_prelude::OriginFor;
use parity_scale_codec::{Decode, Encode, MaxEncodedLen};
use scale_info::{
Expand Down Expand Up @@ -61,7 +61,7 @@ where

/// Encodes the message to the required format,
/// then executes the EVM call using the generic EVM router.
pub fn do_send(&self, sender: AccountIdOf<T>, msg: MessageOf<T>) -> DispatchResult {
pub fn do_send(&self, sender: AccountIdOf<T>, msg: MessageOf<T>) -> DispatchResultWithPostInfo {
let eth_msg = get_axelar_encoded_msg(
msg.serialize(),
self.evm_chain.clone().into_inner(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// GNU General Public License for more details.

use cfg_traits::liquidity_pools::Codec;
use frame_support::dispatch::DispatchResult;
use frame_support::dispatch::{DispatchResult, DispatchResultWithPostInfo};
use parity_scale_codec::{Decode, Encode, MaxEncodedLen};
use scale_info::TypeInfo;
use sp_core::{bounded::BoundedVec, ConstU32, H160};
Expand Down Expand Up @@ -53,7 +53,7 @@ where

/// Encodes the message to the required format,
/// then executes the EVM call using the generic XCM router.
pub fn do_send(&self, sender: AccountIdOf<T>, msg: MessageOf<T>) -> DispatchResult {
pub fn do_send(&self, sender: AccountIdOf<T>, msg: MessageOf<T>) -> DispatchResultWithPostInfo {
let contract_call = get_axelar_encoded_msg(
msg.serialize(),
self.axelar_target_chain.clone().into_inner(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
// GNU General Public License for more details.
use cfg_traits::liquidity_pools::Codec;
use ethabi::{Bytes, Contract};
use frame_support::{dispatch::DispatchResult, sp_runtime::DispatchError};
use frame_support::{
dispatch::{DispatchResult, DispatchResultWithPostInfo},
sp_runtime::DispatchError,
};
use parity_scale_codec::{Decode, Encode, MaxEncodedLen};
use scale_info::TypeInfo;
use sp_std::{collections::btree_map::BTreeMap, marker::PhantomData, vec, vec::Vec};
Expand Down Expand Up @@ -43,7 +46,7 @@ where

/// Encodes the message to the required format and executes the
/// call via the XCM router.
pub fn do_send(&self, sender: AccountIdOf<T>, msg: MessageOf<T>) -> DispatchResult {
pub fn do_send(&self, sender: AccountIdOf<T>, msg: MessageOf<T>) -> DispatchResultWithPostInfo {
let contract_call = get_encoded_contract_call(msg.serialize())
.map_err(|_| DispatchError::Other("encoded contract call retrieval"))?;

Expand Down
4 changes: 2 additions & 2 deletions pallets/liquidity-pools-gateway/routers/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ mod evm_router {
let res = router.do_send(test_data.sender, test_data.msg);

assert_eq!(
res.err().unwrap(),
res.err().unwrap().error,
pallet_evm::Error::<Runtime>::BalanceLow.into()
);
});
Expand Down Expand Up @@ -483,7 +483,7 @@ mod axelar_evm {
let res = domain_router.send(test_data.sender, test_data.msg);

assert_eq!(
res.err().unwrap(),
res.err().unwrap().error,
pallet_evm::Error::<Runtime>::BalanceLow.into()
);
});
Expand Down
113 changes: 105 additions & 8 deletions pallets/liquidity-pools-gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use cfg_traits::{
};
use cfg_types::domain_address::{Domain, DomainAddress};
use frame_support::{dispatch::DispatchResult, pallet_prelude::*, PalletError};
use frame_system::pallet_prelude::OriginFor;
use frame_system::pallet_prelude::{BlockNumberFor, OriginFor};
pub use pallet::*;
use parity_scale_codec::{EncodeLike, FullCodec};
use sp_std::{convert::TryInto, vec::Vec};
Expand Down Expand Up @@ -56,6 +56,7 @@ impl<T: Config> From<RelayerMessageDecodingError> for Error<T> {
pub mod pallet {
const BYTES_U32: usize = 4;
const BYTES_ACCOUNT_20: usize = 20;
const DEFAULT_WEIGHT_REF_TIME: u64 = 100_000_000;

use super::*;
use crate::RelayerMessageDecodingError::{
Expand Down Expand Up @@ -94,7 +95,7 @@ pub mod pallet {
///
/// NOTE - this `Codec` trait is the Centrifuge trait for liquidity
/// pools' messages.
type Message: Codec + Clone + Debug + PartialEq;
type Message: Codec + Clone + Debug + PartialEq + MaxEncodedLen + TypeInfo + FullCodec;

/// The message router type that is stored for each domain.
type Router: DomainRouter<Sender = Self::AccountId, Message = Self::Message>
Expand Down Expand Up @@ -148,6 +149,23 @@ pub mod pallet {
domain: Domain,
message: T::Message,
},

/// Router not found for the provided domain.
RouterNotFound { domain: Domain },

/// Router execution failure.
RouterExecutionFailure {
sender: T::AccountId,
domain: Domain,
message: T::Message,
},

/// Router execution success.
RouterExecutionSuccess {
sender: T::AccountId,
domain: Domain,
message: T::Message,
},
}

/// Storage for domain routers.
Expand All @@ -174,6 +192,19 @@ pub mod pallet {
pub(crate) type RelayerList<T: Config> =
StorageDoubleMap<_, Blake2_128Concat, Domain, Blake2_128Concat, DomainAddress, ()>;

/// Storage for outbound messages that will be processed `on_idle`.
#[pallet::storage]
pub(crate) type OutboundMessageQueue<T: Config> = StorageNMap<
_,
(
NMapKey<Blake2_128Concat, Domain>,
NMapKey<Blake2_128Concat, T::AccountId>,
NMapKey<Blake2_128Concat, T::Message>,
),
(),
ValueQuery,
>;

#[pallet::error]
pub enum Error<T> {
/// Router initialization failed.
Expand Down Expand Up @@ -214,6 +245,13 @@ pub mod pallet {
RelayerMessageDecodingFailed { reason: RelayerMessageDecodingError },
}

#[pallet::hooks]
impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
fn on_idle(_now: T::BlockNumber, max_weight: Weight) -> Weight {
Self::service_outbound_message_queue(max_weight)
}
}

#[pallet::call]
impl<T: Config> Pallet<T> {
/// Set a domain's router,
Expand Down Expand Up @@ -462,6 +500,65 @@ pub mod pallet {

Ok((address, incoming_msg))
}

fn service_outbound_message_queue(max_weight: Weight) -> Weight {
let mut weight_used = Weight::zero();

let mut processed_entries = Vec::new();

for ((domain, sender, message), _) in OutboundMessageQueue::<T>::iter() {
let router = match DomainRouters::<T>::get(domain.clone()) {
Some(router) => router,
None => {
Self::deposit_event(Event::RouterNotFound {
domain: domain.clone(),
});

continue;
}
};

let res = match router.send(sender.clone(), message.clone()) {
Ok(r) => {
Self::deposit_event(Event::RouterExecutionSuccess {
sender: sender.clone(),
domain: domain.clone(),
message: message.clone(),
});

processed_entries.push((domain.clone(), sender.clone(), message.clone()));

r
}
Err(e) => {
Self::deposit_event(Event::RouterExecutionFailure {
sender: sender.clone(),
domain: domain.clone(),
message: message.clone(),
});

e.post_info
}
};

let weight = res
.actual_weight
.or(Some(Weight::from_parts(DEFAULT_WEIGHT_REF_TIME, 0)))
.unwrap();

weight_used = weight_used.saturating_add(weight);

if weight_used.all_gte(max_weight) {
break;
}
}

for entry in processed_entries {
OutboundMessageQueue::<T>::remove(entry);
}

weight_used
}
}

/// This pallet will be the `OutboundQueue` used by other pallets to send
Expand All @@ -476,7 +573,7 @@ pub mod pallet {
type Sender = T::AccountId;

fn submit(
sender: Self::Sender,
_sender: Self::Sender,
destination: Self::Destination,
message: Self::Message,
) -> DispatchResult {
Expand All @@ -485,13 +582,13 @@ pub mod pallet {
Error::<T>::DomainNotSupported
);

let router =
DomainRouters::<T>::get(destination.clone()).ok_or(Error::<T>::RouterNotFound)?;

router.send(T::Sender::get(), message.clone())?;
OutboundMessageQueue::<T>::insert(
(destination.clone(), T::Sender::get(), message.clone()),
(),
);

Self::deposit_event(Event::OutboundMessageSubmitted {
sender,
sender: T::Sender::get(),
domain: destination,
message,
});
Expand Down
Loading

0 comments on commit 3b7c6f7

Please sign in to comment.