Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wait_for_finalized behavior if the tx dropped, usurped or invalid #897

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion subxt/src/error/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,20 @@ pub enum TransactionError {
/// The finality subscription expired (after ~512 blocks we give up if the
/// block hasn't yet been finalized).
#[error("The finality subscription expired")]
FinalitySubscriptionTimeout,
FinalityTimeout,
/// The block hash that the transaction was added to could not be found.
/// This is probably because the block was retracted before being finalized.
#[error("The block containing the transaction can no longer be found (perhaps it was on a non-finalized fork?)")]
BlockNotFound,
/// The transaction was deemed invalid in the current chain state.
#[error("The transaction is no longer valid")]
Invalid,
/// The transaction was replaced by a transaction with the same (sender, nonce) pair but with higher priority
#[error("The transaction was replaced by a transaction with the same (sender, nonce) pair but with higher priority.")]
Usurped,
/// The transaction was dropped because of some limit
#[error("The transaction was dropped from the pool because of a limit.")]
Dropped,
}

/// Something went wrong trying to encode a storage address.
Expand Down
3 changes: 2 additions & 1 deletion subxt/src/rpc/rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ impl<Res> std::fmt::Debug for Subscription<Res> {
}

impl<Res> Subscription<Res> {
fn new(inner: RpcSubscription) -> Self {
/// Creates a new [`Subscription`].
pub fn new(inner: RpcSubscription) -> Self {
Self {
inner,
_marker: std::marker::PhantomData,
Expand Down
207 changes: 178 additions & 29 deletions subxt/src/tx/tx_progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,22 @@ where
/// **Note:** consumes `self`. If you'd like to perform multiple actions as the state of the
/// transaction progresses, use [`TxProgress::next_item()`] instead.
///
/// **Note:** transaction statuses like `Invalid` and `Usurped` are ignored, because while they
/// may well indicate with some probability that the transaction will not make it into a block,
/// there is no guarantee that this is true. Thus, we prefer to "play it safe" here. Use the lower
/// level [`TxProgress::next_item()`] API if you'd like to handle these statuses yourself.
/// **Note:** transaction statuses like `Invalid`/`Usurped`/`Dropped` indicate with some
/// probability that the transaction will not make it into a block but there is no guarantee
/// that this is true. In those cases the stream is closed however, so you currently have no way to find
/// out if they finally made it into a block or not.
pub async fn wait_for_in_block(mut self) -> Result<TxInBlock<T, C>, Error> {
while let Some(status) = self.next_item().await {
match status? {
// Finalized or otherwise in a block! Return.
TxStatus::InBlock(s) | TxStatus::Finalized(s) => return Ok(s),
// Error scenarios; return the error.
TxStatus::FinalityTimeout(_) => {
return Err(TransactionError::FinalitySubscriptionTimeout.into())
return Err(TransactionError::FinalityTimeout.into())
}
TxStatus::Invalid => return Err(TransactionError::Invalid.into()),
TxStatus::Usurped(_) => return Err(TransactionError::Usurped.into()),
TxStatus::Dropped => return Err(TransactionError::Dropped.into()),
// Ignore anything else and wait for next status event:
_ => continue,
}
Expand All @@ -95,19 +98,22 @@ where
/// **Note:** consumes `self`. If you'd like to perform multiple actions as the state of the
/// transaction progresses, use [`TxProgress::next_item()`] instead.
///
/// **Note:** transaction statuses like `Invalid` and `Usurped` are ignored, because while they
/// may well indicate with some probability that the transaction will not make it into a block,
/// there is no guarantee that this is true. Thus, we prefer to "play it safe" here. Use the lower
/// level [`TxProgress::next_item()`] API if you'd like to handle these statuses yourself.
/// **Note:** transaction statuses like `Invalid`/`Usurped`/`Dropped` indicate with some
/// probability that the transaction will not make it into a block but there is no guarantee
/// that this is true. In those cases the stream is closed however, so you currently have no way to find
/// out if they finally made it into a block or not.
pub async fn wait_for_finalized(mut self) -> Result<TxInBlock<T, C>, Error> {
while let Some(status) = self.next_item().await {
match status? {
// Finalized! Return.
TxStatus::Finalized(s) => return Ok(s),
// Error scenarios; return the error.
TxStatus::FinalityTimeout(_) => {
return Err(TransactionError::FinalitySubscriptionTimeout.into())
return Err(TransactionError::FinalityTimeout.into())
}
TxStatus::Invalid => return Err(TransactionError::Invalid.into()),
TxStatus::Usurped(_) => return Err(TransactionError::Usurped.into()),
TxStatus::Dropped => return Err(TransactionError::Dropped.into()),
// Ignore and wait for next status event:
_ => continue,
}
Expand All @@ -122,10 +128,10 @@ where
/// **Note:** consumes self. If you'd like to perform multiple actions as progress is made,
/// use [`TxProgress::next_item()`] instead.
///
/// **Note:** transaction statuses like `Invalid` and `Usurped` are ignored, because while they
/// may well indicate with some probability that the transaction will not make it into a block,
/// there is no guarantee that this is true. Thus, we prefer to "play it safe" here. Use the lower
/// level [`TxProgress::next_item()`] API if you'd like to handle these statuses yourself.
/// **Note:** transaction statuses like `Invalid`/`Usurped`/`Dropped` indicate with some
/// probability that the transaction will not make it into a block but there is no guarantee
/// that this is true. In those cases the stream is closed however, so you currently have no way to find
/// out if they finally made it into a block or not.
pub async fn wait_for_finalized_success(
self,
) -> Result<crate::blocks::ExtrinsicEvents<T>, Error> {
Expand All @@ -134,7 +140,7 @@ where
}
}

impl<T: Config, C: OnlineClientT<T>> Stream for TxProgress<T, C> {
impl<T: Config, C: Clone> Stream for TxProgress<T, C> {
type Item = Result<TxStatus<T, C>, Error>;

fn poll_next(
Expand All @@ -155,13 +161,19 @@ impl<T: Config, C: OnlineClientT<T>> Stream for TxProgress<T, C> {
TxStatus::InBlock(TxInBlock::new(hash, self.ext_hash, self.client.clone()))
}
SubstrateTxStatus::Retracted(hash) => TxStatus::Retracted(hash),
SubstrateTxStatus::Usurped(hash) => TxStatus::Usurped(hash),
SubstrateTxStatus::Dropped => TxStatus::Dropped,
SubstrateTxStatus::Invalid => TxStatus::Invalid,
// Only the following statuses are actually considered "final" (see the substrate
// docs on `TxStatus`). Basically, either the transaction makes it into a
// block, or we eventually give up on waiting for it to make it into a block.
// Even `Dropped`/`Invalid`/`Usurped` transactions might make it into a block eventually.
// Only the following statuses are considered "final", in a sense that they end the stream (see the substrate
// docs on `TxStatus`):
//
// - Usurped
// - Finalized
// - FinalityTimeout
// - Invalid
// - Dropped
//
// Even though `Dropped`/`Invalid`/`Usurped` transactions might make it into a block eventually,
// the server considers them final and closes the connection, when they are encountered.
// In those cases the stream is closed however, so you currently have no way to find
// out if they finally made it into a block or not.
//
// As an example, a transaction that is `Invalid` on one node due to having the wrong
// nonce might still be valid on some fork on another node which ends up being finalized.
Expand All @@ -175,6 +187,18 @@ impl<T: Config, C: OnlineClientT<T>> Stream for TxProgress<T, C> {
self.sub = None;
TxStatus::Finalized(TxInBlock::new(hash, self.ext_hash, self.client.clone()))
}
SubstrateTxStatus::Usurped(hash) => {
self.sub = None;
TxStatus::Usurped(hash)
}
SubstrateTxStatus::Dropped => {
self.sub = None;
TxStatus::Dropped
}
SubstrateTxStatus::Invalid => {
self.sub = None;
TxStatus::Invalid
}
}
})
}
Expand Down Expand Up @@ -209,6 +233,12 @@ impl<T: Config, C: OnlineClientT<T>> Stream for TxProgress<T, C> {
/// there might be cases where transactions alternate between `Future` and `Ready`
/// pool, and are `Broadcast` in the meantime.
///
/// You are free to unsubscribe from notifications at any point.
/// The first one will be emitted when the block in which the transaction was included gets
/// finalized. The `FinalityTimeout` event will be emitted when the block did not reach finality
/// within 512 blocks. This either indicates that finality is not available for your chain,
/// or that finality gadget is lagging behind.
///
/// Note that there are conditions that may cause transactions to reappear in the pool:
///
/// 1. Due to possible forks, the transaction that ends up being included
Expand All @@ -220,12 +250,17 @@ impl<T: Config, C: OnlineClientT<T>> Stream for TxProgress<T, C> {
/// pool about such cases).
/// 4. `Retracted` transactions might be included in a future block.
///
/// The stream is considered finished only when either the `Finalized` or `FinalityTimeout`
/// event is triggered. You are however free to unsubscribe from notifications at any point.
/// The first one will be emitted when the block in which the transaction was included gets
/// finalized. The `FinalityTimeout` event will be emitted when the block did not reach finality
/// within 512 blocks. This either indicates that finality is not available for your chain,
/// or that finality gadget is lagging behind.
/// Even though these cases can happen, the server-side of the stream is closed, if one of the following is encountered:
/// - Usurped
/// - Finalized
/// - FinalityTimeout
/// - Invalid
/// - Dropped
///
/// In any of these cases the client side TxProgress stream is also closed.
/// In those cases the stream is closed however, so you currently have no way to find
/// out if they finally made it into a block or not.

#[derive(Derivative)]
#[derivative(Debug(bound = "C: std::fmt::Debug"))]
pub enum TxStatus<T: Config, C> {
Expand Down Expand Up @@ -284,7 +319,7 @@ pub struct TxInBlock<T: Config, C> {
client: C,
}

impl<T: Config, C: OnlineClientT<T>> TxInBlock<T, C> {
impl<T: Config, C> TxInBlock<T, C> {
pub(crate) fn new(block_hash: T::Hash, ext_hash: T::Hash, client: C) -> Self {
Self {
block_hash,
Expand All @@ -302,7 +337,9 @@ impl<T: Config, C: OnlineClientT<T>> TxInBlock<T, C> {
pub fn extrinsic_hash(&self) -> T::Hash {
self.ext_hash
}
}

impl<T: Config, C: OnlineClientT<T>> TxInBlock<T, C> {
/// Fetch the events associated with this transaction. If the transaction
/// was successful (ie no `ExtrinsicFailed`) events were found, then we return
/// the events associated with it. If the transaction was not successful, or
Expand Down Expand Up @@ -370,3 +407,115 @@ impl<T: Config, C: OnlineClientT<T>> TxInBlock<T, C> {
))
}
}

#[cfg(test)]
mod test {
use std::pin::Pin;

use futures::Stream;

use crate::{
client::{OfflineClientT, OnlineClientT},
config::{
extrinsic_params::BaseExtrinsicParams,
polkadot::{PlainTip, PolkadotConfig},
WithExtrinsicParams,
},
error::RpcError,
rpc::{types::SubstrateTxStatus, RpcSubscription, Subscription},
tx::TxProgress,
Config, Error, SubstrateConfig,
};

use serde_json::value::RawValue;

#[derive(Clone, Debug)]
struct MockClient;

impl OfflineClientT<PolkadotConfig> for MockClient {
fn metadata(&self) -> crate::Metadata {
panic!("just a mock impl to satisfy trait bounds")
}

fn genesis_hash(&self) -> <PolkadotConfig as crate::Config>::Hash {
panic!("just a mock impl to satisfy trait bounds")
}

fn runtime_version(&self) -> crate::rpc::types::RuntimeVersion {
panic!("just a mock impl to satisfy trait bounds")
}
}

type MockTxProgress = TxProgress<PolkadotConfig, MockClient>;
type MockHash = <WithExtrinsicParams<
SubstrateConfig,
BaseExtrinsicParams<SubstrateConfig, PlainTip>,
> as Config>::Hash;
type MockSubstrateTxStatus = SubstrateTxStatus<MockHash, MockHash>;

impl OnlineClientT<PolkadotConfig> for MockClient {
fn rpc(&self) -> &crate::rpc::Rpc<PolkadotConfig> {
panic!("just a mock impl to satisfy trait bounds")
}
}

#[tokio::test]
async fn wait_for_finalized_returns_err_when_usurped() {
let tx_progress = mock_tx_progress(vec![
SubstrateTxStatus::Ready,
SubstrateTxStatus::Usurped(Default::default()),
]);
let finalized_result = tx_progress.wait_for_finalized().await;
assert!(matches!(
finalized_result,
Err(Error::Transaction(crate::error::TransactionError::Usurped))
));
}

#[tokio::test]
async fn wait_for_finalized_returns_err_when_dropped() {
let tx_progress =
mock_tx_progress(vec![SubstrateTxStatus::Ready, SubstrateTxStatus::Dropped]);
let finalized_result = tx_progress.wait_for_finalized().await;
assert!(matches!(
finalized_result,
Err(Error::Transaction(crate::error::TransactionError::Dropped))
));
}

#[tokio::test]
async fn wait_for_finalized_returns_err_when_invalid() {
let tx_progress =
mock_tx_progress(vec![SubstrateTxStatus::Ready, SubstrateTxStatus::Invalid]);
let finalized_result = tx_progress.wait_for_finalized().await;
assert!(matches!(
finalized_result,
Err(Error::Transaction(crate::error::TransactionError::Invalid))
));
}

fn mock_tx_progress(statuses: Vec<MockSubstrateTxStatus>) -> MockTxProgress {
let sub = create_substrate_tx_status_subscription(statuses);
TxProgress::new(sub, MockClient, Default::default())
}

fn create_substrate_tx_status_subscription(
elements: Vec<MockSubstrateTxStatus>,
) -> Subscription<MockSubstrateTxStatus> {
let rpc_substription_stream: Pin<
Box<dyn Stream<Item = Result<Box<RawValue>, RpcError>> + Send + 'static>,
> = Box::pin(futures::stream::iter(elements.into_iter().map(|e| {
let s = serde_json::to_string(&e).unwrap();
let r: Box<RawValue> = RawValue::from_string(s).unwrap();
Ok(r)
})));

let rpc_subscription: RpcSubscription = RpcSubscription {
stream: rpc_substription_stream,
id: None,
};

let sub: Subscription<MockSubstrateTxStatus> = Subscription::new(rpc_subscription);
sub
}
}