Skip to content

Commit

Permalink
Add next_events_from_metadata and rename next_event (#545)
Browse files Browse the repository at this point in the history
* remove obsolete comment

* make pub things pub
  • Loading branch information
haerdib authored Apr 27, 2023
1 parent 5dae754 commit 6e2c9c2
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 26 deletions.
2 changes: 1 addition & 1 deletion examples/examples/event_callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ async fn main() {

// Wait for event callbacks from the node, which are received via subscription.
for _ in 0..5 {
let event_records = subscription.next_event::<RuntimeEvent, Hash>().unwrap().unwrap();
let event_records = subscription.next_events::<RuntimeEvent, Hash>().unwrap().unwrap();
for event_record in &event_records {
println!("decoded: {:?} {:?}", event_record.phase, event_record.event);
match &event_record.event {
Expand Down
3 changes: 0 additions & 3 deletions node-api/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ use sp_core::H256;

/// A collection of events obtained from a block, bundled with the necessary
/// information needed to decode and iterate over them.
//
// In subxt, this was generic over a `Config` type, but it's sole usage was to derive the
// hash type. We omitted this here and use the `ac_primitives::Hash` instead.
#[derive(Clone, Debug)]
pub struct Events<Hash> {
metadata: Metadata,
Expand Down
61 changes: 40 additions & 21 deletions src/api/rpc_api/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
GetChainInfo, GetStorage,
};
use ac_compose_macros::rpc_params;
use ac_node_api::{EventDetails, EventRecord, Events, Phase};
use ac_node_api::{metadata::Metadata, EventDetails, EventRecord, Events, Phase};
use ac_primitives::{ExtrinsicParams, FrameSystemConfig, StorageChangeSet};
use alloc::{vec, vec::Vec};
use codec::{Decode, Encode};
Expand Down Expand Up @@ -83,22 +83,30 @@ where
/// Simplifies the event retrieval from the subscription.
pub struct EventSubscription<Subscription, Hash> {
pub subscription: Subscription,
pub metadata: Metadata,
_phantom: PhantomData<Hash>,
}

impl<Subscription, Hash> EventSubscription<Subscription, Hash> {
/// Create a new wrapper around the subscription.
pub fn new(subscription: Subscription, metadata: Metadata) -> Self {
Self { subscription, metadata, _phantom: Default::default() }
}

/// Update the metadata.
pub fn update_metadata(&mut self, metadata: Metadata) {
self.metadata = metadata
}
}

impl<Subscription, Hash> EventSubscription<Subscription, Hash>
where
Hash: DeserializeOwned,
Hash: DeserializeOwned + Copy,
Subscription: HandleSubscription<StorageChangeSet<Hash>>,
{
/// Create a new wrapper around the subscription.
pub fn new(subscription: Subscription) -> Self {
Self { subscription, _phantom: Default::default() }
}

/// Wait for the next value from the internal subscription.
/// Upon encounter, it retrieves and decodes the expected `EventRecord`.
pub fn next_event<RuntimeEvent: Decode, Topic: Decode>(
pub fn next_events<RuntimeEvent: Decode, Topic: Decode>(
&mut self,
) -> Option<Result<Vec<EventRecord<RuntimeEvent, Topic>>>> {
let change_set = match self.subscription.next()? {
Expand All @@ -109,8 +117,29 @@ where
// changes in the set. Also, we don't care about the key but only the data, so take
// the second value in the tuple of two.
let storage_data = change_set.changes[0].1.as_ref()?;
let events = Decode::decode(&mut storage_data.0.as_slice()).map_err(Error::Codec);
Some(events)
let event_records = Decode::decode(&mut storage_data.0.as_slice()).map_err(Error::Codec);
Some(event_records)
}

/// Wait for the next value from the internal subscription.
/// Upon encounter, it retrieves and decodes the expected `EventDetails`.
//
// On the contrary to `next_events` this function only needs up-to-date metadata
// and is therefore updateable during runtime.
pub fn next_events_from_metadata(&mut self) -> Option<Result<Events<Hash>>> {
let change_set = match self.subscription.next()? {
Ok(set) => set,
Err(e) => return Some(Err(Error::RpcClient(e))),
};
let block_hash = change_set.block;
// Since we subscribed to only the events key, we can simply take the first value of the
// changes in the set. Also, we don't care about the key but only the data, so take
// the second value in the tuple of two.
let storage_data = change_set.changes[0].1.as_ref()?;
let event_bytes = storage_data.0.clone();

let events = Events::<Hash>::new(self.metadata.clone(), block_hash, event_bytes);
Some(Ok(events))
}

/// Unsubscribe from the internal subscription.
Expand All @@ -119,16 +148,6 @@ where
}
}

impl<Subscription, Hash> From<Subscription> for EventSubscription<Subscription, Hash>
where
Hash: DeserializeOwned,
Subscription: HandleSubscription<StorageChangeSet<Hash>>,
{
fn from(subscription: Subscription) -> Self {
EventSubscription::new(subscription)
}
}

pub trait SubscribeEvents {
type Client: Subscribe;
type Hash: DeserializeOwned;
Expand All @@ -151,7 +170,7 @@ where
let subscription = self
.client()
.subscribe("state_subscribeStorage", rpc_params![vec![key]], "state_unsubscribeStorage")
.map(|sub| sub.into())?;
.map(|sub| EventSubscription::new(sub, self.metadata().clone()))?;
Ok(subscription)
}
}
Expand Down
16 changes: 15 additions & 1 deletion testing/examples/events_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async fn main() {
// Wait for event callbacks from the node, which are received via subscription.
for _ in 0..5 {
let event_records = event_subscription
.next_event::<RuntimeEvent, <Runtime as FrameSystemConfig>::Hash>()
.next_events::<RuntimeEvent, <Runtime as FrameSystemConfig>::Hash>()
.unwrap()
.unwrap();
for event_record in &event_records {
Expand All @@ -81,6 +81,20 @@ async fn main() {
}
}
}

// Wait for event callbacks from the node, which are received via subscription, in case no RuntimeEvents are accessible.
for _ in 0..5 {
let events = event_subscription.next_events_from_metadata().unwrap().unwrap();
for event in events.iter() {
let event = event.unwrap();
println!("got event: {:?} {:?}", event.pallet_name(), event.variant_name());
if let Ok(Some(_extrinisic_success)) = event.as_event::<ExtrinsicSuccess>() {
println!("Got System event, all good");
} else {
panic!("Unexpected event");
}
}
}
}

fn assert_assosciated_events_match_expected(events: Vec<EventDetails>) {
Expand Down

0 comments on commit 6e2c9c2

Please sign in to comment.