diff --git a/bridges/relays/exchange/Cargo.toml b/bridges/relays/exchange/Cargo.toml deleted file mode 100644 index f08c40325ec71..0000000000000 --- a/bridges/relays/exchange/Cargo.toml +++ /dev/null @@ -1,18 +0,0 @@ -[package] -name = "exchange-relay" -version = "0.1.0" -authors = ["Parity Technologies "] -edition = "2018" -license = "GPL-3.0-or-later WITH Classpath-exception-2.0" - -[dependencies] -anyhow = "1.0" -async-std = "1.6.5" -async-trait = "0.1.40" -backoff = "0.2" -futures = "0.3.5" -log = "0.4.11" -num-traits = "0.2" -parking_lot = "0.11.0" -relay-utils = { path = "../utils" } -thiserror = "1.0.26" diff --git a/bridges/relays/exchange/src/error.rs b/bridges/relays/exchange/src/error.rs deleted file mode 100644 index aa5c427a9efbd..0000000000000 --- a/bridges/relays/exchange/src/error.rs +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright 2019-2021 Parity Technologies (UK) Ltd. -// This file is part of Parity Bridges Common. - -// Parity Bridges Common is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Parity Bridges Common is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Parity Bridges Common. If not, see . - -//! Exchange-relay errors. - -use crate::exchange::{BlockHashOf, BlockNumberOf, TransactionHashOf}; - -use relay_utils::MaybeConnectionError; -use std::fmt::{Debug, Display}; -use thiserror::Error; - -/// Error type given pipeline. -pub type ErrorOf

= Error, BlockNumberOf

, TransactionHashOf

>; - -/// Exchange-relay error type. -#[derive(Error, Debug)] -pub enum Error { - /// Failed to check finality of the requested header on the target node. - #[error("Failed to check finality of header {0}/{1} on {2} node: {3:?}")] - Finality(HeaderNumber, Hash, &'static str, anyhow::Error), - /// Error retrieving block from the source node. - #[error("Error retrieving block {0} from {1} node: {2:?}")] - RetrievingBlock(Hash, &'static str, anyhow::Error), - /// Error retrieving transaction from the source node. - #[error("Error retrieving transaction {0} from {1} node: {2:?}")] - RetrievingTransaction(SourceTxHash, &'static str, anyhow::Error), - /// Failed to check existence of header from the target node. - #[error("Failed to check existence of header {0}/{1} on {2} node: {3:?}")] - CheckHeaderExistence(HeaderNumber, Hash, &'static str, anyhow::Error), - /// Failed to prepare proof for the transaction from the source node. - #[error("Error building transaction {0} proof on {1} node: {2:?}")] - BuildTransactionProof(String, &'static str, anyhow::Error, bool), - /// Failed to submit the transaction proof to the target node. - #[error("Error submitting transaction {0} proof to {1} node: {2:?}")] - SubmitTransactionProof(String, &'static str, anyhow::Error, bool), - /// Transaction filtering failed. - #[error("Transaction filtering has failed with {0:?}")] - TransactionFiltering(anyhow::Error, bool), - /// Utilities/metrics error. - #[error("{0}")] - Utils(#[from] relay_utils::Error), -} - -impl MaybeConnectionError for Error { - fn is_connection_error(&self) -> bool { - match *self { - Self::BuildTransactionProof(_, _, _, b) => b, - Self::SubmitTransactionProof(_, _, _, b) => b, - Self::TransactionFiltering(_, b) => b, - _ => false, - } - } -} diff --git a/bridges/relays/exchange/src/exchange.rs b/bridges/relays/exchange/src/exchange.rs deleted file mode 100644 index b4538d2636cec..0000000000000 --- a/bridges/relays/exchange/src/exchange.rs +++ /dev/null @@ -1,904 +0,0 @@ -// Copyright 2019-2021 Parity Technologies (UK) Ltd. -// This file is part of Parity Bridges Common. - -// Parity Bridges Common is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Parity Bridges Common is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Parity Bridges Common. If not, see . - -//! Relaying proofs of exchange transaction. - -use crate::error::{Error, ErrorOf}; - -use anyhow::anyhow; -use async_trait::async_trait; -use relay_utils::{relay_loop::Client as RelayClient, FailedClient, MaybeConnectionError}; -use std::{ - fmt::{Debug, Display}, - string::ToString, -}; - -/// Transaction proof pipeline. -pub trait TransactionProofPipeline: 'static { - /// Name of the transaction proof source. - const SOURCE_NAME: &'static str; - /// Name of the transaction proof target. - const TARGET_NAME: &'static str; - - /// Block type. - type Block: SourceBlock; - /// Transaction inclusion proof type. - type TransactionProof: 'static + Send + Sync; -} - -/// Block that is participating in exchange. -pub trait SourceBlock: 'static + Send + Sync { - /// Block hash type. - type Hash: 'static + Clone + Send + Sync + Debug + Display; - /// Block number type. - type Number: 'static - + Debug - + Display - + Clone - + Copy - + Send - + Sync - + Into - + std::cmp::Ord - + std::ops::Add - + num_traits::One; - /// Block transaction. - type Transaction: SourceTransaction; - - /// Return hash of the block. - fn id(&self) -> relay_utils::HeaderId; - /// Return block transactions iterator. - fn transactions(&self) -> Vec; -} - -/// Transaction that is participating in exchange. -pub trait SourceTransaction: 'static + Send { - /// Transaction hash type. - type Hash: Debug + Display + Clone; - - /// Return transaction hash. - fn hash(&self) -> Self::Hash; -} - -/// Block hash for given pipeline. -pub type BlockHashOf

