Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return events from blocks skipped over during Finalization, too #473

Merged
merged 15 commits into from
Mar 10, 2022
Merged
4 changes: 2 additions & 2 deletions codegen/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,11 +343,11 @@ impl RuntimeGenerator {
::subxt::events::at::<T, Event>(self.client, block_hash).await
}

pub async fn subscribe(&self) -> Result<::subxt::events::EventSubscription<'a, T, Event>, ::subxt::BasicError> {
pub async fn subscribe(&self) -> Result<::subxt::events::EventSubscription<'a, ::subxt::events::EventSub<T::Header>, T, Event>, ::subxt::BasicError> {
::subxt::events::subscribe::<T, Event>(self.client).await
}

pub async fn subscribe_finalized(&self) -> Result<::subxt::events::EventSubscription<'a, T, Event>, ::subxt::BasicError> {
pub async fn subscribe_finalized(&self) -> Result<::subxt::events::EventSubscription<'a, ::subxt::events::FinalizedEventSub<'a, T::Header>, T, Event>, ::subxt::BasicError> {
::subxt::events::subscribe_finalized::<T, Event>(self.client).await
}
}
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/rpc_call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.await?
.to_runtime_api::<polkadot::RuntimeApi<DefaultConfig, DefaultExtra<DefaultConfig>>>();

let block_number = 1;
let block_number = 1u32;

let block_hash = api
.client
Expand Down
1 change: 0 additions & 1 deletion subxt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ futures = "0.3.13"
hex = "0.4.3"
jsonrpsee = { version = "0.8.0", features = ["async-client", "client-ws-transport"] }
log = "0.4.14"
num-traits = { version = "0.2.14", default-features = false }
serde = { version = "1.0.124", features = ["derive"] }
serde_json = "1.0.64"
thiserror = "1.0.24"
Expand Down
3 changes: 2 additions & 1 deletion subxt/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ pub trait Config: 'static {
+ Default
+ Copy
+ core::hash::Hash
+ core::str::FromStr;
+ core::str::FromStr
+ Into<u64>;

/// The output of the `Hashing` function.
type Hash: Parameter
Expand Down
153 changes: 128 additions & 25 deletions subxt/src/events/event_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,18 @@ use crate::{
use codec::Decode;
use derivative::Derivative;
use futures::{
future::Either,
stream::{
self,
BoxStream,
},
Future,
FutureExt,
Stream,
StreamExt,
};
use jsonrpsee::core::client::Subscription;
use sp_runtime::traits::Header;
use std::{
marker::Unpin,
task::Poll,
Expand All @@ -51,36 +57,111 @@ pub use super::{
/// [`Events::subscribe_finalized()`] if that is important.
///
/// **Note:** This function is hidden from the documentation
/// and is exposed only to be called via the codegen. Thus, prefer to use
/// `api.events().subscribe()` over calling this directly.
/// and is exposed only to be called via the codegen. It may
/// break between minor releases.
#[doc(hidden)]
pub async fn subscribe<T: Config, Evs: Decode + 'static>(
client: &'_ Client<T>,
) -> Result<EventSubscription<'_, T, Evs>, BasicError> {
pub async fn subscribe<'a, T: Config, Evs: Decode + 'static>(
client: &'a Client<T>,
) -> Result<EventSubscription<'a, EventSub<T::Header>, T, Evs>, BasicError> {
let block_subscription = client.rpc().subscribe_blocks().await?;
Ok(EventSubscription::new(client, block_subscription))
}

/// Subscribe to events from finalized blocks.
///
/// **Note:** This function is hidden from the documentation
/// and is exposed only to be called via the codegen. Thus, prefer to use
/// `api.events().subscribe_finalized()` over calling this directly.
/// and is exposed only to be called via the codegen. It may
/// break between minor releases.
#[doc(hidden)]
pub async fn subscribe_finalized<T: Config, Evs: Decode + 'static>(
client: &'_ Client<T>,
) -> Result<EventSubscription<'_, T, Evs>, BasicError> {
let block_subscription = client.rpc().subscribe_finalized_blocks().await?;
Ok(EventSubscription::new(client, block_subscription))
pub async fn subscribe_finalized<'a, T: Config, Evs: Decode + 'static>(
client: &'a Client<T>,
) -> Result<EventSubscription<'a, FinalizedEventSub<'a, T::Header>, T, Evs>, BasicError> {
// fetch the last finalised block details immediately, so that we'll get
// events for each block after this one.
let last_finalized_block_hash = client.rpc().finalized_head().await?;
let last_finalized_block_number = client
.rpc()
.header(Some(last_finalized_block_hash))
.await?
.map(|h| (*h.number()).into());

// Fill in any gaps between the block above and the finalized blocks reported.
let block_subscription = subscribe_to_block_headers_filling_in_gaps(
client,
last_finalized_block_number,
client.rpc().subscribe_finalized_blocks().await?,
);

Ok(EventSubscription::new(client, Box::pin(block_subscription)))
}

