From b5254016ee75097b9f398bf00ffb84c877ab3106 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Tue, 8 Mar 2022 12:19:09 +0000 Subject: [PATCH 01/15] make subscription stream generic in EventSubscription --- codegen/src/api/mod.rs | 4 +- subxt/src/events/event_subscription.rs | 52 +++++++++++++++------ subxt/src/events/mod.rs | 2 + subxt/tests/integration/codegen/polkadot.rs | 4 +- 4 files changed, 44 insertions(+), 18 deletions(-) diff --git a/codegen/src/api/mod.rs b/codegen/src/api/mod.rs index c2cd719d3a..2224cf3725 100644 --- a/codegen/src/api/mod.rs +++ b/codegen/src/api/mod.rs @@ -343,11 +343,11 @@ impl RuntimeGenerator { ::subxt::events::at::(self.client, block_hash).await } - pub async fn subscribe(&self) -> Result<::subxt::events::EventSubscription<'a, T, Event>, ::subxt::BasicError> { + pub async fn subscribe(&self) -> Result<::subxt::events::EventSubscription<'a, ::subxt::events::JsonRpcSub, T, Event>, ::subxt::BasicError> { ::subxt::events::subscribe::(self.client).await } - pub async fn subscribe_finalized(&self) -> Result<::subxt::events::EventSubscription<'a, T, Event>, ::subxt::BasicError> { + pub async fn subscribe_finalized(&self) -> Result<::subxt::events::EventSubscription<'a, ::subxt::events::BoxedJsonRpcSub, T, Event>, ::subxt::BasicError> { ::subxt::events::subscribe_finalized::(self.client).await } } diff --git a/subxt/src/events/event_subscription.rs b/subxt/src/events/event_subscription.rs index afd7fb3220..34630af30f 100644 --- a/subxt/src/events/event_subscription.rs +++ b/subxt/src/events/event_subscription.rs @@ -27,7 +27,7 @@ use futures::{ Future, FutureExt, Stream, - StreamExt, + StreamExt, stream::BoxStream, }; use jsonrpsee::core::client::Subscription; use std::{ @@ -56,7 +56,7 @@ pub use super::{ #[doc(hidden)] pub async fn subscribe( client: &'_ Client, -) -> Result, BasicError> { +) -> Result, T, Evs>, BasicError> { let block_subscription = client.rpc().subscribe_blocks().await?; Ok(EventSubscription::new(client, block_subscription)) } @@ -69,18 +69,33 @@ pub async fn subscribe( #[doc(hidden)] pub async fn subscribe_finalized( client: &'_ Client, -) -> Result, BasicError> { - let block_subscription = client.rpc().subscribe_finalized_blocks().await?; - Ok(EventSubscription::new(client, block_subscription)) +) -> Result, T, Evs>, BasicError> { + let block_subscription = client + .rpc() + .subscribe_finalized_blocks() + .await? + .map(|s| s); + Ok(EventSubscription::new(client, Box::pin(block_subscription))) } +/// A jsonrpsee subscription stream that has been `Box::pin`-ned. This is a part of +/// the `EventSubscription` type for `subscribe_finalized`, because it needs to do additional +/// work on top of the basic `Subscription`. +#[doc(hidden)] +pub type BoxedJsonRpcSub
= BoxStream<'static, Result>; + +/// A `jsonrpsee` Subscription. This forms a part of the `EventSubscription` type handed back +/// in codegen, and so is exposed here to be used there. +#[doc(hidden)] +pub type JsonRpcSub = Subscription; + /// A subscription to events that implements [`Stream`], and returns [`Events`] objects for each block. #[derive(Derivative)] -#[derivative(Debug(bound = ""))] -pub struct EventSubscription<'a, T: Config, Evs: 'static> { +#[derivative(Debug(bound = "Sub: std::fmt::Debug"))] +pub struct EventSubscription<'a, Sub, T: Config, Evs: 'static> { finished: bool, client: &'a Client, - block_header_subscription: Subscription, + block_header_subscription: Sub, #[derivative(Debug = "ignore")] at: Option< std::pin::Pin< @@ -90,10 +105,13 @@ pub struct EventSubscription<'a, T: Config, Evs: 'static> { _event_type: std::marker::PhantomData, } -impl<'a, T: Config, Evs: Decode> EventSubscription<'a, T, Evs> { +impl<'a, Sub, T: Config, Evs: Decode> EventSubscription<'a, Sub, T, Evs> +where + Sub: Stream> + Unpin + 'static +{ fn new( client: &'a Client, - block_header_subscription: Subscription, + block_header_subscription: Sub, ) -> Self { EventSubscription { finished: false, @@ -111,7 +129,7 @@ impl<'a, T: Config, Evs: Decode> EventSubscription<'a, T, Evs> { } } -impl<'a, T: Config, Evs: Decode> Unpin for EventSubscription<'a, T, Evs> {} +impl<'a, T: Config, Sub: Unpin, Evs: Decode> Unpin for EventSubscription<'a, Sub, T, Evs> {} // We want `EventSubscription` to implement Stream. The below implementation is the rather verbose // way to roughly implement the following function: @@ -130,7 +148,12 @@ impl<'a, T: Config, Evs: Decode> Unpin for EventSubscription<'a, T, Evs> {} // // The advantage of this manual implementation is that we have a named type that we (and others) // can derive things on, store away, alias etc. -impl<'a, T: Config, Evs: Decode> Stream for EventSubscription<'a, T, Evs> { +impl<'a, Sub, T, Evs> Stream for EventSubscription<'a, Sub, T, Evs> +where + T: Config, + Evs: Decode, + Sub: Stream> + Unpin + 'static +{ type Item = Result, BasicError>; fn poll_next( @@ -181,9 +204,10 @@ mod test { use super::*; // Ensure `EventSubscription` can be sent; only actually a compile-time check. - #[test] + #[allow(unused)] fn check_sendability() { fn assert_send() {} - assert_send::>(); + assert_send::::Header>, crate::DefaultConfig, ()>>(); + assert_send::::Header>, crate::DefaultConfig, ()>>(); } } diff --git a/subxt/src/events/mod.rs b/subxt/src/events/mod.rs index 846019563a..784ca77b8b 100644 --- a/subxt/src/events/mod.rs +++ b/subxt/src/events/mod.rs @@ -26,6 +26,8 @@ pub use event_subscription::{ subscribe, subscribe_finalized, EventSubscription, + JsonRpcSub, + BoxedJsonRpcSub, }; pub use events_type::{ at, diff --git a/subxt/tests/integration/codegen/polkadot.rs b/subxt/tests/integration/codegen/polkadot.rs index 79c070decd..2de2ade66c 100644 --- a/subxt/tests/integration/codegen/polkadot.rs +++ b/subxt/tests/integration/codegen/polkadot.rs @@ -27849,13 +27849,13 @@ pub mod api { } pub async fn subscribe( &self, - ) -> Result<::subxt::events::EventSubscription<'a, T, Event>, ::subxt::BasicError> + ) -> Result<::subxt::events::EventSubscription<'a, ::subxt::events::JsonRpcSub, T, Event>, ::subxt::BasicError> { ::subxt::events::subscribe::(self.client).await } pub async fn subscribe_finalized( &self, - ) -> Result<::subxt::events::EventSubscription<'a, T, Event>, ::subxt::BasicError> + ) -> Result<::subxt::events::EventSubscription<'a, ::subxt::events::BoxedJsonRpcSub, T, Event>, ::subxt::BasicError> { ::subxt::events::subscribe_finalized::(self.client).await } From 11368cf0b16fe53acbf1fe5d0eb4a04757a6098b Mon Sep 17 00:00:00 2001 From: James Wilson Date: Tue, 8 Mar 2022 13:10:14 +0000 Subject: [PATCH 02/15] rename to EventSub/FinalizedEventSub --- codegen/src/api/mod.rs | 4 ++-- subxt/src/events/event_subscription.rs | 20 ++++++++++---------- subxt/src/events/mod.rs | 4 ++-- subxt/tests/integration/codegen/polkadot.rs | 4 ++-- 4 files changed, 16 insertions(+), 16 deletions(-) diff --git a/codegen/src/api/mod.rs b/codegen/src/api/mod.rs index 2224cf3725..2c6f2130d7 100644 --- a/codegen/src/api/mod.rs +++ b/codegen/src/api/mod.rs @@ -343,11 +343,11 @@ impl RuntimeGenerator { ::subxt::events::at::(self.client, block_hash).await } - pub async fn subscribe(&self) -> Result<::subxt::events::EventSubscription<'a, ::subxt::events::JsonRpcSub, T, Event>, ::subxt::BasicError> { + pub async fn subscribe(&self) -> Result<::subxt::events::EventSubscription<'a, ::subxt::events::EventSub, T, Event>, ::subxt::BasicError> { ::subxt::events::subscribe::(self.client).await } - pub async fn subscribe_finalized(&self) -> Result<::subxt::events::EventSubscription<'a, ::subxt::events::BoxedJsonRpcSub, T, Event>, ::subxt::BasicError> { + pub async fn subscribe_finalized(&self) -> Result<::subxt::events::EventSubscription<'a, ::subxt::events::FinalizedEventSub, T, Event>, ::subxt::BasicError> { ::subxt::events::subscribe_finalized::(self.client).await } } diff --git a/subxt/src/events/event_subscription.rs b/subxt/src/events/event_subscription.rs index 34630af30f..bd8c85fe6f 100644 --- a/subxt/src/events/event_subscription.rs +++ b/subxt/src/events/event_subscription.rs @@ -56,7 +56,7 @@ pub use super::{ #[doc(hidden)] pub async fn subscribe( client: &'_ Client, -) -> Result, T, Evs>, BasicError> { +) -> Result, T, Evs>, BasicError> { let block_subscription = client.rpc().subscribe_blocks().await?; Ok(EventSubscription::new(client, block_subscription)) } @@ -69,7 +69,7 @@ pub async fn subscribe( #[doc(hidden)] pub async fn subscribe_finalized( client: &'_ Client, -) -> Result, T, Evs>, BasicError> { +) -> Result, T, Evs>, BasicError> { let block_subscription = client .rpc() .subscribe_finalized_blocks() @@ -78,16 +78,16 @@ pub async fn subscribe_finalized( Ok(EventSubscription::new(client, Box::pin(block_subscription))) } -/// A jsonrpsee subscription stream that has been `Box::pin`-ned. This is a part of -/// the `EventSubscription` type for `subscribe_finalized`, because it needs to do additional -/// work on top of the basic `Subscription`. +/// A `jsonrpsee` Subscription. This forms a part of the `EventSubscription` type handed back +/// in codegen from `subscribe_finalized`, and so is exposed here to be used there. +#[doc(hidden)] #[doc(hidden)] -pub type BoxedJsonRpcSub
= BoxStream<'static, Result>; +pub type FinalizedEventSub
= BoxStream<'static, Result>; /// A `jsonrpsee` Subscription. This forms a part of the `EventSubscription` type handed back -/// in codegen, and so is exposed here to be used there. +/// in codegen from `subscribe`, and so is exposed here to be used there. #[doc(hidden)] -pub type JsonRpcSub = Subscription; +pub type EventSub = Subscription; /// A subscription to events that implements [`Stream`], and returns [`Events`] objects for each block. #[derive(Derivative)] @@ -207,7 +207,7 @@ mod test { #[allow(unused)] fn check_sendability() { fn assert_send() {} - assert_send::::Header>, crate::DefaultConfig, ()>>(); - assert_send::::Header>, crate::DefaultConfig, ()>>(); + assert_send::::Header>, crate::DefaultConfig, ()>>(); + assert_send::::Header>, crate::DefaultConfig, ()>>(); } } diff --git a/subxt/src/events/mod.rs b/subxt/src/events/mod.rs index 784ca77b8b..10d61a848d 100644 --- a/subxt/src/events/mod.rs +++ b/subxt/src/events/mod.rs @@ -26,8 +26,8 @@ pub use event_subscription::{ subscribe, subscribe_finalized, EventSubscription, - JsonRpcSub, - BoxedJsonRpcSub, + EventSub, + FinalizedEventSub, }; pub use events_type::{ at, diff --git a/subxt/tests/integration/codegen/polkadot.rs b/subxt/tests/integration/codegen/polkadot.rs index 2de2ade66c..26eafbea7e 100644 --- a/subxt/tests/integration/codegen/polkadot.rs +++ b/subxt/tests/integration/codegen/polkadot.rs @@ -27849,13 +27849,13 @@ pub mod api { } pub async fn subscribe( &self, - ) -> Result<::subxt::events::EventSubscription<'a, ::subxt::events::JsonRpcSub, T, Event>, ::subxt::BasicError> + ) -> Result<::subxt::events::EventSubscription<'a, ::subxt::events::EventSub, T, Event>, ::subxt::BasicError> { ::subxt::events::subscribe::(self.client).await } pub async fn subscribe_finalized( &self, - ) -> Result<::subxt::events::EventSubscription<'a, ::subxt::events::BoxedJsonRpcSub, T, Event>, ::subxt::BasicError> + ) -> Result<::subxt::events::EventSubscription<'a, ::subxt::events::FinalizedEventSub, T, Event>, ::subxt::BasicError> { ::subxt::events::subscribe_finalized::(self.client).await } From 40ad3fcaa2f370d6a943616c5f03fb9384712814 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Tue, 8 Mar 2022 14:49:57 +0000 Subject: [PATCH 03/15] wip fix some lifetimes so that event sub can depend on client in stream --- codegen/src/api/mod.rs | 2 +- subxt/src/config.rs | 5 +- subxt/src/events/event_subscription.rs | 87 +++++++++++++++++---- subxt/src/rpc.rs | 17 ++++ subxt/tests/integration/codegen/polkadot.rs | 2 +- 5 files changed, 96 insertions(+), 17 deletions(-) diff --git a/codegen/src/api/mod.rs b/codegen/src/api/mod.rs index 2c6f2130d7..5b706443db 100644 --- a/codegen/src/api/mod.rs +++ b/codegen/src/api/mod.rs @@ -347,7 +347,7 @@ impl RuntimeGenerator { ::subxt::events::subscribe::(self.client).await } - pub async fn subscribe_finalized(&self) -> Result<::subxt::events::EventSubscription<'a, ::subxt::events::FinalizedEventSub, T, Event>, ::subxt::BasicError> { + pub async fn subscribe_finalized(&self) -> Result<::subxt::events::EventSubscription<'a, ::subxt::events::FinalizedEventSub<'a, T::Header>, T, Event>, ::subxt::BasicError> { ::subxt::events::subscribe_finalized::(self.client).await } } diff --git a/subxt/src/config.rs b/subxt/src/config.rs index dd7b727790..4a1ba4f7bb 100644 --- a/subxt/src/config.rs +++ b/subxt/src/config.rs @@ -46,7 +46,10 @@ pub trait Config: 'static { + Default + Copy + core::hash::Hash - + core::str::FromStr; + + core::str::FromStr + + num_traits::One + + core::ops::Add + + MaybeSerializeDeserialize; /// The output of the `Hashing` function. type Hash: Parameter diff --git a/subxt/src/events/event_subscription.rs b/subxt/src/events/event_subscription.rs index bd8c85fe6f..ceb9343649 100644 --- a/subxt/src/events/event_subscription.rs +++ b/subxt/src/events/event_subscription.rs @@ -27,13 +27,20 @@ use futures::{ Future, FutureExt, Stream, - StreamExt, stream::BoxStream, + StreamExt, + stream::{ + self, + BoxStream, + }, + future::Either, }; use jsonrpsee::core::client::Subscription; use std::{ marker::Unpin, task::Poll, }; +use sp_runtime::traits::Header; +use num_traits::One; pub use super::{ at, @@ -54,9 +61,9 @@ pub use super::{ /// and is exposed only to be called via the codegen. Thus, prefer to use /// `api.events().subscribe()` over calling this directly. #[doc(hidden)] -pub async fn subscribe( - client: &'_ Client, -) -> Result, T, Evs>, BasicError> { +pub async fn subscribe<'a, T: Config, Evs: Decode + 'static>( + client: &'a Client, +) -> Result, T, Evs>, BasicError> { let block_subscription = client.rpc().subscribe_blocks().await?; Ok(EventSubscription::new(client, block_subscription)) } @@ -67,14 +74,66 @@ pub async fn subscribe( /// and is exposed only to be called via the codegen. Thus, prefer to use /// `api.events().subscribe_finalized()` over calling this directly. #[doc(hidden)] -pub async fn subscribe_finalized( - client: &'_ Client, -) -> Result, T, Evs>, BasicError> { +pub async fn subscribe_finalized<'a, T: Config, Evs: Decode + 'static>( + client: &'a Client, +) -> Result, T, Evs>, BasicError> { + let last_finalized_block_hash = client + .rpc() + .finalized_head() + .await?; + + let mut last_finalized_block_number = client + .rpc() + .header(Some(last_finalized_block_hash)) + .await? + .map(|h| *h.number()); + let block_subscription = client .rpc() .subscribe_finalized_blocks() .await? - .map(|s| s); + .flat_map(move |s| { + // Get the header, or return a stream containing just the error. + let header = match s { + Ok(header) => header, + Err(e) => return Either::Left(stream::once(async { Err(e) })) + }; + + // Figure out the blocks to get headers for; everything from one after the + // last finalized block to the block number we got back from the stream. + let mut curr_block_number = last_finalized_block_number + .unwrap_or(*header.number()); + let end_block_number = *header.number(); + + // Update the last finalized block to the one we're going to return details for. + last_finalized_block_number = Some(end_block_number); + + // Iterate over all of the previous blocks we need headers for: + let prev_block_numbers = std::iter::from_fn(move || { + if curr_block_number == end_block_number { + None + } else { + curr_block_number = curr_block_number + One::one(); + Some(curr_block_number) + } + }); + + stream::iter(prev_block_numbers) + .map(|n| async move { + let hash = client + .rpc() + .block_hash_internal(n) + .await?; + let header = client + .rpc() + .header(hash) + .await?; + Ok::<_,BasicError>(header) + }); + + Either::Right(stream::once(async { Ok(header) })) + }); + Ok(EventSubscription::new(client, Box::pin(block_subscription))) } @@ -82,7 +141,7 @@ pub async fn subscribe_finalized( /// in codegen from `subscribe_finalized`, and so is exposed here to be used there. #[doc(hidden)] #[doc(hidden)] -pub type FinalizedEventSub
= BoxStream<'static, Result>; +pub type FinalizedEventSub<'a, Header> = BoxStream<'a, Result>; /// A `jsonrpsee` Subscription. This forms a part of the `EventSubscription` type handed back /// in codegen from `subscribe`, and so is exposed here to be used there. @@ -105,9 +164,9 @@ pub struct EventSubscription<'a, Sub, T: Config, Evs: 'static> { _event_type: std::marker::PhantomData, } -impl<'a, Sub, T: Config, Evs: Decode> EventSubscription<'a, Sub, T, Evs> +impl<'a, Sub, T: Config, Evs: Decode, E: Into> EventSubscription<'a, Sub, T, Evs> where - Sub: Stream> + Unpin + 'static + Sub: Stream> + Unpin + 'a { fn new( client: &'a Client, @@ -148,11 +207,12 @@ impl<'a, T: Config, Sub: Unpin, Evs: Decode> Unpin for EventSubscription<'a, Sub // // The advantage of this manual implementation is that we have a named type that we (and others) // can derive things on, store away, alias etc. -impl<'a, Sub, T, Evs> Stream for EventSubscription<'a, Sub, T, Evs> +impl<'a, Sub, T, Evs, E> Stream for EventSubscription<'a, Sub, T, Evs> where T: Config, Evs: Decode, - Sub: Stream> + Unpin + 'static + Sub: Stream> + Unpin + 'a, + E: Into { type Item = Result, BasicError>; @@ -178,7 +238,6 @@ where return Poll::Ready(Some(Err(e.into()))) } Some(Ok(block_header)) => { - use sp_runtime::traits::Header; // Note [jsdw]: We may be able to get rid of the per-item allocation // with https://github.com/oblique/reusable-box-future. self.at = Some(Box::pin(at(self.client, block_header.hash()))); diff --git a/subxt/src/rpc.rs b/subxt/src/rpc.rs index 73e39e5495..6bef808d93 100644 --- a/subxt/src/rpc.rs +++ b/subxt/src/rpc.rs @@ -355,6 +355,23 @@ impl Rpc { } } + /// Get a block hash given a T::BlockNumber from our config. + /// This may not be useful externally, but we need to be able to + /// get detail for T::BlockNumbers. + #[doc(hidden)] + pub async fn block_hash_internal( + &self, + block_number: T::BlockNumber, + ) -> Result, BasicError> { + let block_number = ListOrValue::Value(block_number); + let params = rpc_params![block_number]; + let list_or_value = self.client.request("chain_getBlockHash", params).await?; + match list_or_value { + ListOrValue::Value(hash) => Ok(hash), + ListOrValue::List(_) => Err("Expected a Value, got a List".into()), + } + } + /// Get a block hash of the latest finalized block pub async fn finalized_head(&self) -> Result { let hash = self diff --git a/subxt/tests/integration/codegen/polkadot.rs b/subxt/tests/integration/codegen/polkadot.rs index 26eafbea7e..e158b897e7 100644 --- a/subxt/tests/integration/codegen/polkadot.rs +++ b/subxt/tests/integration/codegen/polkadot.rs @@ -27855,7 +27855,7 @@ pub mod api { } pub async fn subscribe_finalized( &self, - ) -> Result<::subxt::events::EventSubscription<'a, ::subxt::events::FinalizedEventSub, T, Event>, ::subxt::BasicError> + ) -> Result<::subxt::events::EventSubscription<'a, ::subxt::events::FinalizedEventSub<'a, T::Header>, T, Event>, ::subxt::BasicError> { ::subxt::events::subscribe_finalized::(self.client).await } From 3fb6493a33ebe72046a65c2bdec4d828901a896d Mon Sep 17 00:00:00 2001 From: James Wilson Date: Tue, 8 Mar 2022 16:18:54 +0000 Subject: [PATCH 04/15] Cargo fmt + comment tweaks --- subxt/src/events/event_subscription.rs | 143 ++++++++++++++----------- subxt/src/events/mod.rs | 2 +- 2 files changed, 80 insertions(+), 65 deletions(-) diff --git a/subxt/src/events/event_subscription.rs b/subxt/src/events/event_subscription.rs index ceb9343649..7924ee62e6 100644 --- a/subxt/src/events/event_subscription.rs +++ b/subxt/src/events/event_subscription.rs @@ -24,23 +24,23 @@ use crate::{ use codec::Decode; use derivative::Derivative; use futures::{ - Future, - FutureExt, - Stream, - StreamExt, + future::Either, stream::{ self, BoxStream, }, - future::Either, + Future, + FutureExt, + Stream, + StreamExt, }; use jsonrpsee::core::client::Subscription; +use num_traits::One; +use sp_runtime::traits::Header; use std::{ marker::Unpin, task::Poll, }; -use sp_runtime::traits::Header; -use num_traits::One; pub use super::{ at, @@ -77,10 +77,9 @@ pub async fn subscribe<'a, T: Config, Evs: Decode + 'static>( pub async fn subscribe_finalized<'a, T: Config, Evs: Decode + 'static>( client: &'a Client, ) -> Result, T, Evs>, BasicError> { - let last_finalized_block_hash = client - .rpc() - .finalized_head() - .await?; + // fetch the last finalised block details immediately, so that we'll get + // events from this block onwards. + let last_finalized_block_hash = client.rpc().finalized_head().await?; let mut last_finalized_block_number = client .rpc() @@ -88,51 +87,55 @@ pub async fn subscribe_finalized<'a, T: Config, Evs: Decode + 'static>( .await? .map(|h| *h.number()); - let block_subscription = client - .rpc() - .subscribe_finalized_blocks() - .await? - .flat_map(move |s| { - // Get the header, or return a stream containing just the error. - let header = match s { - Ok(header) => header, - Err(e) => return Either::Left(stream::once(async { Err(e) })) - }; - - // Figure out the blocks to get headers for; everything from one after the - // last finalized block to the block number we got back from the stream. - let mut curr_block_number = last_finalized_block_number - .unwrap_or(*header.number()); - let end_block_number = *header.number(); + let block_subscription = + client + .rpc() + .subscribe_finalized_blocks() + .await? + .flat_map(move |s| { + // Get the header, or return a stream containing just the error. Our EventSubscription + // stream will return `None` as soon as it hits an error like this. + let header = match s { + Ok(header) => header, + Err(e) => return Either::Left(stream::once(async { Err(e.into()) })), + }; - // Update the last finalized block to the one we're going to return details for. - last_finalized_block_number = Some(end_block_number); - - // Iterate over all of the previous blocks we need headers for: - let prev_block_numbers = std::iter::from_fn(move || { - if curr_block_number == end_block_number { - None - } else { - curr_block_number = curr_block_number + One::one(); - Some(curr_block_number) - } - }); + // This is the block number that we have returned details for already. + let start_block_number = + last_finalized_block_number.unwrap_or(*header.number()); + // This latest finalized block number that we want to fetch all details up to. + let end_block_number = *header.number(); + // On the next iteration, we'll start from this end block. + last_finalized_block_number = Some(end_block_number); - stream::iter(prev_block_numbers) - .map(|n| async move { - let hash = client - .rpc() - .block_hash_internal(n) - .await?; - let header = client - .rpc() - .header(hash) - .await?; - Ok::<_,BasicError>(header) + // Iterate over all of the previous blocks we need headers for, ignoring the current block + // (which we already have the header info for): + let prev_block_numbers = std::iter::from_fn({ + let mut curr_block_number = start_block_number; + move || { + if curr_block_number == end_block_number { + None + } else { + curr_block_number = curr_block_number + One::one(); + Some(curr_block_number) + } + } }); - Either::Right(stream::once(async { Ok(header) })) - }); + // Produce a stream of all of the previous headers that finalization skipped over. + let old_headers = stream::iter(prev_block_numbers) + .then(move |n| { + async move { + let hash = client.rpc().block_hash_internal(n).await?; + let header = client.rpc().header(hash).await?; + Ok::<_, BasicError>(header) + } + }) + .filter_map(|h| async { h.transpose() }); + + // Return a combination of any previous headers plus the new header. + Either::Right(old_headers.chain(stream::once(async { Ok(header) }))) + }); Ok(EventSubscription::new(client, Box::pin(block_subscription))) } @@ -140,8 +143,7 @@ pub async fn subscribe_finalized<'a, T: Config, Evs: Decode + 'static>( /// A `jsonrpsee` Subscription. This forms a part of the `EventSubscription` type handed back /// in codegen from `subscribe_finalized`, and so is exposed here to be used there. #[doc(hidden)] -#[doc(hidden)] -pub type FinalizedEventSub<'a, Header> = BoxStream<'a, Result>; +pub type FinalizedEventSub<'a, Header> = BoxStream<'a, Result>; /// A `jsonrpsee` Subscription. This forms a part of the `EventSubscription` type handed back /// in codegen from `subscribe`, and so is exposed here to be used there. @@ -164,14 +166,12 @@ pub struct EventSubscription<'a, Sub, T: Config, Evs: 'static> { _event_type: std::marker::PhantomData, } -impl<'a, Sub, T: Config, Evs: Decode, E: Into> EventSubscription<'a, Sub, T, Evs> +impl<'a, Sub, T: Config, Evs: Decode, E: Into> + EventSubscription<'a, Sub, T, Evs> where - Sub: Stream> + Unpin + 'a + Sub: Stream> + Unpin + 'a, { - fn new( - client: &'a Client, - block_header_subscription: Sub, - ) -> Self { + fn new(client: &'a Client, block_header_subscription: Sub) -> Self { EventSubscription { finished: false, client, @@ -188,7 +188,10 @@ where } } -impl<'a, T: Config, Sub: Unpin, Evs: Decode> Unpin for EventSubscription<'a, Sub, T, Evs> {} +impl<'a, T: Config, Sub: Unpin, Evs: Decode> Unpin + for EventSubscription<'a, Sub, T, Evs> +{ +} // We want `EventSubscription` to implement Stream. The below implementation is the rather verbose // way to roughly implement the following function: @@ -212,7 +215,7 @@ where T: Config, Evs: Decode, Sub: Stream> + Unpin + 'a, - E: Into + E: Into, { type Item = Result, BasicError>; @@ -266,7 +269,19 @@ mod test { #[allow(unused)] fn check_sendability() { fn assert_send() {} - assert_send::::Header>, crate::DefaultConfig, ()>>(); - assert_send::::Header>, crate::DefaultConfig, ()>>(); + assert_send::< + EventSubscription< + EventSub<::Header>, + crate::DefaultConfig, + (), + >, + >(); + assert_send::< + EventSubscription< + FinalizedEventSub<::Header>, + crate::DefaultConfig, + (), + >, + >(); } } diff --git a/subxt/src/events/mod.rs b/subxt/src/events/mod.rs index 10d61a848d..03df23be5b 100644 --- a/subxt/src/events/mod.rs +++ b/subxt/src/events/mod.rs @@ -25,8 +25,8 @@ pub use decoding::EventsDecodingError; pub use event_subscription::{ subscribe, subscribe_finalized, - EventSubscription, EventSub, + EventSubscription, FinalizedEventSub, }; pub use events_type::{ From 2380c7773c75c5cb32282a2e0d73dd040bc43978 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Tue, 8 Mar 2022 16:22:24 +0000 Subject: [PATCH 05/15] Add another comment --- subxt/src/events/event_subscription.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/subxt/src/events/event_subscription.rs b/subxt/src/events/event_subscription.rs index 7924ee62e6..df6187a5df 100644 --- a/subxt/src/events/event_subscription.rs +++ b/subxt/src/events/event_subscription.rs @@ -87,6 +87,9 @@ pub async fn subscribe_finalized<'a, T: Config, Evs: Decode + 'static>( .await? .map(|h| *h.number()); + // The complexity here is because the finalized block subscription may skip over + // some blocks, so here we attempt to ensure that we pay attention to every block + // that was skipped over as well as the latest block we were told about. let block_subscription = client .rpc() @@ -141,12 +144,12 @@ pub async fn subscribe_finalized<'a, T: Config, Evs: Decode + 'static>( } /// A `jsonrpsee` Subscription. This forms a part of the `EventSubscription` type handed back -/// in codegen from `subscribe_finalized`, and so is exposed here to be used there. +/// in codegen from `subscribe_finalized`, and is exposed to be used in codegen. #[doc(hidden)] pub type FinalizedEventSub<'a, Header> = BoxStream<'a, Result>; /// A `jsonrpsee` Subscription. This forms a part of the `EventSubscription` type handed back -/// in codegen from `subscribe`, and so is exposed here to be used there. +/// in codegen from `subscribe`, and is exposed to be used in codegen. #[doc(hidden)] pub type EventSub = Subscription; From 09c69e44cae3c1aa5a7494e266118bc9f67807ae Mon Sep 17 00:00:00 2001 From: James Wilson Date: Tue, 8 Mar 2022 16:52:02 +0000 Subject: [PATCH 06/15] factor out prev block header fetching into a separate function to tidy --- subxt/src/events/event_subscription.rs | 71 ++++++++++++++++---------- 1 file changed, 43 insertions(+), 28 deletions(-) diff --git a/subxt/src/events/event_subscription.rs b/subxt/src/events/event_subscription.rs index df6187a5df..475fac627f 100644 --- a/subxt/src/events/event_subscription.rs +++ b/subxt/src/events/event_subscription.rs @@ -103,46 +103,61 @@ pub async fn subscribe_finalized<'a, T: Config, Evs: Decode + 'static>( Err(e) => return Either::Left(stream::once(async { Err(e.into()) })), }; - // This is the block number that we have returned details for already. - let start_block_number = - last_finalized_block_number.unwrap_or(*header.number()); - // This latest finalized block number that we want to fetch all details up to. + // This is one after the last block we returned details for last time. + let start_block_num = last_finalized_block_number + .map(|n| n + One::one()) + .unwrap_or(*header.number()); + + // We want all previous details up to, but not including this current block num. let end_block_number = *header.number(); - // On the next iteration, we'll start from this end block. - last_finalized_block_number = Some(end_block_number); // Iterate over all of the previous blocks we need headers for, ignoring the current block // (which we already have the header info for): - let prev_block_numbers = std::iter::from_fn({ - let mut curr_block_number = start_block_number; - move || { - if curr_block_number == end_block_number { - None - } else { - curr_block_number = curr_block_number + One::one(); - Some(curr_block_number) - } - } - }); + let previous_headers = + get_block_headers(client, start_block_num, end_block_number); - // Produce a stream of all of the previous headers that finalization skipped over. - let old_headers = stream::iter(prev_block_numbers) - .then(move |n| { - async move { - let hash = client.rpc().block_hash_internal(n).await?; - let header = client.rpc().header(hash).await?; - Ok::<_, BasicError>(header) - } - }) - .filter_map(|h| async { h.transpose() }); + // On the next iteration, we'll get details starting just after this end block. + last_finalized_block_number = Some(end_block_number); // Return a combination of any previous headers plus the new header. - Either::Right(old_headers.chain(stream::once(async { Ok(header) }))) + Either::Right(previous_headers.chain(stream::once(async { Ok(header) }))) }); Ok(EventSubscription::new(client, Box::pin(block_subscription))) } +fn get_block_headers<'a, T: Config>( + client: &'a Client, + mut current_block_num: T::BlockNumber, + end_num: T::BlockNumber, +) -> impl Stream> + Unpin + Send + 'a { + // Iterate over all of the previous blocks we need headers for. We go from (start_num..end_num). + // If start_num == end_num, return nothing. + let block_numbers = std::iter::from_fn(move || { + let res = if current_block_num == end_num { + None + } else { + Some(current_block_num) + }; + + current_block_num = current_block_num + One::one(); + res + }); + + // Produce a stream of all of the previous headers that finalization skipped over. + let block_headers = stream::iter(block_numbers) + .then(move |n| { + async move { + let hash = client.rpc().block_hash_internal(n).await?; + let header = client.rpc().header(hash).await?; + Ok::<_, BasicError>(header) + } + }) + .filter_map(|h| async { h.transpose() }); + + Box::pin(block_headers) +} + /// A `jsonrpsee` Subscription. This forms a part of the `EventSubscription` type handed back /// in codegen from `subscribe_finalized`, and is exposed to be used in codegen. #[doc(hidden)] From 4ded7556d9442705a3572da13577b5df85ce6833 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Tue, 8 Mar 2022 16:54:54 +0000 Subject: [PATCH 07/15] add a comment --- subxt/src/events/event_subscription.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/subxt/src/events/event_subscription.rs b/subxt/src/events/event_subscription.rs index 475fac627f..5e29a4f06a 100644 --- a/subxt/src/events/event_subscription.rs +++ b/subxt/src/events/event_subscription.rs @@ -126,6 +126,7 @@ pub async fn subscribe_finalized<'a, T: Config, Evs: Decode + 'static>( Ok(EventSubscription::new(client, Box::pin(block_subscription))) } +/// Return a Stream of all block headers starting from `current_block_num` and ending just before `end_num`. fn get_block_headers<'a, T: Config>( client: &'a Client, mut current_block_num: T::BlockNumber, From e4c18ce030ed6d15db3c07ae841d7feff6e817ee Mon Sep 17 00:00:00 2001 From: James Wilson Date: Tue, 8 Mar 2022 17:47:44 +0000 Subject: [PATCH 08/15] remove ListOrValue as it's unused --- subxt/src/rpc.rs | 37 +++++++------------------------------ 1 file changed, 7 insertions(+), 30 deletions(-) diff --git a/subxt/src/rpc.rs b/subxt/src/rpc.rs index 6bef808d93..b68d797eff 100644 --- a/subxt/src/rpc.rs +++ b/subxt/src/rpc.rs @@ -96,16 +96,6 @@ pub enum NumberOrHex { Hex(U256), } -/// RPC list or value wrapper. -#[derive(Serialize, Deserialize, Debug, PartialEq)] -#[serde(untagged)] -pub enum ListOrValue { - /// A list of values of given type. - List(Vec), - /// A single value of given type. - Value(T), -} - /// Alias for the type of a block returned by `chain_getBlock` pub type ChainBlock = SignedBlock::Header, ::Extrinsic>>; @@ -285,16 +275,11 @@ impl Rpc { /// Fetch the genesis hash pub async fn genesis_hash(&self) -> Result { - let block_zero = Some(ListOrValue::Value(NumberOrHex::Number(0))); + let block_zero = 0u32; let params = rpc_params![block_zero]; - let list_or_value: ListOrValue> = + let genesis_hash: Option = self.client.request("chain_getBlockHash", params).await?; - match list_or_value { - ListOrValue::Value(genesis_hash) => { - genesis_hash.ok_or_else(|| "Genesis hash not found".into()) - } - ListOrValue::List(_) => Err("Expected a Value, got a List".into()), - } + genesis_hash.ok_or_else(|| "Genesis hash not found".into()) } /// Fetch the metadata @@ -346,13 +331,9 @@ impl Rpc { &self, block_number: Option, ) -> Result, BasicError> { - let block_number = block_number.map(ListOrValue::Value); let params = rpc_params![block_number]; - let list_or_value = self.client.request("chain_getBlockHash", params).await?; - match list_or_value { - ListOrValue::Value(hash) => Ok(hash), - ListOrValue::List(_) => Err("Expected a Value, got a List".into()), - } + let block_hash = self.client.request("chain_getBlockHash", params).await?; + Ok(block_hash) } /// Get a block hash given a T::BlockNumber from our config. @@ -363,13 +344,9 @@ impl Rpc { &self, block_number: T::BlockNumber, ) -> Result, BasicError> { - let block_number = ListOrValue::Value(block_number); let params = rpc_params![block_number]; - let list_or_value = self.client.request("chain_getBlockHash", params).await?; - match list_or_value { - ListOrValue::Value(hash) => Ok(hash), - ListOrValue::List(_) => Err("Expected a Value, got a List".into()), - } + let block_hash = self.client.request("chain_getBlockHash", params).await?; + Ok(block_hash) } /// Get a block hash of the latest finalized block From 3f728cd1344594d0fc4ebf5eff3db4f9a453d903 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Wed, 9 Mar 2022 11:16:30 +0000 Subject: [PATCH 09/15] Into on BlockNumber to simplify things --- subxt/Cargo.toml | 1 - subxt/src/config.rs | 4 +--- subxt/src/events/event_subscription.rs | 21 +++++++++---------- subxt/src/rpc.rs | 29 +++++++++++--------------- 4 files changed, 23 insertions(+), 32 deletions(-) diff --git a/subxt/Cargo.toml b/subxt/Cargo.toml index eb92b76401..13d3801c74 100644 --- a/subxt/Cargo.toml +++ b/subxt/Cargo.toml @@ -22,7 +22,6 @@ futures = "0.3.13" hex = "0.4.3" jsonrpsee = { version = "0.8.0", features = ["async-client", "client-ws-transport"] } log = "0.4.14" -num-traits = { version = "0.2.14", default-features = false } serde = { version = "1.0.124", features = ["derive"] } serde_json = "1.0.64" thiserror = "1.0.24" diff --git a/subxt/src/config.rs b/subxt/src/config.rs index 4a1ba4f7bb..811ca01734 100644 --- a/subxt/src/config.rs +++ b/subxt/src/config.rs @@ -47,9 +47,7 @@ pub trait Config: 'static { + Copy + core::hash::Hash + core::str::FromStr - + num_traits::One - + core::ops::Add - + MaybeSerializeDeserialize; + + Into; /// The output of the `Hashing` function. type Hash: Parameter diff --git a/subxt/src/events/event_subscription.rs b/subxt/src/events/event_subscription.rs index 5e29a4f06a..6df94d4cd4 100644 --- a/subxt/src/events/event_subscription.rs +++ b/subxt/src/events/event_subscription.rs @@ -35,7 +35,6 @@ use futures::{ StreamExt, }; use jsonrpsee::core::client::Subscription; -use num_traits::One; use sp_runtime::traits::Header; use std::{ marker::Unpin, @@ -85,7 +84,7 @@ pub async fn subscribe_finalized<'a, T: Config, Evs: Decode + 'static>( .rpc() .header(Some(last_finalized_block_hash)) .await? - .map(|h| *h.number()); + .map(|h| (*h.number()).into()); // The complexity here is because the finalized block subscription may skip over // some blocks, so here we attempt to ensure that we pay attention to every block @@ -103,13 +102,13 @@ pub async fn subscribe_finalized<'a, T: Config, Evs: Decode + 'static>( Err(e) => return Either::Left(stream::once(async { Err(e.into()) })), }; + // We want all previous details up to, but not including this current block num. + let end_block_number = (*header.number()).into(); + // This is one after the last block we returned details for last time. let start_block_num = last_finalized_block_number - .map(|n| n + One::one()) - .unwrap_or(*header.number()); - - // We want all previous details up to, but not including this current block num. - let end_block_number = *header.number(); + .map(|n| n + 1) + .unwrap_or(end_block_number); // Iterate over all of the previous blocks we need headers for, ignoring the current block // (which we already have the header info for): @@ -129,8 +128,8 @@ pub async fn subscribe_finalized<'a, T: Config, Evs: Decode + 'static>( /// Return a Stream of all block headers starting from `current_block_num` and ending just before `end_num`. fn get_block_headers<'a, T: Config>( client: &'a Client, - mut current_block_num: T::BlockNumber, - end_num: T::BlockNumber, + mut current_block_num: u128, + end_num: u128, ) -> impl Stream> + Unpin + Send + 'a { // Iterate over all of the previous blocks we need headers for. We go from (start_num..end_num). // If start_num == end_num, return nothing. @@ -141,7 +140,7 @@ fn get_block_headers<'a, T: Config>( Some(current_block_num) }; - current_block_num = current_block_num + One::one(); + current_block_num = current_block_num + 1; res }); @@ -149,7 +148,7 @@ fn get_block_headers<'a, T: Config>( let block_headers = stream::iter(block_numbers) .then(move |n| { async move { - let hash = client.rpc().block_hash_internal(n).await?; + let hash = client.rpc().block_hash(Some(n.into())).await?; let header = client.rpc().header(hash).await?; Ok::<_, BasicError>(header) } diff --git a/subxt/src/rpc.rs b/subxt/src/rpc.rs index b68d797eff..01f8f0ea72 100644 --- a/subxt/src/rpc.rs +++ b/subxt/src/rpc.rs @@ -91,7 +91,7 @@ use sp_runtime::generic::{ #[serde(untagged)] pub enum NumberOrHex { /// The number represented directly. - Number(u64), + Number(u128), /// Hex representation of the number. Hex(U256), } @@ -110,11 +110,19 @@ impl From for BlockNumber { } } -impl From for BlockNumber { - fn from(x: u32) -> Self { - NumberOrHex::Number(x.into()).into() +// All unsigned ints can be converted into a BlockNumber: +macro_rules! into_block_number { + ($($t: ty)+) => { + $( + impl From<$t> for BlockNumber { + fn from(x: $t) -> Self { + NumberOrHex::Number(x.into()).into() + } + } + )+ } } +into_block_number!(u8 u16 u32 u64 u128); /// Arbitrary properties defined in the chain spec as a JSON object. pub type SystemProperties = serde_json::Map; @@ -336,19 +344,6 @@ impl Rpc { Ok(block_hash) } - /// Get a block hash given a T::BlockNumber from our config. - /// This may not be useful externally, but we need to be able to - /// get detail for T::BlockNumbers. - #[doc(hidden)] - pub async fn block_hash_internal( - &self, - block_number: T::BlockNumber, - ) -> Result, BasicError> { - let params = rpc_params![block_number]; - let block_hash = self.client.request("chain_getBlockHash", params).await?; - Ok(block_hash) - } - /// Get a block hash of the latest finalized block pub async fn finalized_head(&self) -> Result { let hash = self From 3ee62ec5754ca283ddfd07cc22509f3dbc33aa7e Mon Sep 17 00:00:00 2001 From: James Wilson Date: Wed, 9 Mar 2022 11:18:59 +0000 Subject: [PATCH 10/15] clippy --- subxt/src/events/event_subscription.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/subxt/src/events/event_subscription.rs b/subxt/src/events/event_subscription.rs index 6df94d4cd4..8316a42c58 100644 --- a/subxt/src/events/event_subscription.rs +++ b/subxt/src/events/event_subscription.rs @@ -126,11 +126,11 @@ pub async fn subscribe_finalized<'a, T: Config, Evs: Decode + 'static>( } /// Return a Stream of all block headers starting from `current_block_num` and ending just before `end_num`. -fn get_block_headers<'a, T: Config>( - client: &'a Client, +fn get_block_headers( + client: &'_ Client, mut current_block_num: u128, end_num: u128, -) -> impl Stream> + Unpin + Send + 'a { +) -> impl Stream> + Unpin + Send + '_ { // Iterate over all of the previous blocks we need headers for. We go from (start_num..end_num). // If start_num == end_num, return nothing. let block_numbers = std::iter::from_fn(move || { @@ -140,7 +140,7 @@ fn get_block_headers<'a, T: Config>( Some(current_block_num) }; - current_block_num = current_block_num + 1; + current_block_num += 1; res }); From 4d8cde5c4bb4718795c0b5422fc05aeb127d2338 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Wed, 9 Mar 2022 12:18:54 +0000 Subject: [PATCH 11/15] Fix an example and clippy --- examples/examples/rpc_call.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/examples/rpc_call.rs b/examples/examples/rpc_call.rs index 2763ba97ba..71eafd1f34 100644 --- a/examples/examples/rpc_call.rs +++ b/examples/examples/rpc_call.rs @@ -41,7 +41,7 @@ async fn main() -> Result<(), Box> { .await? .to_runtime_api::>>(); - let block_number = 1; + let block_number = 1u32; let block_hash = api .client From 8504b10a3baf3c57fd087c70c44aeceabf255fe7 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Wed, 9 Mar 2022 13:41:35 +0000 Subject: [PATCH 12/15] simplify iterator now we are Into --- subxt/src/events/event_subscription.rs | 18 ++---------------- 1 file changed, 2 insertions(+), 16 deletions(-) diff --git a/subxt/src/events/event_subscription.rs b/subxt/src/events/event_subscription.rs index 8316a42c58..5add216510 100644 --- a/subxt/src/events/event_subscription.rs +++ b/subxt/src/events/event_subscription.rs @@ -128,24 +128,10 @@ pub async fn subscribe_finalized<'a, T: Config, Evs: Decode + 'static>( /// Return a Stream of all block headers starting from `current_block_num` and ending just before `end_num`. fn get_block_headers( client: &'_ Client, - mut current_block_num: u128, + start_num: u128, end_num: u128, ) -> impl Stream> + Unpin + Send + '_ { - // Iterate over all of the previous blocks we need headers for. We go from (start_num..end_num). - // If start_num == end_num, return nothing. - let block_numbers = std::iter::from_fn(move || { - let res = if current_block_num == end_num { - None - } else { - Some(current_block_num) - }; - - current_block_num += 1; - res - }); - - // Produce a stream of all of the previous headers that finalization skipped over. - let block_headers = stream::iter(block_numbers) + let block_headers = stream::iter(start_num..end_num) .then(move |n| { async move { let hash = client.rpc().block_hash(Some(n.into())).await?; From 5fbc66df3dc4b95135c1ecf49a735ec11d6a67ca Mon Sep 17 00:00:00 2001 From: James Wilson Date: Wed, 9 Mar 2022 14:19:21 +0000 Subject: [PATCH 13/15] Into instead because it needs serializing, and test core logic --- subxt/src/config.rs | 2 +- subxt/src/events/event_subscription.rs | 113 +++++++++++++------------ subxt/src/events/mod.rs | 1 + subxt/src/rpc.rs | 4 +- subxt/tests/integration/events.rs | 51 +++++++++++ 5 files changed, 112 insertions(+), 59 deletions(-) diff --git a/subxt/src/config.rs b/subxt/src/config.rs index 811ca01734..09c363c2ac 100644 --- a/subxt/src/config.rs +++ b/subxt/src/config.rs @@ -47,7 +47,7 @@ pub trait Config: 'static { + Copy + core::hash::Hash + core::str::FromStr - + Into; + + Into; /// The output of the `Hashing` function. type Hash: Parameter diff --git a/subxt/src/events/event_subscription.rs b/subxt/src/events/event_subscription.rs index 5add216510..a19d3f6867 100644 --- a/subxt/src/events/event_subscription.rs +++ b/subxt/src/events/event_subscription.rs @@ -57,8 +57,8 @@ pub use super::{ /// [`Events::subscribe_finalized()`] if that is important. /// /// **Note:** This function is hidden from the documentation -/// and is exposed only to be called via the codegen. Thus, prefer to use -/// `api.events().subscribe()` over calling this directly. +/// and is exposed only to be called via the codegen. It may +/// break between minor releases. #[doc(hidden)] pub async fn subscribe<'a, T: Config, Evs: Decode + 'static>( client: &'a Client, @@ -70,78 +70,79 @@ pub async fn subscribe<'a, T: Config, Evs: Decode + 'static>( /// Subscribe to events from finalized blocks. /// /// **Note:** This function is hidden from the documentation -/// and is exposed only to be called via the codegen. Thus, prefer to use -/// `api.events().subscribe_finalized()` over calling this directly. +/// and is exposed only to be called via the codegen. It may +/// break between minor releases. #[doc(hidden)] pub async fn subscribe_finalized<'a, T: Config, Evs: Decode + 'static>( client: &'a Client, ) -> Result, T, Evs>, BasicError> { // fetch the last finalised block details immediately, so that we'll get - // events from this block onwards. + // events for each block after this one. let last_finalized_block_hash = client.rpc().finalized_head().await?; - - let mut last_finalized_block_number = client + let last_finalized_block_number = client .rpc() .header(Some(last_finalized_block_hash)) .await? .map(|h| (*h.number()).into()); - // The complexity here is because the finalized block subscription may skip over - // some blocks, so here we attempt to ensure that we pay attention to every block - // that was skipped over as well as the latest block we were told about. - let block_subscription = - client - .rpc() - .subscribe_finalized_blocks() - .await? - .flat_map(move |s| { - // Get the header, or return a stream containing just the error. Our EventSubscription - // stream will return `None` as soon as it hits an error like this. - let header = match s { - Ok(header) => header, - Err(e) => return Either::Left(stream::once(async { Err(e.into()) })), - }; - - // We want all previous details up to, but not including this current block num. - let end_block_number = (*header.number()).into(); + // Fill in any gaps between the block above and the finalized blocks reported. + let block_subscription = subscribe_to_block_headers_filling_in_gaps( + client, + last_finalized_block_number, + client.rpc().subscribe_finalized_blocks().await?, + ); - // This is one after the last block we returned details for last time. - let start_block_num = last_finalized_block_number - .map(|n| n + 1) - .unwrap_or(end_block_number); + Ok(EventSubscription::new(client, Box::pin(block_subscription))) +} - // Iterate over all of the previous blocks we need headers for, ignoring the current block - // (which we already have the header info for): - let previous_headers = - get_block_headers(client, start_block_num, end_block_number); +/// Take a subscription that returns block headers, and if any block numbers are missed out +/// betweem the block number provided and what's returned from the subscription, we fill in +/// the gaps and get hold of all intermediate block headers. +/// +/// **Note:** This is exposed so that we can run integration tests on it, but otherwise +/// should not be used directly and may break between minor releases. +#[doc(hidden)] +pub fn subscribe_to_block_headers_filling_in_gaps<'a, S, E, T: Config>( + client: &'a Client, + mut last_block_num: Option, + sub: S, +) -> impl Stream> + Send + 'a +where + S: Stream> + Send + 'a, + E: Into + Send + 'static, +{ + sub.flat_map(move |s| { + // Get the header, or return a stream containing just the error. Our EventSubscription + // stream will return `None` as soon as it hits an error like this. + let header = match s { + Ok(header) => header, + Err(e) => return Either::Left(stream::once(async { Err(e.into()) })), + }; - // On the next iteration, we'll get details starting just after this end block. - last_finalized_block_number = Some(end_block_number); + // We want all previous details up to, but not including this current block num. + let end_block_num = (*header.number()).into(); - // Return a combination of any previous headers plus the new header. - Either::Right(previous_headers.chain(stream::once(async { Ok(header) }))) - }); + // This is one after the last block we returned details for last time. + let start_block_num = last_block_num.map(|n| n + 1).unwrap_or(end_block_num); - Ok(EventSubscription::new(client, Box::pin(block_subscription))) -} + // Iterate over all of the previous blocks we need headers for, ignoring the current block + // (which we already have the header info for): + let previous_headers = stream::iter(start_block_num..end_block_num) + .then(move |n| { + async move { + let hash = client.rpc().block_hash(Some(n.into())).await?; + let header = client.rpc().header(hash).await?; + Ok::<_, BasicError>(header) + } + }) + .filter_map(|h| async { h.transpose() }); -/// Return a Stream of all block headers starting from `current_block_num` and ending just before `end_num`. -fn get_block_headers( - client: &'_ Client, - start_num: u128, - end_num: u128, -) -> impl Stream> + Unpin + Send + '_ { - let block_headers = stream::iter(start_num..end_num) - .then(move |n| { - async move { - let hash = client.rpc().block_hash(Some(n.into())).await?; - let header = client.rpc().header(hash).await?; - Ok::<_, BasicError>(header) - } - }) - .filter_map(|h| async { h.transpose() }); + // On the next iteration, we'll get details starting just after this end block. + last_block_num = Some(end_block_num); - Box::pin(block_headers) + // Return a combination of any previous headers plus the new header. + Either::Right(previous_headers.chain(stream::once(async { Ok(header) }))) + }) } /// A `jsonrpsee` Subscription. This forms a part of the `EventSubscription` type handed back diff --git a/subxt/src/events/mod.rs b/subxt/src/events/mod.rs index 03df23be5b..a6ed76b568 100644 --- a/subxt/src/events/mod.rs +++ b/subxt/src/events/mod.rs @@ -25,6 +25,7 @@ pub use decoding::EventsDecodingError; pub use event_subscription::{ subscribe, subscribe_finalized, + subscribe_to_block_headers_filling_in_gaps, EventSub, EventSubscription, FinalizedEventSub, diff --git a/subxt/src/rpc.rs b/subxt/src/rpc.rs index 01f8f0ea72..5f062aca4e 100644 --- a/subxt/src/rpc.rs +++ b/subxt/src/rpc.rs @@ -91,7 +91,7 @@ use sp_runtime::generic::{ #[serde(untagged)] pub enum NumberOrHex { /// The number represented directly. - Number(u128), + Number(u64), /// Hex representation of the number. Hex(U256), } @@ -122,7 +122,7 @@ macro_rules! into_block_number { )+ } } -into_block_number!(u8 u16 u32 u64 u128); +into_block_number!(u8 u16 u32 u64); /// Arbitrary properties defined in the chain spec as a JSON object. pub type SystemProperties = serde_json::Map; diff --git a/subxt/tests/integration/events.rs b/subxt/tests/integration/events.rs index fbeb7cb17e..33debe23b4 100644 --- a/subxt/tests/integration/events.rs +++ b/subxt/tests/integration/events.rs @@ -136,6 +136,57 @@ async fn balance_transfer_subscription() -> Result<(), subxt::BasicError> { Ok(()) } +#[async_std::test] +async fn missing_block_headers_will_be_filled_in() -> Result<(), subxt::BasicError> { + // This function is not publically available to use, but contains + // the key logic for filling in missing blocks, so we want to test it. + // This is used in `subscribe_finalized` to ensure no block headers are + // missed. + use subxt::events::subscribe_to_block_headers_filling_in_gaps; + + let ctx = test_context().await; + + // Manually subscribe to the next 4 finalized block headers, but deliberately + // filter out two in the middle. + let some_finalized_blocks = ctx + .api + .client + .rpc() + .subscribe_finalized_blocks() + .await? + .enumerate() + .take(4) + .filter(|(n, _)| { + let n = *n; + async move { n == 0 || n == 3 } + }) + .map(|(_, h)| h); + + // This should spot the gap in the middle and fill it back in. + let all_finalized_blocks = subscribe_to_block_headers_filling_in_gaps( + &ctx.api.client, + None, + some_finalized_blocks, + ); + futures::pin_mut!(all_finalized_blocks); + + // Iterate the block headers, making sure we get them all in order. + let mut last_block_number = None; + while let Some(header) = all_finalized_blocks.next().await { + let header = header?; + + use sp_runtime::traits::Header; + let block_number: u128 = (*header.number()).into(); + + if let Some(last) = last_block_number { + assert_eq!(last + 1, block_number); + } + last_block_number = Some(block_number); + } + + Ok(()) +} + // This is just a compile-time check that we can subscribe to events in // a context that requires the event subscription/filtering to be Send-able. // We test a typical use of EventSubscription and FilterEvents. We don't need From a6acd6b23f3c78325c6897af504dcfa4918241a0 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Wed, 9 Mar 2022 15:48:53 +0000 Subject: [PATCH 14/15] Tweak missing block test to fill in >=2 holes --- subxt/tests/integration/events.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/subxt/tests/integration/events.rs b/subxt/tests/integration/events.rs index 33debe23b4..464f68edb7 100644 --- a/subxt/tests/integration/events.rs +++ b/subxt/tests/integration/events.rs @@ -146,8 +146,9 @@ async fn missing_block_headers_will_be_filled_in() -> Result<(), subxt::BasicErr let ctx = test_context().await; - // Manually subscribe to the next 4 finalized block headers, but deliberately - // filter out two in the middle. + // Manually subscribe to the next 6 finalized block headers, but deliberately + // filter out some in the middle so we get back b _ _ b _ b. This guarantees + // that there will be some gaps, even if there aren't any from the subscription. let some_finalized_blocks = ctx .api .client @@ -155,10 +156,10 @@ async fn missing_block_headers_will_be_filled_in() -> Result<(), subxt::BasicErr .subscribe_finalized_blocks() .await? .enumerate() - .take(4) + .take(6) .filter(|(n, _)| { let n = *n; - async move { n == 0 || n == 3 } + async move { n == 0 || n == 3 || n == 5 } }) .map(|(_, h)| h); From 124247164c9d4390523f42b4277a20bea0a6bef5 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Wed, 9 Mar 2022 15:59:51 +0000 Subject: [PATCH 15/15] tweak a comment --- subxt/tests/integration/events.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/subxt/tests/integration/events.rs b/subxt/tests/integration/events.rs index 464f68edb7..44f00845e6 100644 --- a/subxt/tests/integration/events.rs +++ b/subxt/tests/integration/events.rs @@ -163,7 +163,7 @@ async fn missing_block_headers_will_be_filled_in() -> Result<(), subxt::BasicErr }) .map(|(_, h)| h); - // This should spot the gap in the middle and fill it back in. + // This should spot any gaps in the middle and fill them back in. let all_finalized_blocks = subscribe_to_block_headers_filling_in_gaps( &ctx.api.client, None,