= <

::Block as SourceBlock>::Hash; - -/// Block number for given pipeline. -pub type BlockNumberOf

= <

::Block as SourceBlock>::Number; - -/// Transaction hash for given pipeline. -pub type TransactionOf

= <

::Block as SourceBlock>::Transaction; - -/// Transaction hash for given pipeline. -pub type TransactionHashOf

= as SourceTransaction>::Hash; - -/// Header id. -pub type HeaderId

= relay_utils::HeaderId, BlockNumberOf

>; - -/// Source client API. -#[async_trait] -pub trait SourceClient: RelayClient { - /// Sleep until exchange-related data is (probably) updated. - async fn tick(&self); - /// Get block by hash. - async fn block_by_hash(&self, hash: BlockHashOf

) -> Result; - /// Get canonical block by number. - async fn block_by_number(&self, number: BlockNumberOf

) -> Result; - /// Return block + index where transaction has been **mined**. May return `Ok(None)` if - /// transaction is unknown to the source node. - async fn transaction_block( - &self, - hash: &TransactionHashOf

, - ) -> Result, usize)>, Self::Error>; - /// Prepare transaction proof. - async fn transaction_proof( - &self, - block: &P::Block, - tx_index: usize, - ) -> Result; -} - -/// Target client API. -#[async_trait] -pub trait TargetClient: RelayClient { - /// Sleep until exchange-related data is (probably) updated. - async fn tick(&self); - /// Returns `Ok(true)` if header is known to the target node. - async fn is_header_known(&self, id: &HeaderId

) -> std::result::Result; - /// Returns `Ok(true)` if header is finalized by the target node. - async fn is_header_finalized(&self, id: &HeaderId

) -> Result; - /// Returns best finalized header id. - async fn best_finalized_header_id(&self) -> Result, Self::Error>; - /// Returns `Ok(true)` if transaction proof is need to be relayed. - async fn filter_transaction_proof( - &self, - proof: &P::TransactionProof, - ) -> Result; - /// Submits transaction proof to the target node. - async fn submit_transaction_proof(&self, proof: P::TransactionProof) - -> Result<(), Self::Error>; -} - -/// Block transaction statistics. -#[derive(Debug, Default)] -#[cfg_attr(test, derive(PartialEq))] -pub struct RelayedBlockTransactions { - /// Total number of transactions processed (either relayed or ignored) so far. - pub processed: usize, - /// Total number of transactions successfully relayed so far. - pub relayed: usize, - /// Total number of transactions that we have failed to relay so far. - pub failed: usize, -} - -/// Relay all suitable transactions from single block. -/// -/// If connection error occurs, returns Err with number of successfully processed transactions. -/// If some other error occurs, it is ignored and other transactions are processed. -/// -/// All transaction-level traces are written by this function. This function is not tracing -/// any information about block. -pub async fn relay_block_transactions( - source_client: &impl SourceClient

, - target_client: &impl TargetClient

, - source_block: &P::Block, - mut relayed_transactions: RelayedBlockTransactions, -) -> Result { - let transactions_to_process = source_block - .transactions() - .into_iter() - .enumerate() - .skip(relayed_transactions.processed); - for (source_tx_index, source_tx) in transactions_to_process { - let result = async { - let source_tx_id = format!("{}/{}", source_block.id().1, source_tx_index); - let source_tx_proof = prepare_transaction_proof( - source_client, - &source_tx_id, - source_block, - source_tx_index, - ) - .await - .map_err(|e| (FailedClient::Source, e))?; - - let needs_to_be_relayed = - target_client.filter_transaction_proof(&source_tx_proof).await.map_err(|err| { - ( - FailedClient::Target, - Error::TransactionFiltering( - anyhow!("{:?}", err), - err.is_connection_error(), - ), - ) - })?; - - if !needs_to_be_relayed { - return Ok(false) - } - - relay_ready_transaction_proof(target_client, &source_tx_id, source_tx_proof) - .await - .map(|_| true) - .map_err(|e| (FailedClient::Target, e)) - } - .await; - - // We have two options here: - // 1) retry with the same transaction later; - // 2) report error and proceed with next transaction. - // - // Option#1 may seems better, but: - // 1) we do not track if transaction is mined (without an error) by the target node; - // 2) error could be irrecoverable (e.g. when block is already pruned by bridge module or tx - // has invalid format) && we'll end up in infinite loop of retrying the same transaction - // proof. - // - // So we're going with option#2 here (the only exception are connection errors). - match result { - Ok(false) => { - relayed_transactions.processed += 1; - }, - Ok(true) => { - log::info!( - target: "bridge", - "{} transaction {} proof has been successfully submitted to {} node", - P::SOURCE_NAME, - source_tx.hash(), - P::TARGET_NAME, - ); - - relayed_transactions.processed += 1; - relayed_transactions.relayed += 1; - }, - Err((failed_client, err)) => { - log::error!( - target: "bridge", - "Error relaying {} transaction {} proof to {} node: {}. {}", - P::SOURCE_NAME, - source_tx.hash(), - P::TARGET_NAME, - err.to_string(), - if err.is_connection_error() { - "Going to retry after delay..." - } else { - "You may need to submit proof of this transaction manually" - }, - ); - - if err.is_connection_error() { - return Err((failed_client, relayed_transactions)) - } - - relayed_transactions.processed += 1; - relayed_transactions.failed += 1; - }, - } - } - - Ok(relayed_transactions) -} - -/// Relay single transaction proof. -pub async fn relay_single_transaction_proof( - source_client: &impl SourceClient

, - target_client: &impl TargetClient

, - source_tx_hash: TransactionHashOf

, -) -> Result<(), ErrorOf

> { - // wait for transaction and header on source node - let (source_header_id, source_tx_index) = - wait_transaction_mined(source_client, &source_tx_hash).await?; - let source_block = source_client.block_by_hash(source_header_id.1.clone()).await; - let source_block = source_block.map_err(|err| { - Error::RetrievingBlock(source_header_id.1.clone(), P::SOURCE_NAME, anyhow!("{:?}", err)) - })?; - // wait for transaction and header on target node - wait_header_imported(target_client, &source_header_id).await?; - wait_header_finalized(target_client, &source_header_id).await?; - - // and finally - prepare and submit transaction proof to target node - let source_tx_id = format!("{}", source_tx_hash); - relay_ready_transaction_proof( - target_client, - &source_tx_id, - prepare_transaction_proof(source_client, &source_tx_id, &source_block, source_tx_index) - .await?, - ) - .await - .map_err(Into::into) -} - -/// Prepare transaction proof. -async fn prepare_transaction_proof( - source_client: &impl SourceClient

, - source_tx_id: &str, - source_block: &P::Block, - source_tx_index: usize, -) -> Result> { - source_client - .transaction_proof(source_block, source_tx_index) - .await - .map_err(|err| { - Error::BuildTransactionProof( - source_tx_id.to_owned(), - P::SOURCE_NAME, - anyhow!("{:?}", err), - err.is_connection_error(), - ) - }) -} - -/// Relay prepared proof of transaction. -async fn relay_ready_transaction_proof( - target_client: &impl TargetClient

, - source_tx_id: &str, - source_tx_proof: P::TransactionProof, -) -> Result<(), ErrorOf

