diff --git a/subxt/src/error/mod.rs b/subxt/src/error/mod.rs index a5f5baba5c..16f3bbfabc 100644 --- a/subxt/src/error/mod.rs +++ b/subxt/src/error/mod.rs @@ -119,11 +119,20 @@ pub enum TransactionError { /// The finality subscription expired (after ~512 blocks we give up if the /// block hasn't yet been finalized). #[error("The finality subscription expired")] - FinalitySubscriptionTimeout, + FinalityTimeout, /// The block hash that the transaction was added to could not be found. /// This is probably because the block was retracted before being finalized. #[error("The block containing the transaction can no longer be found (perhaps it was on a non-finalized fork?)")] BlockNotFound, + /// The transaction was deemed invalid in the current chain state. + #[error("The transaction is no longer valid")] + Invalid, + /// The transaction was replaced by a transaction with the same (sender, nonce) pair but with higher priority + #[error("The transaction was replaced by a transaction with the same (sender, nonce) pair but with higher priority.")] + Usurped, + /// The transaction was dropped because of some limit + #[error("The transaction was dropped from the pool because of a limit.")] + Dropped, } /// Something went wrong trying to encode a storage address. diff --git a/subxt/src/rpc/rpc_client.rs b/subxt/src/rpc/rpc_client.rs index 9b5da5c121..e5e408d4f8 100644 --- a/subxt/src/rpc/rpc_client.rs +++ b/subxt/src/rpc/rpc_client.rs @@ -166,7 +166,8 @@ impl std::fmt::Debug for Subscription { } impl Subscription { - fn new(inner: RpcSubscription) -> Self { + /// Creates a new [`Subscription`]. + pub fn new(inner: RpcSubscription) -> Self { Self { inner, _marker: std::marker::PhantomData, diff --git a/subxt/src/tx/tx_progress.rs b/subxt/src/tx/tx_progress.rs index 593d01a10f..248b0bda15 100644 --- a/subxt/src/tx/tx_progress.rs +++ b/subxt/src/tx/tx_progress.rs @@ -69,10 +69,10 @@ where /// **Note:** consumes `self`. If you'd like to perform multiple actions as the state of the /// transaction progresses, use [`TxProgress::next_item()`] instead. /// - /// **Note:** transaction statuses like `Invalid` and `Usurped` are ignored, because while they - /// may well indicate with some probability that the transaction will not make it into a block, - /// there is no guarantee that this is true. Thus, we prefer to "play it safe" here. Use the lower - /// level [`TxProgress::next_item()`] API if you'd like to handle these statuses yourself. + /// **Note:** transaction statuses like `Invalid`/`Usurped`/`Dropped` indicate with some + /// probability that the transaction will not make it into a block but there is no guarantee + /// that this is true. In those cases the stream is closed however, so you currently have no way to find + /// out if they finally made it into a block or not. pub async fn wait_for_in_block(mut self) -> Result, Error> { while let Some(status) = self.next_item().await { match status? { @@ -80,8 +80,11 @@ where TxStatus::InBlock(s) | TxStatus::Finalized(s) => return Ok(s), // Error scenarios; return the error. TxStatus::FinalityTimeout(_) => { - return Err(TransactionError::FinalitySubscriptionTimeout.into()) + return Err(TransactionError::FinalityTimeout.into()) } + TxStatus::Invalid => return Err(TransactionError::Invalid.into()), + TxStatus::Usurped(_) => return Err(TransactionError::Usurped.into()), + TxStatus::Dropped => return Err(TransactionError::Dropped.into()), // Ignore anything else and wait for next status event: _ => continue, } @@ -95,10 +98,10 @@ where /// **Note:** consumes `self`. If you'd like to perform multiple actions as the state of the /// transaction progresses, use [`TxProgress::next_item()`] instead. /// - /// **Note:** transaction statuses like `Invalid` and `Usurped` are ignored, because while they - /// may well indicate with some probability that the transaction will not make it into a block, - /// there is no guarantee that this is true. Thus, we prefer to "play it safe" here. Use the lower - /// level [`TxProgress::next_item()`] API if you'd like to handle these statuses yourself. + /// **Note:** transaction statuses like `Invalid`/`Usurped`/`Dropped` indicate with some + /// probability that the transaction will not make it into a block but there is no guarantee + /// that this is true. In those cases the stream is closed however, so you currently have no way to find + /// out if they finally made it into a block or not. pub async fn wait_for_finalized(mut self) -> Result, Error> { while let Some(status) = self.next_item().await { match status? { @@ -106,8 +109,11 @@ where TxStatus::Finalized(s) => return Ok(s), // Error scenarios; return the error. TxStatus::FinalityTimeout(_) => { - return Err(TransactionError::FinalitySubscriptionTimeout.into()) + return Err(TransactionError::FinalityTimeout.into()) } + TxStatus::Invalid => return Err(TransactionError::Invalid.into()), + TxStatus::Usurped(_) => return Err(TransactionError::Usurped.into()), + TxStatus::Dropped => return Err(TransactionError::Dropped.into()), // Ignore and wait for next status event: _ => continue, } @@ -122,10 +128,10 @@ where /// **Note:** consumes self. If you'd like to perform multiple actions as progress is made, /// use [`TxProgress::next_item()`] instead. /// - /// **Note:** transaction statuses like `Invalid` and `Usurped` are ignored, because while they - /// may well indicate with some probability that the transaction will not make it into a block, - /// there is no guarantee that this is true. Thus, we prefer to "play it safe" here. Use the lower - /// level [`TxProgress::next_item()`] API if you'd like to handle these statuses yourself. + /// **Note:** transaction statuses like `Invalid`/`Usurped`/`Dropped` indicate with some + /// probability that the transaction will not make it into a block but there is no guarantee + /// that this is true. In those cases the stream is closed however, so you currently have no way to find + /// out if they finally made it into a block or not. pub async fn wait_for_finalized_success( self, ) -> Result, Error> { @@ -134,7 +140,7 @@ where } } -impl> Stream for TxProgress { +impl Stream for TxProgress { type Item = Result, Error>; fn poll_next( @@ -155,13 +161,19 @@ impl> Stream for TxProgress { TxStatus::InBlock(TxInBlock::new(hash, self.ext_hash, self.client.clone())) } SubstrateTxStatus::Retracted(hash) => TxStatus::Retracted(hash), - SubstrateTxStatus::Usurped(hash) => TxStatus::Usurped(hash), - SubstrateTxStatus::Dropped => TxStatus::Dropped, - SubstrateTxStatus::Invalid => TxStatus::Invalid, - // Only the following statuses are actually considered "final" (see the substrate - // docs on `TxStatus`). Basically, either the transaction makes it into a - // block, or we eventually give up on waiting for it to make it into a block. - // Even `Dropped`/`Invalid`/`Usurped` transactions might make it into a block eventually. + // Only the following statuses are considered "final", in a sense that they end the stream (see the substrate + // docs on `TxStatus`): + // + // - Usurped + // - Finalized + // - FinalityTimeout + // - Invalid + // - Dropped + // + // Even though `Dropped`/`Invalid`/`Usurped` transactions might make it into a block eventually, + // the server considers them final and closes the connection, when they are encountered. + // In those cases the stream is closed however, so you currently have no way to find + // out if they finally made it into a block or not. // // As an example, a transaction that is `Invalid` on one node due to having the wrong // nonce might still be valid on some fork on another node which ends up being finalized. @@ -175,6 +187,18 @@ impl> Stream for TxProgress { self.sub = None; TxStatus::Finalized(TxInBlock::new(hash, self.ext_hash, self.client.clone())) } + SubstrateTxStatus::Usurped(hash) => { + self.sub = None; + TxStatus::Usurped(hash) + } + SubstrateTxStatus::Dropped => { + self.sub = None; + TxStatus::Dropped + } + SubstrateTxStatus::Invalid => { + self.sub = None; + TxStatus::Invalid + } } }) } @@ -209,6 +233,12 @@ impl> Stream for TxProgress { /// there might be cases where transactions alternate between `Future` and `Ready` /// pool, and are `Broadcast` in the meantime. /// +/// You are free to unsubscribe from notifications at any point. +/// The first one will be emitted when the block in which the transaction was included gets +/// finalized. The `FinalityTimeout` event will be emitted when the block did not reach finality +/// within 512 blocks. This either indicates that finality is not available for your chain, +/// or that finality gadget is lagging behind. +/// /// Note that there are conditions that may cause transactions to reappear in the pool: /// /// 1. Due to possible forks, the transaction that ends up being included @@ -220,12 +250,17 @@ impl> Stream for TxProgress { /// pool about such cases). /// 4. `Retracted` transactions might be included in a future block. /// -/// The stream is considered finished only when either the `Finalized` or `FinalityTimeout` -/// event is triggered. You are however free to unsubscribe from notifications at any point. -/// The first one will be emitted when the block in which the transaction was included gets -/// finalized. The `FinalityTimeout` event will be emitted when the block did not reach finality -/// within 512 blocks. This either indicates that finality is not available for your chain, -/// or that finality gadget is lagging behind. +/// Even though these cases can happen, the server-side of the stream is closed, if one of the following is encountered: +/// - Usurped +/// - Finalized +/// - FinalityTimeout +/// - Invalid +/// - Dropped +/// +/// In any of these cases the client side TxProgress stream is also closed. +/// In those cases the stream is closed however, so you currently have no way to find +/// out if they finally made it into a block or not. + #[derive(Derivative)] #[derivative(Debug(bound = "C: std::fmt::Debug"))] pub enum TxStatus { @@ -284,7 +319,7 @@ pub struct TxInBlock { client: C, } -impl> TxInBlock { +impl TxInBlock { pub(crate) fn new(block_hash: T::Hash, ext_hash: T::Hash, client: C) -> Self { Self { block_hash, @@ -302,7 +337,9 @@ impl> TxInBlock { pub fn extrinsic_hash(&self) -> T::Hash { self.ext_hash } +} +impl> TxInBlock { /// Fetch the events associated with this transaction. If the transaction /// was successful (ie no `ExtrinsicFailed`) events were found, then we return /// the events associated with it. If the transaction was not successful, or @@ -370,3 +407,115 @@ impl> TxInBlock { )) } } + +#[cfg(test)] +mod test { + use std::pin::Pin; + + use futures::Stream; + + use crate::{ + client::{OfflineClientT, OnlineClientT}, + config::{ + extrinsic_params::BaseExtrinsicParams, + polkadot::{PlainTip, PolkadotConfig}, + WithExtrinsicParams, + }, + error::RpcError, + rpc::{types::SubstrateTxStatus, RpcSubscription, Subscription}, + tx::TxProgress, + Config, Error, SubstrateConfig, + }; + + use serde_json::value::RawValue; + + #[derive(Clone, Debug)] + struct MockClient; + + impl OfflineClientT for MockClient { + fn metadata(&self) -> crate::Metadata { + panic!("just a mock impl to satisfy trait bounds") + } + + fn genesis_hash(&self) -> ::Hash { + panic!("just a mock impl to satisfy trait bounds") + } + + fn runtime_version(&self) -> crate::rpc::types::RuntimeVersion { + panic!("just a mock impl to satisfy trait bounds") + } + } + + type MockTxProgress = TxProgress; + type MockHash = , + > as Config>::Hash; + type MockSubstrateTxStatus = SubstrateTxStatus; + + impl OnlineClientT for MockClient { + fn rpc(&self) -> &crate::rpc::Rpc { + panic!("just a mock impl to satisfy trait bounds") + } + } + + #[tokio::test] + async fn wait_for_finalized_returns_err_when_usurped() { + let tx_progress = mock_tx_progress(vec![ + SubstrateTxStatus::Ready, + SubstrateTxStatus::Usurped(Default::default()), + ]); + let finalized_result = tx_progress.wait_for_finalized().await; + assert!(matches!( + finalized_result, + Err(Error::Transaction(crate::error::TransactionError::Usurped)) + )); + } + + #[tokio::test] + async fn wait_for_finalized_returns_err_when_dropped() { + let tx_progress = + mock_tx_progress(vec![SubstrateTxStatus::Ready, SubstrateTxStatus::Dropped]); + let finalized_result = tx_progress.wait_for_finalized().await; + assert!(matches!( + finalized_result, + Err(Error::Transaction(crate::error::TransactionError::Dropped)) + )); + } + + #[tokio::test] + async fn wait_for_finalized_returns_err_when_invalid() { + let tx_progress = + mock_tx_progress(vec![SubstrateTxStatus::Ready, SubstrateTxStatus::Invalid]); + let finalized_result = tx_progress.wait_for_finalized().await; + assert!(matches!( + finalized_result, + Err(Error::Transaction(crate::error::TransactionError::Invalid)) + )); + } + + fn mock_tx_progress(statuses: Vec) -> MockTxProgress { + let sub = create_substrate_tx_status_subscription(statuses); + TxProgress::new(sub, MockClient, Default::default()) + } + + fn create_substrate_tx_status_subscription( + elements: Vec, + ) -> Subscription { + let rpc_substription_stream: Pin< + Box, RpcError>> + Send + 'static>, + > = Box::pin(futures::stream::iter(elements.into_iter().map(|e| { + let s = serde_json::to_string(&e).unwrap(); + let r: Box = RawValue::from_string(s).unwrap(); + Ok(r) + }))); + + let rpc_subscription: RpcSubscription = RpcSubscription { + stream: rpc_substription_stream, + id: None, + }; + + let sub: Subscription = Subscription::new(rpc_subscription); + sub + } +}