/// Take a subscription that returns block headers, and if any block numbers are missed out
/// betweem the block number provided and what's returned from the subscription, we fill in
/// the gaps and get hold of all intermediate block headers.
///
/// **Note:** This is exposed so that we can run integration tests on it, but otherwise
/// should not be used directly and may break between minor releases.
#[doc(hidden)]
pub fn subscribe_to_block_headers_filling_in_gaps<'a, S, E, T: Config>(
client: &'a Client<T>,
mut last_block_num: Option<u64>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it ok to assume that the block number is u64 here? Or would Option<Into<u64>> work?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since block numbers can always be converted into u64s now, it's just easier to do the conversion first and then pass in the u64's here (this function isn't "public" anyway).

sub: S,
) -> impl Stream<Item = Result<T::Header, BasicError>> + Send + 'a
where
S: Stream<Item = Result<T::Header, E>> + Send + 'a,
E: Into<BasicError> + Send + 'static,
{
sub.flat_map(move |s| {
// Get the header, or return a stream containing just the error. Our EventSubscription
// stream will return `None` as soon as it hits an error like this.
let header = match s {
Ok(header) => header,
Err(e) => return Either::Left(stream::once(async { Err(e.into()) })),
};

// We want all previous details up to, but not including this current block num.
let end_block_num = (*header.number()).into();

// This is one after the last block we returned details for last time.
let start_block_num = last_block_num.map(|n| n + 1).unwrap_or(end_block_num);

// Iterate over all of the previous blocks we need headers for, ignoring the current block
// (which we already have the header info for):
let previous_headers = stream::iter(start_block_num..end_block_num)
.then(move |n| {
async move {
let hash = client.rpc().block_hash(Some(n.into())).await?;
let header = client.rpc().header(hash).await?;
Ok::<_, BasicError>(header)
}
})
.filter_map(|h| async { h.transpose() });

// On the next iteration, we'll get details starting just after this end block.
last_block_num = Some(end_block_num);

// Return a combination of any previous headers plus the new header.
Either::Right(previous_headers.chain(stream::once(async { Ok(header) })))
})
}

/// A `jsonrpsee` Subscription. This forms a part of the `EventSubscription` type handed back
/// in codegen from `subscribe_finalized`, and is exposed to be used in codegen.
#[doc(hidden)]
pub type FinalizedEventSub<'a, Header> = BoxStream<'a, Result<Header, BasicError>>;

/// A `jsonrpsee` Subscription. This forms a part of the `EventSubscription` type handed back
/// in codegen from `subscribe`, and is exposed to be used in codegen.
#[doc(hidden)]
pub type EventSub<Item> = Subscription<Item>;