> { - target_client.submit_transaction_proof(source_tx_proof).await.map_err(|err| { - Error::SubmitTransactionProof( - source_tx_id.to_owned(), - P::TARGET_NAME, - anyhow!("{:?}", err), - err.is_connection_error(), - ) - }) -} - -/// Wait until transaction is mined by source node. -async fn wait_transaction_mined( - source_client: &impl SourceClient

, - source_tx_hash: &TransactionHashOf

, -) -> Result<(HeaderId

, usize), ErrorOf

> { - loop { - let source_header_and_tx = - source_client.transaction_block(source_tx_hash).await.map_err(|err| { - Error::RetrievingTransaction( - source_tx_hash.clone(), - P::SOURCE_NAME, - anyhow!("{:?}", err), - ) - })?; - match source_header_and_tx { - Some((source_header_id, source_tx)) => { - log::info!( - target: "bridge", - "Transaction {} is retrieved from {} node. Continuing...", - source_tx_hash, - P::SOURCE_NAME, - ); - - return Ok((source_header_id, source_tx)) - }, - None => { - log::info!( - target: "bridge", - "Waiting for transaction {} to be mined by {} node...", - source_tx_hash, - P::SOURCE_NAME, - ); - - source_client.tick().await; - }, - } - } -} - -/// Wait until target node imports required header. -async fn wait_header_imported( - target_client: &impl TargetClient

, - source_header_id: &HeaderId

, -) -> Result<(), ErrorOf

> { - loop { - let is_header_known = - target_client.is_header_known(source_header_id).await.map_err(|err| { - Error::CheckHeaderExistence( - source_header_id.0, - source_header_id.1.clone(), - P::TARGET_NAME, - anyhow!("{:?}", err), - ) - })?; - match is_header_known { - true => { - log::info!( - target: "bridge", - "Header {}/{} is known to {} node. Continuing.", - source_header_id.0, - source_header_id.1, - P::TARGET_NAME, - ); - - return Ok(()) - }, - false => { - log::info!( - target: "bridge", - "Waiting for header {}/{} to be imported by {} node...", - source_header_id.0, - source_header_id.1, - P::TARGET_NAME, - ); - - target_client.tick().await; - }, - } - } -} - -/// Wait until target node finalizes required header. -async fn wait_header_finalized( - target_client: &impl TargetClient

, - source_header_id: &HeaderId

, -) -> Result<(), ErrorOf

> { - loop { - let is_header_finalized = - target_client.is_header_finalized(source_header_id).await.map_err(|err| { - Error::Finality( - source_header_id.0, - source_header_id.1.clone(), - P::TARGET_NAME, - anyhow!("{:?}", err), - ) - })?; - match is_header_finalized { - true => { - log::info!( - target: "bridge", - "Header {}/{} is finalizd by {} node. Continuing.", - source_header_id.0, - source_header_id.1, - P::TARGET_NAME, - ); - - return Ok(()) - }, - false => { - log::info!( - target: "bridge", - "Waiting for header {}/{} to be finalized by {} node...", - source_header_id.0, - source_header_id.1, - P::TARGET_NAME, - ); - - target_client.tick().await; - }, - } - } -} - -#[cfg(test)] -pub(crate) mod tests { - use super::*; - - use parking_lot::Mutex; - use relay_utils::HeaderId; - use std::{ - collections::{HashMap, HashSet}, - sync::Arc, - }; - - pub fn test_block_id() -> TestHeaderId { - HeaderId(1, 1) - } - - pub fn test_next_block_id() -> TestHeaderId { - HeaderId(2, 2) - } - - pub fn test_transaction_hash(tx_index: u64) -> TestTransactionHash { - 200 + tx_index - } - - pub fn test_transaction(tx_index: u64) -> TestTransaction { - TestTransaction(test_transaction_hash(tx_index)) - } - - pub fn test_block() -> TestBlock { - TestBlock(test_block_id(), vec![test_transaction(0)]) - } - - pub fn test_next_block() -> TestBlock { - TestBlock(test_next_block_id(), vec![test_transaction(1)]) - } - - pub type TestBlockNumber = u64; - pub type TestBlockHash = u64; - pub type TestTransactionHash = u64; - pub type TestHeaderId = HeaderId; - - #[derive(Debug, Clone, PartialEq)] - pub struct TestError(pub bool); - - impl MaybeConnectionError for TestError { - fn is_connection_error(&self) -> bool { - self.0 - } - } - - pub struct TestTransactionProofPipeline; - - impl TransactionProofPipeline for TestTransactionProofPipeline { - const SOURCE_NAME: &'static str = "TestSource"; - const TARGET_NAME: &'static str = "TestTarget"; - - type Block = TestBlock; - type TransactionProof = TestTransactionProof; - } - - #[derive(Debug, Clone)] - pub struct TestBlock(pub TestHeaderId, pub Vec); - - impl SourceBlock for TestBlock { - type Hash = TestBlockHash; - type Number = TestBlockNumber; - type Transaction = TestTransaction; - - fn id(&self) -> TestHeaderId { - self.0 - } - - fn transactions(&self) -> Vec { - self.1.clone() - } - } - - #[derive(Debug, Clone)] - pub struct TestTransaction(pub TestTransactionHash); - - impl SourceTransaction for TestTransaction { - type Hash = TestTransactionHash; - - fn hash(&self) -> Self::Hash { - self.0 - } - } - - #[derive(Debug, Clone, PartialEq)] - pub struct TestTransactionProof(pub TestTransactionHash); - - #[derive(Clone)] - pub struct TestTransactionsSource { - pub on_tick: Arc, - pub data: Arc>, - } - - pub struct TestTransactionsSourceData { - pub block: Result, - pub transaction_block: Result, TestError>, - pub proofs_to_fail: HashMap, - } - - impl TestTransactionsSource { - pub fn new(on_tick: Box) -> Self { - Self { - on_tick: Arc::new(on_tick), - data: Arc::new(Mutex::new(TestTransactionsSourceData { - block: Ok(test_block()), - transaction_block: Ok(Some((test_block_id(), 0))), - proofs_to_fail: HashMap::new(), - })), - } - } - } - - #[async_trait] - impl RelayClient for TestTransactionsSource { - type Error = TestError; - - async fn reconnect(&mut self) -> Result<(), TestError> { - Ok(()) - } - } - - #[async_trait] - impl SourceClient for TestTransactionsSource { - async fn tick(&self) { - (self.on_tick)(&mut *self.data.lock()) - } - - async fn block_by_hash(&self, _: TestBlockHash) -> Result { - self.data.lock().block.clone() - } - - async fn block_by_number(&self, _: TestBlockNumber) -> Result { - self.data.lock().block.clone() - } - - async fn transaction_block( - &self, - _: &TestTransactionHash, - ) -> Result, TestError> { - self.data.lock().transaction_block.clone() - } - - async fn transaction_proof( - &self, - block: &TestBlock, - index: usize, - ) -> Result { - let tx_hash = block.1[index].hash(); - let proof_error = self.data.lock().proofs_to_fail.get(&tx_hash).cloned(); - if let Some(err) = proof_error { - return Err(err) - } - - Ok(TestTransactionProof(tx_hash)) - } - } - - #[derive(Clone)] - pub struct TestTransactionsTarget { - pub on_tick: Arc, - pub data: Arc>, - } - - pub struct TestTransactionsTargetData { - pub is_header_known: Result, - pub is_header_finalized: Result, - pub best_finalized_header_id: Result, - pub transactions_to_accept: HashSet, - pub submitted_proofs: Vec, - } - - impl TestTransactionsTarget { - pub fn new(on_tick: Box) -> Self { - Self { - on_tick: Arc::new(on_tick), - data: Arc::new(Mutex::new(TestTransactionsTargetData { - is_header_known: Ok(true), - is_header_finalized: Ok(true), - best_finalized_header_id: Ok(test_block_id()), - transactions_to_accept: vec![test_transaction_hash(0)].into_iter().collect(), - submitted_proofs: Vec::new(), - })), - } - } - } - - #[async_trait] - impl RelayClient for TestTransactionsTarget { - type Error = TestError; - - async fn reconnect(&mut self) -> Result<(), TestError> { - Ok(()) - } - } - - #[async_trait] - impl TargetClient for TestTransactionsTarget { - async fn tick(&self) { - (self.on_tick)(&mut *self.data.lock()) - } - - async fn is_header_known(&self, _: &TestHeaderId) -> Result { - self.data.lock().is_header_known.clone() - } - - async fn is_header_finalized(&self, _: &TestHeaderId) -> Result { - self.data.lock().is_header_finalized.clone() - } - - async fn best_finalized_header_id(&self) -> Result { - self.data.lock().best_finalized_header_id.clone() - } - - async fn filter_transaction_proof( - &self, - proof: &TestTransactionProof, - ) -> Result { - Ok(self.data.lock().transactions_to_accept.contains(&proof.0)) - } - - async fn submit_transaction_proof( - &self, - proof: TestTransactionProof, - ) -> Result<(), TestError> { - self.data.lock().submitted_proofs.push(proof); - Ok(()) - } - } - - fn ensure_relay_single_success( - source: &TestTransactionsSource, - target: &TestTransactionsTarget, - ) { - assert!(async_std::task::block_on(relay_single_transaction_proof( - source, - target, - test_transaction_hash(0) - )) - .is_ok()); - assert_eq!( - target.data.lock().submitted_proofs, - vec![TestTransactionProof(test_transaction_hash(0))], - ); - } - - fn ensure_relay_single_failure(source: TestTransactionsSource, target: TestTransactionsTarget) { - assert!(async_std::task::block_on(relay_single_transaction_proof( - &source, - &target, - test_transaction_hash(0), - )) - .is_err()); - assert!(target.data.lock().submitted_proofs.is_empty()); - } - - #[test] - fn ready_transaction_proof_relayed_immediately() { - let source = TestTransactionsSource::new(Box::new(|_| unreachable!("no ticks allowed"))); - let target = TestTransactionsTarget::new(Box::new(|_| unreachable!("no ticks allowed"))); - ensure_relay_single_success(&source, &target) - } - - #[test] - fn relay_transaction_proof_waits_for_transaction_to_be_mined() { - let source = TestTransactionsSource::new(Box::new(|source_data| { - assert_eq!(source_data.transaction_block, Ok(None)); - source_data.transaction_block = Ok(Some((test_block_id(), 0))); - })); - let target = TestTransactionsTarget::new(Box::new(|_| unreachable!("no ticks allowed"))); - - // transaction is not yet mined, but will be available after first wait (tick) - source.data.lock().transaction_block = Ok(None); - - ensure_relay_single_success(&source, &target) - } - - #[test] - fn relay_transaction_fails_when_transaction_retrieval_fails() { - let source = TestTransactionsSource::new(Box::new(|_| unreachable!("no ticks allowed"))); - let target = TestTransactionsTarget::new(Box::new(|_| unreachable!("no ticks allowed"))); - - source.data.lock().transaction_block = Err(TestError(false)); - - ensure_relay_single_failure(source, target) - } - - #[test] - fn relay_transaction_fails_when_proof_retrieval_fails() { - let source = TestTransactionsSource::new(Box::new(|_| unreachable!("no ticks allowed"))); - let target = TestTransactionsTarget::new(Box::new(|_| unreachable!("no ticks allowed"))); - - source - .data - .lock() - .proofs_to_fail - .insert(test_transaction_hash(0), TestError(false)); - - ensure_relay_single_failure(source, target) - } - - #[test] - fn relay_transaction_proof_waits_for_header_to_be_imported() { - let source = TestTransactionsSource::new(Box::new(|_| unreachable!("no ticks allowed"))); - let target = TestTransactionsTarget::new(Box::new(|target_data| { - assert_eq!(target_data.is_header_known, Ok(false)); - target_data.is_header_known = Ok(true); - })); - - // header is not yet imported, but will be available after first wait (tick) - target.data.lock().is_header_known = Ok(false); - - ensure_relay_single_success(&source, &target) - } - - #[test] - fn relay_transaction_proof_fails_when_is_header_known_fails() { - let source = TestTransactionsSource::new(Box::new(|_| unreachable!("no ticks allowed"))); - let target = TestTransactionsTarget::new(Box::new(|_| unreachable!("no ticks allowed"))); - - target.data.lock().is_header_known = Err(TestError(false)); - - ensure_relay_single_failure(source, target) - } - - #[test] - fn relay_transaction_proof_waits_for_header_to_be_finalized() { - let source = TestTransactionsSource::new(Box::new(|_| unreachable!("no ticks allowed"))); - let target = TestTransactionsTarget::new(Box::new(|target_data| { - assert_eq!(target_data.is_header_finalized, Ok(false)); - target_data.is_header_finalized = Ok(true); - })); - - // header is not yet finalized, but will be available after first wait (tick) - target.data.lock().is_header_finalized = Ok(false); - - ensure_relay_single_success(&source, &target) - } - - #[test] - fn relay_transaction_proof_fails_when_is_header_finalized_fails() { - let source = TestTransactionsSource::new(Box::new(|_| unreachable!("no ticks allowed"))); - let target = TestTransactionsTarget::new(Box::new(|_| unreachable!("no ticks allowed"))); - - target.data.lock().is_header_finalized = Err(TestError(false)); - - ensure_relay_single_failure(source, target) - } - - #[test] - fn relay_transaction_proof_fails_when_target_node_rejects_proof() { - let source = TestTransactionsSource::new(Box::new(|_| unreachable!("no ticks allowed"))); - let target = TestTransactionsTarget::new(Box::new(|_| unreachable!("no ticks allowed"))); - - target.data.lock().transactions_to_accept.remove(&test_transaction_hash(0)); - - ensure_relay_single_success(&source, &target) - } - - fn test_relay_block_transactions( - source: &TestTransactionsSource, - target: &TestTransactionsTarget, - pre_relayed: RelayedBlockTransactions, - ) -> Result { - async_std::task::block_on(relay_block_transactions( - source, - target, - &TestBlock( - test_block_id(), - vec![test_transaction(0), test_transaction(1), test_transaction(2)], - ), - pre_relayed, - )) - .map_err(|(_, transactions)| transactions) - } - - #[test] - fn relay_block_transactions_process_all_transactions() { - let source = TestTransactionsSource::new(Box::new(|_| unreachable!("no ticks allowed"))); - let target = TestTransactionsTarget::new(Box::new(|_| unreachable!("no ticks allowed"))); - - // let's only accept tx#1 - target.data.lock().transactions_to_accept.remove(&test_transaction_hash(0)); - target.data.lock().transactions_to_accept.insert(test_transaction_hash(1)); - - let relayed_transactions = - test_relay_block_transactions(&source, &target, Default::default()); - assert_eq!( - relayed_transactions, - Ok(RelayedBlockTransactions { processed: 3, relayed: 1, failed: 0 }), - ); - assert_eq!( - target.data.lock().submitted_proofs, - vec![TestTransactionProof(test_transaction_hash(1))], - ); - } - - #[test] - fn relay_block_transactions_ignores_transaction_failure() { - let source = TestTransactionsSource::new(Box::new(|_| unreachable!("no ticks allowed"))); - let target = TestTransactionsTarget::new(Box::new(|_| unreachable!("no ticks allowed"))); - - // let's reject proof for tx#0 - source - .data - .lock() - .proofs_to_fail - .insert(test_transaction_hash(0), TestError(false)); - - let relayed_transactions = - test_relay_block_transactions(&source, &target, Default::default()); - assert_eq!( - relayed_transactions, - Ok(RelayedBlockTransactions { processed: 3, relayed: 0, failed: 1 }), - ); - assert_eq!(target.data.lock().submitted_proofs, vec![],); - } - - #[test] - fn relay_block_transactions_fails_on_connection_error() { - let source = TestTransactionsSource::new(Box::new(|_| unreachable!("no ticks allowed"))); - let target = TestTransactionsTarget::new(Box::new(|_| unreachable!("no ticks allowed"))); - - // fail with connection error when preparing proof for tx#1 - source - .data - .lock() - .proofs_to_fail - .insert(test_transaction_hash(1), TestError(true)); - - let relayed_transactions = - test_relay_block_transactions(&source, &target, Default::default()); - assert_eq!( - relayed_transactions, - Err(RelayedBlockTransactions { processed: 1, relayed: 1, failed: 0 }), - ); - assert_eq!( - target.data.lock().submitted_proofs, - vec![TestTransactionProof(test_transaction_hash(0))], - ); - - // now do not fail on tx#2 - source.data.lock().proofs_to_fail.clear(); - // and also relay tx#3 - target.data.lock().transactions_to_accept.insert(test_transaction_hash(2)); - - let relayed_transactions = - test_relay_block_transactions(&source, &target, relayed_transactions.unwrap_err()); - assert_eq!( - relayed_transactions, - Ok(RelayedBlockTransactions { processed: 3, relayed: 2, failed: 0 }), - ); - assert_eq!( - target.data.lock().submitted_proofs, - vec![ - TestTransactionProof(test_transaction_hash(0)), - TestTransactionProof(test_transaction_hash(2)) - ], - ); - } -} diff --git a/bridges/relays/exchange/src/exchange_loop.rs b/bridges/relays/exchange/src/exchange_loop.rs deleted file mode 100644 index 84d216f43968f..0000000000000 --- a/bridges/relays/exchange/src/exchange_loop.rs +++ /dev/null @@ -1,322 +0,0 @@ -// Copyright 2019-2021 Parity Technologies (UK) Ltd. -// This file is part of Parity Bridges Common. - -// Parity Bridges Common is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Parity Bridges Common is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Parity Bridges Common. If not, see . - -//! Relaying proofs of exchange transactions. - -use crate::{ - error::Error, - exchange::{ - relay_block_transactions, BlockNumberOf, RelayedBlockTransactions, SourceClient, - TargetClient, TransactionProofPipeline, - }, - exchange_loop_metrics::ExchangeLoopMetrics, -}; - -use crate::error::ErrorOf; -use backoff::backoff::Backoff; -use futures::{future::FutureExt, select}; -use num_traits::One; -use relay_utils::{ - metrics::{GlobalMetrics, MetricsParams}, - retry_backoff, FailedClient, MaybeConnectionError, -}; -use std::future::Future; - -/// Transactions proofs relay state. -#[derive(Debug)] -pub struct TransactionProofsRelayState { - /// Number of last header we have processed so far. - pub best_processed_header_number: BlockNumber, -} - -/// Transactions proofs relay storage. -pub trait TransactionProofsRelayStorage: 'static + Clone + Send + Sync { - /// Associated block number. - type BlockNumber: 'static + Send + Sync; - - /// Get relay state. - fn state(&self) -> TransactionProofsRelayState; - /// Update relay state. - fn set_state(&mut self, state: &TransactionProofsRelayState); -} - -/// In-memory storage for auto-relay loop. -#[derive(Debug, Clone)] -pub struct InMemoryStorage { - best_processed_header_number: BlockNumber, -} - -impl InMemoryStorage { - /// Created new in-memory storage with given best processed block number. - pub fn new(best_processed_header_number: BlockNumber) -> Self { - InMemoryStorage { best_processed_header_number } - } -} - -impl TransactionProofsRelayStorage - for InMemoryStorage -{ - type BlockNumber = BlockNumber; - - fn state(&self) -> TransactionProofsRelayState { - TransactionProofsRelayState { - best_processed_header_number: self.best_processed_header_number, - } - } - - fn set_state(&mut self, state: &TransactionProofsRelayState) { - self.best_processed_header_number = state.best_processed_header_number; - } -} - -/// Return prefix that will be used by default to expose Prometheus metrics of the exchange loop. -pub fn metrics_prefix() -> String { - format!("{}_to_{}_Exchange", P::SOURCE_NAME, P::TARGET_NAME) -} - -/// Run proofs synchronization. -pub async fn run( - storage: impl TransactionProofsRelayStorage>, - source_client: impl SourceClient