/// A subscription to events that implements [`Stream`], and returns [`Events`] objects for each block.
#[derive(Derivative)]
#[derivative(Debug(bound = ""))]
pub struct EventSubscription<'a, T: Config, Evs: 'static> {
#[derivative(Debug(bound = "Sub: std::fmt::Debug"))]
pub struct EventSubscription<'a, Sub, T: Config, Evs: 'static> {
finished: bool,
client: &'a Client<T>,
block_header_subscription: Subscription<T::Header>,
block_header_subscription: Sub,
#[derivative(Debug = "ignore")]
at: Option<
std::pin::Pin<
Expand All @@ -90,11 +171,12 @@ pub struct EventSubscription<'a, T: Config, Evs: 'static> {
_event_type: std::marker::PhantomData<Evs>,
}

impl<'a, T: Config, Evs: Decode> EventSubscription<'a, T, Evs> {
fn new(
client: &'a Client<T>,
block_header_subscription: Subscription<T::Header>,
) -> Self {
impl<'a, Sub, T: Config, Evs: Decode, E: Into<BasicError>>
EventSubscription<'a, Sub, T, Evs>
where
Sub: Stream<Item = Result<T::Header, E>> + Unpin + 'a,
{
fn new(client: &'a Client<T>, block_header_subscription: Sub) -> Self {
EventSubscription {
finished: false,
client,
Expand All @@ -111,7 +193,10 @@ impl<'a, T: Config, Evs: Decode> EventSubscription<'a, T, Evs> {
}
}

impl<'a, T: Config, Evs: Decode> Unpin for EventSubscription<'a, T, Evs> {}
impl<'a, T: Config, Sub: Unpin, Evs: Decode> Unpin
for EventSubscription<'a, Sub, T, Evs>
{
}

// We want `EventSubscription` to implement Stream. The below implementation is the rather verbose
// way to roughly implement the following function:
Expand All @@ -130,7 +215,13 @@ impl<'a, T: Config, Evs: Decode> Unpin for EventSubscription<'a, T, Evs> {}
//
// The advantage of this manual implementation is that we have a named type that we (and others)
// can derive things on, store away, alias etc.
impl<'a, T: Config, Evs: Decode> Stream for EventSubscription<'a, T, Evs> {
impl<'a, Sub, T, Evs, E> Stream for EventSubscription<'a, Sub, T, Evs>
where
T: Config,
Evs: Decode,
Sub: Stream<Item = Result<T::Header, E>> + Unpin + 'a,
E: Into<BasicError>,
{
type Item = Result<Events<'a, T, Evs>, BasicError>;

fn poll_next(
Expand All @@ -155,7 +246,6 @@ impl<'a, T: Config, Evs: Decode> Stream for EventSubscription<'a, T, Evs> {
return Poll::Ready(Some(Err(e.into())))
}
Some(Ok(block_header)) => {
use sp_runtime::traits::Header;
// Note [jsdw]: We may be able to get rid of the per-item allocation
// with https://github.com/oblique/reusable-box-future.
self.at = Some(Box::pin(at(self.client, block_header.hash())));
Expand All @@ -181,9 +271,22 @@ mod test {
use super::*;

// Ensure `EventSubscription` can be sent; only actually a compile-time check.
#[test]
#[allow(unused)]
fn check_sendability() {
fn assert_send<T: Send>() {}
assert_send::<EventSubscription<crate::DefaultConfig, ()>>();
assert_send::<
EventSubscription<
EventSub<<crate::DefaultConfig as Config>::Header>,
crate::DefaultConfig,
(),
>,
>();
assert_send::<
EventSubscription<
FinalizedEventSub<<crate::DefaultConfig as Config>::Header>,
crate::DefaultConfig,
(),
>,
>();
}
}
3 changes: 3 additions & 0 deletions subxt/src/events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ pub use decoding::EventsDecodingError;
pub use event_subscription::{
subscribe,
subscribe_finalized,
subscribe_to_block_headers_filling_in_gaps,
EventSub,
EventSubscription,
FinalizedEventSub,
};
pub use events_type::{
at,
Expand Down
43 changes: 16 additions & 27 deletions subxt/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,16 +96,6 @@ pub enum NumberOrHex {
Hex(U256),
}

/// RPC list or value wrapper.
#[derive(Serialize, Deserialize, Debug, PartialEq)]
#[serde(untagged)]
pub enum ListOrValue<T> {
/// A list of values of given type.
List(Vec<T>),
/// A single value of given type.
Value(T),
}

/// Alias for the type of a block returned by `chain_getBlock`
pub type ChainBlock<T> =
SignedBlock<Block<<T as Config>::Header, <T as Config>::Extrinsic>>;
Expand All @@ -120,11 +110,19 @@ impl From<NumberOrHex> for BlockNumber {
}
}

impl From<u32> for BlockNumber {
fn from(x: u32) -> Self {
NumberOrHex::Number(x.into()).into()
// All unsigned ints can be converted into a BlockNumber:
macro_rules! into_block_number {
($($t: ty)+) => {
$(
impl From<$t> for BlockNumber {
fn from(x: $t) -> Self {
NumberOrHex::Number(x.into()).into()
}
}
)+
}
}
into_block_number!(u8 u16 u32 u64);

/// Arbitrary properties defined in the chain spec as a JSON object.
pub type SystemProperties = serde_json::Map<String, serde_json::Value>;
Expand Down Expand Up @@ -285,16 +283,11 @@ impl<T: Config> Rpc<T> {

/// Fetch the genesis hash
pub async fn genesis_hash(&self) -> Result<T::Hash, BasicError> {
let block_zero = Some(ListOrValue::Value(NumberOrHex::Number(0)));
let block_zero = 0u32;
let params = rpc_params![block_zero];
let list_or_value: ListOrValue<Option<T::Hash>> =
let genesis_hash: Option<T::Hash> =
self.client.request("chain_getBlockHash", params).await?;
match list_or_value {
ListOrValue::Value(genesis_hash) => {
genesis_hash.ok_or_else(|| "Genesis hash not found".into())
}
ListOrValue::List(_) => Err("Expected a Value, got a List".into()),
}
genesis_hash.ok_or_else(|| "Genesis hash not found".into())
}

/// Fetch the metadata
Expand Down Expand Up @@ -346,13 +339,9 @@ impl<T: Config> Rpc<T> {
&self,
block_number: Option<BlockNumber>,
) -> Result<Option<T::Hash>, BasicError> {
let block_number = block_number.map(ListOrValue::Value);
let params = rpc_params![block_number];
let list_or_value = self.client.request("chain_getBlockHash", params).await?;
match list_or_value {
ListOrValue::Value(hash) => Ok(hash),
ListOrValue::List(_) => Err("Expected a Value, got a List".into()),
}
let block_hash = self.client.request("chain_getBlockHash", params).await?;
Ok(block_hash)
}

/// Get a block hash of the latest finalized block
Expand Down
4 changes: 2 additions & 2 deletions subxt/tests/integration/codegen/polkadot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27849,13 +27849,13 @@ pub mod api {
}
pub async fn subscribe(
&self,
) -> Result<::subxt::events::EventSubscription<'a, T, Event>, ::subxt::BasicError>
) -> Result<::subxt::events::EventSubscription<'a, ::subxt::events::EventSub<T::Header>, T, Event>, ::subxt::BasicError>
{
::subxt::events::subscribe::<T, Event>(self.client).await
}
pub async fn subscribe_finalized(
&self,
) -> Result<::subxt::events::EventSubscription<'a, T, Event>, ::subxt::BasicError>
) -> Result<::subxt::events::EventSubscription<'a, ::subxt::events::FinalizedEventSub<'a, T::Header>, T, Event>, ::subxt::BasicError>
{
::subxt::events::subscribe_finalized::<T, Event>(self.client).await
}
Expand Down
Loading