, - target_client: impl TargetClient

, - metrics_params: MetricsParams, - exit_signal: impl Future + 'static + Send, -) -> Result<(), ErrorOf

> { - let exit_signal = exit_signal.shared(); - - relay_utils::relay_loop(source_client, target_client) - .with_metrics(Some(metrics_prefix::

()), metrics_params) - .loop_metric(ExchangeLoopMetrics::new)? - .standalone_metric(GlobalMetrics::new)? - .expose() - .await? - .run(metrics_prefix::

(), move |source_client, target_client, metrics| { - run_until_connection_lost( - storage.clone(), - source_client, - target_client, - metrics, - exit_signal.clone(), - ) - }) - .await - .map_err(Error::Utils) -} - -/// Run proofs synchronization. -async fn run_until_connection_lost( - mut storage: impl TransactionProofsRelayStorage>, - source_client: impl SourceClient

, - target_client: impl TargetClient

, - metrics_exch: Option, - exit_signal: impl Future + Send, -) -> Result<(), FailedClient> { - let mut retry_backoff = retry_backoff(); - let mut state = storage.state(); - let mut current_finalized_block = None; - - let exit_signal = exit_signal.fuse(); - - futures::pin_mut!(exit_signal); - - loop { - let iteration_result = run_loop_iteration( - &mut storage, - &source_client, - &target_client, - &mut state, - &mut current_finalized_block, - metrics_exch.as_ref(), - ) - .await; - - if let Err((is_connection_error, failed_client)) = iteration_result { - if is_connection_error { - return Err(failed_client) - } - - let retry_timeout = - retry_backoff.next_backoff().unwrap_or(relay_utils::relay_loop::RECONNECT_DELAY); - select! { - _ = async_std::task::sleep(retry_timeout).fuse() => {}, - _ = exit_signal => return Ok(()), - } - } else { - retry_backoff.reset(); - - select! { - _ = source_client.tick().fuse() => {}, - _ = exit_signal => return Ok(()), - } - } - } -} - -/// Run exchange loop until we need to break. -async fn run_loop_iteration( - storage: &mut impl TransactionProofsRelayStorage>, - source_client: &impl SourceClient

, - target_client: &impl TargetClient

, - state: &mut TransactionProofsRelayState>, - current_finalized_block: &mut Option<(P::Block, RelayedBlockTransactions)>, - exchange_loop_metrics: Option<&ExchangeLoopMetrics>, -) -> Result<(), (bool, FailedClient)> { - let best_finalized_header_id = match target_client.best_finalized_header_id().await { - Ok(best_finalized_header_id) => { - log::debug!( - target: "bridge", - "Got best finalized {} block from {} node: {:?}", - P::SOURCE_NAME, - P::TARGET_NAME, - best_finalized_header_id, - ); - - best_finalized_header_id - }, - Err(err) => { - log::error!( - target: "bridge", - "Failed to retrieve best {} header id from {} node: {:?}. Going to retry...", - P::SOURCE_NAME, - P::TARGET_NAME, - err, - ); - - return Err((err.is_connection_error(), FailedClient::Target)) - }, - }; - - loop { - // if we already have some finalized block body, try to relay its transactions - if let Some((block, relayed_transactions)) = current_finalized_block.take() { - let result = relay_block_transactions( - source_client, - target_client, - &block, - relayed_transactions, - ) - .await; - - match result { - Ok(relayed_transactions) => { - log::info!( - target: "bridge", - "Relay has processed {} block #{}. Total/Relayed/Failed transactions: {}/{}/{}", - P::SOURCE_NAME, - state.best_processed_header_number, - relayed_transactions.processed, - relayed_transactions.relayed, - relayed_transactions.failed, - ); - - state.best_processed_header_number = - state.best_processed_header_number + One::one(); - storage.set_state(state); - - if let Some(exchange_loop_metrics) = exchange_loop_metrics { - exchange_loop_metrics.update::

( - state.best_processed_header_number, - best_finalized_header_id.0, - relayed_transactions, - ); - } - - // we have just updated state => proceed to next block retrieval - }, - Err((failed_client, relayed_transactions)) => { - *current_finalized_block = Some((block, relayed_transactions)); - return Err((true, failed_client)) - }, - } - } - - // we may need to retrieve finalized block body from source node - if best_finalized_header_id.0 > state.best_processed_header_number { - let next_block_number = state.best_processed_header_number + One::one(); - let result = source_client.block_by_number(next_block_number).await; - - match result { - Ok(block) => { - *current_finalized_block = Some((block, RelayedBlockTransactions::default())); - - // we have received new finalized block => go back to relay its transactions - continue - }, - Err(err) => { - log::error!( - target: "bridge", - "Failed to retrieve canonical block #{} from {} node: {:?}. Going to retry...", - next_block_number, - P::SOURCE_NAME, - err, - ); - - return Err((err.is_connection_error(), FailedClient::Source)) - }, - } - } - - // there are no any transactions we need to relay => wait for new data - return Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::exchange::tests::{ - test_next_block, test_next_block_id, test_transaction_hash, TestTransactionProof, - TestTransactionsSource, TestTransactionsTarget, - }; - use futures::{future::FutureExt, stream::StreamExt}; - - #[test] - fn exchange_loop_is_able_to_relay_proofs() { - let storage = InMemoryStorage { best_processed_header_number: 0 }; - let target = - TestTransactionsTarget::new(Box::new(|_| unreachable!("no target ticks allowed"))); - let target_data = target.data.clone(); - let (exit_sender, exit_receiver) = futures::channel::mpsc::unbounded(); - - let source = TestTransactionsSource::new(Box::new(move |data| { - let transaction1_relayed = target_data - .lock() - .submitted_proofs - .contains(&TestTransactionProof(test_transaction_hash(0))); - let transaction2_relayed = target_data - .lock() - .submitted_proofs - .contains(&TestTransactionProof(test_transaction_hash(1))); - match (transaction1_relayed, transaction2_relayed) { - (true, true) => exit_sender.unbounded_send(()).unwrap(), - (true, false) => { - data.block = Ok(test_next_block()); - target_data.lock().best_finalized_header_id = Ok(test_next_block_id()); - target_data.lock().transactions_to_accept.insert(test_transaction_hash(1)); - }, - _ => (), - } - })); - - let _ = async_std::task::block_on(run( - storage, - source, - target, - MetricsParams::disabled(), - exit_receiver.into_future().map(|(_, _)| ()), - )); - } -} diff --git a/bridges/relays/exchange/src/exchange_loop_metrics.rs b/bridges/relays/exchange/src/exchange_loop_metrics.rs deleted file mode 100644 index 82d3e649d4319..0000000000000 --- a/bridges/relays/exchange/src/exchange_loop_metrics.rs +++ /dev/null @@ -1,97 +0,0 @@ -// Copyright 2019-2021 Parity Technologies (UK) Ltd. -// This file is part of Parity Bridges Common. - -// Parity Bridges Common is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Parity Bridges Common is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Parity Bridges Common. If not, see . - -//! Metrics for currency-exchange relay loop. - -use crate::exchange::{BlockNumberOf, RelayedBlockTransactions, TransactionProofPipeline}; -use relay_utils::metrics::{ - metric_name, register, Counter, CounterVec, GaugeVec, Opts, PrometheusError, Registry, U64, -}; - -/// Exchange transactions relay metrics. -#[derive(Clone)] -pub struct ExchangeLoopMetrics { - /// Best finalized block numbers - "processed" and "known". - best_block_numbers: GaugeVec, - /// Number of processed blocks ("total"). - processed_blocks: Counter, - /// Number of processed transactions ("total", "relayed" and "failed"). - processed_transactions: CounterVec, -} - -impl ExchangeLoopMetrics { - /// Create and register exchange loop metrics. - pub fn new(registry: &Registry, prefix: Option<&str>) -> Result { - Ok(ExchangeLoopMetrics { - best_block_numbers: register( - GaugeVec::new( - Opts::new( - metric_name(prefix, "best_block_numbers"), - "Best finalized block numbers", - ), - &["type"], - )?, - registry, - )?, - processed_blocks: register( - Counter::new( - metric_name(prefix, "processed_blocks"), - "Total number of processed blocks", - )?, - registry, - )?, - processed_transactions: register( - CounterVec::new( - Opts::new( - metric_name(prefix, "processed_transactions"), - "Total number of processed transactions", - ), - &["type"], - )?, - registry, - )?, - }) - } -} - -impl ExchangeLoopMetrics { - /// Update metrics when single block is relayed. - pub fn update( - &self, - best_processed_block_number: BlockNumberOf

, - best_known_block_number: BlockNumberOf

, - relayed_transactions: RelayedBlockTransactions, - ) { - self.best_block_numbers - .with_label_values(&["processed"]) - .set(best_processed_block_number.into()); - self.best_block_numbers - .with_label_values(&["known"]) - .set(best_known_block_number.into()); - - self.processed_blocks.inc(); - - self.processed_transactions - .with_label_values(&["total"]) - .inc_by(relayed_transactions.processed as _); - self.processed_transactions - .with_label_values(&["relayed"]) - .inc_by(relayed_transactions.relayed as _); - self.processed_transactions - .with_label_values(&["failed"]) - .inc_by(relayed_transactions.failed as _); - } -} diff --git a/bridges/relays/exchange/src/lib.rs b/bridges/relays/exchange/src/lib.rs deleted file mode 100644 index d167e5aa398ee..0000000000000 --- a/bridges/relays/exchange/src/lib.rs +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright 2019-2021 Parity Technologies (UK) Ltd. -// This file is part of Parity Bridges Common. - -// Parity Bridges Common is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Parity Bridges Common is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Parity Bridges Common. If not, see . - -//! Relaying [`currency-exchange`](../pallet_bridge_currency_exchange/index.html) application -//! specific data. Currency exchange application allows exchanging tokens between bridged chains. -//! This module provides entrypoints for crafting and submitting (single and multiple) -//! proof-of-exchange-at-source-chain transaction(s) to target chain. - -#![warn(missing_docs)] - -pub mod error; -pub mod exchange; -pub mod exchange_loop; -pub mod exchange_loop_metrics;