From a9630551c2cd877952ab769c862af4c81b0ccd3c Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Tue, 25 Jan 2022 18:27:54 +0100 Subject: [PATCH] Unify RelayChainInterface error handling and introduce async (#909) --- Cargo.lock | 3 + client/consensus/common/Cargo.toml | 2 +- .../common/src/parachain_consensus.rs | 84 +++++--- client/consensus/common/src/tests.rs | 19 +- client/consensus/relay-chain/src/lib.rs | 2 +- client/network/src/lib.rs | 134 ++++++------ client/network/src/tests.rs | 129 +++++++----- client/pov-recovery/src/lib.rs | 70 +++--- client/relay-chain-interface/Cargo.toml | 3 + client/relay-chain-interface/src/lib.rs | 184 ++++++++-------- client/relay-chain-local/src/lib.rs | 199 ++++++++---------- client/service/src/lib.rs | 22 +- pallets/parachain-system/src/lib.rs | 2 +- parachain-template/node/src/service.rs | 7 +- polkadot-parachains/src/service.rs | 26 ++- .../parachain-inherent/src/client_side.rs | 49 ++++- test/service/src/lib.rs | 11 +- 17 files changed, 532 insertions(+), 414 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 01a0ad13299..eb4ca303306 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1895,14 +1895,17 @@ dependencies = [ "async-trait", "cumulus-primitives-core", "derive_more", + "futures 0.3.19", "parking_lot 0.11.2", "polkadot-overseer", "sc-client-api", + "sc-service", "sp-api", "sp-blockchain", "sp-core", "sp-runtime", "sp-state-machine", + "thiserror", ] [[package]] diff --git a/client/consensus/common/Cargo.toml b/client/consensus/common/Cargo.toml index 057f0b34966..379f73740f2 100644 --- a/client/consensus/common/Cargo.toml +++ b/client/consensus/common/Cargo.toml @@ -25,7 +25,7 @@ cumulus-relay-chain-interface = { path = "../../relay-chain-interface" } futures = { version = "0.3.8", features = ["compat"] } codec = { package = "parity-scale-codec", version = "2.3.0", features = [ "derive" ] } tracing = "0.1.25" -async-trait = "0.1.42" +async-trait = "0.1.52" dyn-clone = "1.0.4" [dev-dependencies] diff --git a/client/consensus/common/src/parachain_consensus.rs b/client/consensus/common/src/parachain_consensus.rs index 224e3e5fd9b..6328681fd9f 100644 --- a/client/consensus/common/src/parachain_consensus.rs +++ b/client/consensus/common/src/parachain_consensus.rs @@ -14,12 +14,13 @@ // You should have received a copy of the GNU General Public License // along with Cumulus. If not, see . -use cumulus_relay_chain_interface::RelayChainInterface; +use async_trait::async_trait; +use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult}; use sc_client_api::{ Backend, BlockBackend, BlockImportNotification, BlockchainEvents, Finalizer, UsageProvider, }; use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy}; -use sp_blockchain::{Error as ClientError, Result as ClientResult}; +use sp_blockchain::Error as ClientError; use sp_consensus::{BlockOrigin, BlockStatus}; use sp_runtime::{ generic::BlockId, @@ -29,11 +30,14 @@ use sp_runtime::{ use polkadot_primitives::v1::{Block as PBlock, Id as ParaId, OccupiedCoreAssumption}; use codec::Decode; -use futures::{future, select, FutureExt, Stream, StreamExt}; +use futures::{select, FutureExt, Stream, StreamExt}; use std::{pin::Pin, sync::Arc}; +const LOG_TARGET: &str = "cumulus-consensus"; + /// Helper for the relay chain client. This is expected to be a lightweight handle like an `Arc`. +#[async_trait] pub trait RelaychainClient: Clone + 'static { /// The error type for interacting with the Polkadot client. type Error: std::fmt::Debug + Send; @@ -42,17 +46,17 @@ pub trait RelaychainClient: Clone + 'static { type HeadStream: Stream> + Send + Unpin; /// Get a stream of new best heads for the given parachain. - fn new_best_heads(&self, para_id: ParaId) -> Self::HeadStream; + async fn new_best_heads(&self, para_id: ParaId) -> RelayChainResult; /// Get a stream of finalized heads for the given parachain. - fn finalized_heads(&self, para_id: ParaId) -> Self::HeadStream; + async fn finalized_heads(&self, para_id: ParaId) -> RelayChainResult; /// Returns the parachain head for the given `para_id` at the given block id. - fn parachain_head_at( + async fn parachain_head_at( &self, at: &BlockId, para_id: ParaId, - ) -> ClientResult>>; + ) -> RelayChainResult>>; } /// Follow the finalized head of the given parachain. @@ -66,7 +70,13 @@ where R: RelaychainClient, B: Backend, { - let mut finalized_heads = relay_chain.finalized_heads(para_id); + let mut finalized_heads = match relay_chain.finalized_heads(para_id).await { + Ok(finalized_heads_stream) => finalized_heads_stream, + Err(err) => { + tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve finalized heads stream."); + return + }, + }; loop { let finalized_head = if let Some(h) = finalized_heads.next().await { @@ -165,7 +175,14 @@ async fn follow_new_best( R: RelaychainClient, B: Backend, { - let mut new_best_heads = relay_chain.new_best_heads(para_id).fuse(); + let mut new_best_heads = match relay_chain.new_best_heads(para_id).await { + Ok(best_heads_stream) => best_heads_stream.fuse(), + Err(err) => { + tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve best heads stream."); + return + }, + }; + let mut imported_blocks = parachain.import_notification_stream().fuse(); // The unset best header of the parachain. Will be `Some(_)` when we have imported a relay chain // block before the parachain block it included. In this case we need to wait for this block to @@ -368,6 +385,7 @@ where } } +#[async_trait] impl RelaychainClient for RCInterface where RCInterface: RelayChainInterface + Clone + 'static, @@ -376,39 +394,53 @@ where type HeadStream = Pin> + Send>>; - fn new_best_heads(&self, para_id: ParaId) -> Self::HeadStream { + async fn new_best_heads(&self, para_id: ParaId) -> RelayChainResult { let relay_chain = self.clone(); - self.import_notification_stream() + let new_best_notification_stream = self + .new_best_notification_stream() + .await? .filter_map(move |n| { - future::ready(if n.is_new_best { - relay_chain.parachain_head_at(&BlockId::hash(n.hash), para_id).ok().flatten() - } else { - None - }) + let relay_chain = relay_chain.clone(); + async move { + relay_chain + .parachain_head_at(&BlockId::hash(n.hash()), para_id) + .await + .ok() + .flatten() + } }) - .boxed() + .boxed(); + Ok(new_best_notification_stream) } - fn finalized_heads(&self, para_id: ParaId) -> Self::HeadStream { + async fn finalized_heads(&self, para_id: ParaId) -> RelayChainResult { let relay_chain = self.clone(); - self.finality_notification_stream() + let finality_notification_stream = self + .finality_notification_stream() + .await? .filter_map(move |n| { - future::ready( - relay_chain.parachain_head_at(&BlockId::hash(n.hash), para_id).ok().flatten(), - ) + let relay_chain = relay_chain.clone(); + async move { + relay_chain + .parachain_head_at(&BlockId::hash(n.hash()), para_id) + .await + .ok() + .flatten() + } }) - .boxed() + .boxed(); + Ok(finality_notification_stream) } - fn parachain_head_at( + async fn parachain_head_at( &self, at: &BlockId, para_id: ParaId, - ) -> ClientResult>> { + ) -> RelayChainResult>> { self.persisted_validation_data(at, para_id, OccupiedCoreAssumption::TimedOut) + .await .map(|s| s.map(|s| s.parent_head.0)) - .map_err(Into::into) } } diff --git a/client/consensus/common/src/tests.rs b/client/consensus/common/src/tests.rs index 4340b7b681e..ceb60aa501e 100644 --- a/client/consensus/common/src/tests.rs +++ b/client/consensus/common/src/tests.rs @@ -16,7 +16,9 @@ use crate::*; +use async_trait::async_trait; use codec::Encode; +use cumulus_relay_chain_interface::RelayChainResult; use cumulus_test_client::{ runtime::{Block, Header}, Backend, Client, InitBlockBuilder, TestClientBuilder, TestClientBuilderExt, @@ -26,7 +28,7 @@ use futures_timer::Delay; use polkadot_primitives::v1::{Block as PBlock, Id as ParaId}; use sc_client_api::UsageProvider; use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy}; -use sp_blockchain::{Error as ClientError, Result as ClientResult}; +use sp_blockchain::Error as ClientError; use sp_consensus::BlockOrigin; use sp_runtime::generic::BlockId; use std::{ @@ -66,12 +68,13 @@ impl Relaychain { } } +#[async_trait] impl crate::parachain_consensus::RelaychainClient for Relaychain { type Error = ClientError; type HeadStream = Box> + Send + Unpin>; - fn new_best_heads(&self, _: ParaId) -> Self::HeadStream { + async fn new_best_heads(&self, _: ParaId) -> RelayChainResult { let stream = self .inner .lock() @@ -80,10 +83,10 @@ impl crate::parachain_consensus::RelaychainClient for Relaychain { .take() .expect("Should only be called once"); - Box::new(stream.map(|v| v.encode())) + Ok(Box::new(stream.map(|v| v.encode()))) } - fn finalized_heads(&self, _: ParaId) -> Self::HeadStream { + async fn finalized_heads(&self, _: ParaId) -> RelayChainResult { let stream = self .inner .lock() @@ -92,10 +95,14 @@ impl crate::parachain_consensus::RelaychainClient for Relaychain { .take() .expect("Should only be called once"); - Box::new(stream.map(|v| v.encode())) + Ok(Box::new(stream.map(|v| v.encode()))) } - fn parachain_head_at(&self, _: &BlockId, _: ParaId) -> ClientResult>> { + async fn parachain_head_at( + &self, + _: &BlockId, + _: ParaId, + ) -> RelayChainResult>> { unimplemented!("Not required for tests") } } diff --git a/client/consensus/relay-chain/src/lib.rs b/client/consensus/relay-chain/src/lib.rs index 7ab3ef28619..69a92175da1 100644 --- a/client/consensus/relay-chain/src/lib.rs +++ b/client/consensus/relay-chain/src/lib.rs @@ -176,7 +176,7 @@ where .propose( inherent_data, Default::default(), - //TODO: Fix this. + // TODO: Fix this. Duration::from_millis(500), // Set the block limit to 50% of the maximum PoV size. // diff --git a/client/network/src/lib.rs b/client/network/src/lib.rs index 79e4f7c1b79..2010803d384 100644 --- a/client/network/src/lib.rs +++ b/client/network/src/lib.rs @@ -38,11 +38,7 @@ use polkadot_primitives::v1::{ }; use codec::{Decode, DecodeAll, Encode}; -use futures::{ - channel::oneshot, - future::{ready, FutureExt}, - Future, -}; +use futures::{channel::oneshot, future::FutureExt, Future}; use std::{convert::TryFrom, fmt, marker::PhantomData, pin::Pin, sync::Arc}; @@ -128,7 +124,7 @@ impl BlockAnnounceData { /// Check the signature of the statement. /// /// Returns an `Err(_)` if it failed. - fn check_signature( + async fn check_signature( self, relay_chain_client: &RCInterface, ) -> Result @@ -138,16 +134,16 @@ impl BlockAnnounceData { let validator_index = self.statement.unchecked_validator_index(); let runtime_api_block_id = BlockId::Hash(self.relay_parent); - let session_index = match relay_chain_client.session_index_for_child(&runtime_api_block_id) - { - Ok(r) => r, - Err(e) => return Err(BlockAnnounceError(format!("{:?}", e))), - }; + let session_index = + match relay_chain_client.session_index_for_child(&runtime_api_block_id).await { + Ok(r) => r, + Err(e) => return Err(BlockAnnounceError(format!("{:?}", e))), + }; let signing_context = SigningContext { parent_hash: self.relay_parent, session_index }; // Check that the signer is a legit validator. - let authorities = match relay_chain_client.validators(&runtime_api_block_id) { + let authorities = match relay_chain_client.validators(&runtime_api_block_id).await { Ok(r) => r, Err(e) => return Err(BlockAnnounceError(format!("{:?}", e))), }; @@ -222,6 +218,7 @@ impl TryFrom<&'_ CollationSecondedSignal> for BlockAnnounceData { /// chain. If it is at the tip, it is required to provide a justification or otherwise we reject /// it. However, if the announcement is for a block below the tip the announcement is accepted /// as it probably comes from a node that is currently syncing the chain. +#[derive(Clone)] pub struct BlockAnnounceValidator { phantom: PhantomData, relay_chain_interface: RCInterface, @@ -247,13 +244,14 @@ where RCInterface: RelayChainInterface + Clone, { /// Get the included block of the given parachain in the relay chain. - fn included_block( + async fn included_block( relay_chain_interface: &RCInterface, block_id: &BlockId, para_id: ParaId, ) -> Result { let validation_data = relay_chain_interface .persisted_validation_data(block_id, para_id, OccupiedCoreAssumption::TimedOut) + .await .map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)? .ok_or_else(|| { Box::new(BlockAnnounceError("Could not find parachain head in relay chain".into())) @@ -269,56 +267,59 @@ where } /// Get the backed block hash of the given parachain in the relay chain. - fn backed_block_hash( + async fn backed_block_hash( relay_chain_interface: &RCInterface, block_id: &BlockId, para_id: ParaId, ) -> Result, BoxedError> { let candidate_receipt = relay_chain_interface .candidate_pending_availability(block_id, para_id) + .await .map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)?; Ok(candidate_receipt.map(|cr| cr.descriptor.para_head)) } /// Handle a block announcement with empty data (no statement) attached to it. - fn handle_empty_block_announce_data( + async fn handle_empty_block_announce_data( &self, header: Block::Header, - ) -> impl Future> { + ) -> Result { let relay_chain_interface = self.relay_chain_interface.clone(); let para_id = self.para_id; - async move { - // Check if block is equal or higher than best (this requires a justification) - let relay_chain_best_hash = relay_chain_interface.best_block_hash(); - let runtime_api_block_id = BlockId::Hash(relay_chain_best_hash); - let block_number = header.number(); - - let best_head = - Self::included_block(&relay_chain_interface, &runtime_api_block_id, para_id)?; - let known_best_number = best_head.number(); - let backed_block = - || Self::backed_block_hash(&relay_chain_interface, &runtime_api_block_id, para_id); - - if best_head == header { - tracing::debug!(target: LOG_TARGET, "Announced block matches best block.",); - - Ok(Validation::Success { is_new_best: true }) - } else if Some(HeadData(header.encode()).hash()) == backed_block()? { - tracing::debug!(target: LOG_TARGET, "Announced block matches latest backed block.",); - - Ok(Validation::Success { is_new_best: true }) - } else if block_number >= known_best_number { - tracing::debug!( + // Check if block is equal or higher than best (this requires a justification) + let relay_chain_best_hash = relay_chain_interface + .best_block_hash() + .await + .map_err(|e| Box::new(e) as Box<_>)?; + let runtime_api_block_id = BlockId::Hash(relay_chain_best_hash); + let block_number = header.number(); + + let best_head = + Self::included_block(&relay_chain_interface, &runtime_api_block_id, para_id).await?; + let known_best_number = best_head.number(); + let backed_block = || async { + Self::backed_block_hash(&relay_chain_interface, &runtime_api_block_id, para_id).await + }; + + if best_head == header { + tracing::debug!(target: LOG_TARGET, "Announced block matches best block.",); + + Ok(Validation::Success { is_new_best: true }) + } else if Some(HeadData(header.encode()).hash()) == backed_block().await? { + tracing::debug!(target: LOG_TARGET, "Announced block matches latest backed block.",); + + Ok(Validation::Success { is_new_best: true }) + } else if block_number >= known_best_number { + tracing::debug!( target: LOG_TARGET, "Validation failed because a justification is needed if the block at the top of the chain." ); - Ok(Validation::Failure { disconnect: false }) - } else { - Ok(Validation::Success { is_new_best: false }) - } + Ok(Validation::Failure { disconnect: false }) + } else { + Ok(Validation::Success { is_new_best: false }) } } } @@ -331,32 +332,40 @@ where fn validate( &mut self, header: &Block::Header, - mut data: &[u8], + data: &[u8], ) -> Pin> + Send>> { - if self.relay_chain_interface.is_major_syncing() { - return ready(Ok(Validation::Success { is_new_best: false })).boxed() - } + let relay_chain_interface = self.relay_chain_interface.clone(); + let mut data = data.to_vec(); + let header = header.clone(); + let header_encoded = header.encode(); + let block_announce_validator = self.clone(); - if data.is_empty() { - return self.handle_empty_block_announce_data(header.clone()).boxed() - } + async move { + let relay_chain_is_syncing = relay_chain_interface + .is_major_syncing() + .await + .map_err(|e| { + tracing::error!(target: LOG_TARGET, "Unable to determine sync status. {}", e) + }) + .unwrap_or(false); - let block_announce_data = match BlockAnnounceData::decode_all(&mut data) { - Ok(r) => r, - Err(err) => - return async move { - Err(Box::new(BlockAnnounceError(format!( + if relay_chain_is_syncing { + return Ok(Validation::Success { is_new_best: false }) + } + + if data.is_empty() { + return block_announce_validator.handle_empty_block_announce_data(header).await + } + + let block_announce_data = match BlockAnnounceData::decode_all(&mut data) { + Ok(r) => r, + Err(err) => + return Err(Box::new(BlockAnnounceError(format!( "Can not decode the `BlockAnnounceData`: {:?}", err - ))) as Box<_>) - } - .boxed(), - }; + ))) as Box<_>), + }; - let relay_chain_interface = self.relay_chain_interface.clone(); - let header_encoded = header.encode(); - - async move { if let Err(e) = block_announce_data.validate(header_encoded) { return Ok(e) } @@ -370,6 +379,7 @@ where block_announce_data .check_signature(&relay_chain_interface) + .await .map_err(|e| Box::new(e) as Box<_>) } .boxed() diff --git a/client/network/src/tests.rs b/client/network/src/tests.rs index 34584edd69d..bd52fc0b93b 100644 --- a/client/network/src/tests.rs +++ b/client/network/src/tests.rs @@ -16,15 +16,15 @@ use super::*; use async_trait::async_trait; -use cumulus_relay_chain_interface::WaitError; +use cumulus_relay_chain_interface::{RelayChainError, RelayChainResult}; use cumulus_relay_chain_local::{check_block_in_chain, BlockCheckStatus}; use cumulus_test_service::runtime::{Block, Hash, Header}; -use futures::{executor::block_on, poll, task::Poll, FutureExt, StreamExt}; +use futures::{executor::block_on, poll, task::Poll, FutureExt, Stream, StreamExt}; use parking_lot::Mutex; use polkadot_node_primitives::{SignedFullStatement, Statement}; use polkadot_primitives::v1::{ - Block as PBlock, CandidateCommitments, CandidateDescriptor, CollatorPair, - CommittedCandidateReceipt, Hash as PHash, HeadData, Id as ParaId, InboundDownwardMessage, + CandidateCommitments, CandidateDescriptor, CollatorPair, CommittedCandidateReceipt, + Hash as PHash, HeadData, Header as PHeader, Id as ParaId, InboundDownwardMessage, InboundHrmpMessage, OccupiedCoreAssumption, PersistedValidationData, SessionIndex, SigningContext, ValidationCodeHash, ValidatorId, }; @@ -77,53 +77,60 @@ impl DummyRelayChainInterface { #[async_trait] impl RelayChainInterface for DummyRelayChainInterface { - fn validators( + async fn validators( &self, _: &cumulus_primitives_core::relay_chain::BlockId, - ) -> Result, sp_api::ApiError> { + ) -> RelayChainResult> { Ok(self.data.lock().validators.clone()) } - fn block_status( + async fn block_status( &self, block_id: cumulus_primitives_core::relay_chain::BlockId, - ) -> Result { - self.relay_backend.blockchain().status(block_id) + ) -> RelayChainResult { + self.relay_backend + .blockchain() + .status(block_id) + .map_err(RelayChainError::BlockchainError) } - fn best_block_hash(&self) -> PHash { - self.relay_backend.blockchain().info().best_hash + async fn best_block_hash(&self) -> RelayChainResult { + Ok(self.relay_backend.blockchain().info().best_hash) } - fn retrieve_dmq_contents(&self, _: ParaId, _: PHash) -> Option> { + async fn retrieve_dmq_contents( + &self, + _: ParaId, + _: PHash, + ) -> RelayChainResult> { unimplemented!("Not needed for test") } - fn retrieve_all_inbound_hrmp_channel_contents( + async fn retrieve_all_inbound_hrmp_channel_contents( &self, _: ParaId, _: PHash, - ) -> Option>> { - Some(BTreeMap::new()) + ) -> RelayChainResult>> { + Ok(BTreeMap::new()) } - fn persisted_validation_data( + async fn persisted_validation_data( &self, _: &cumulus_primitives_core::relay_chain::BlockId, _: ParaId, _: OccupiedCoreAssumption, - ) -> Result, sp_api::ApiError> { + ) -> RelayChainResult> { Ok(Some(PersistedValidationData { parent_head: HeadData(default_header().encode()), ..Default::default() })) } - fn candidate_pending_availability( + async fn candidate_pending_availability( &self, _: &cumulus_primitives_core::relay_chain::BlockId, _: ParaId, - ) -> Result, sp_api::ApiError> { + ) -> RelayChainResult> { if self.data.lock().has_pending_availability { Ok(Some(CommittedCandidateReceipt { descriptor: CandidateDescriptor { @@ -152,60 +159,58 @@ impl RelayChainInterface for DummyRelayChainInterface { } } - fn session_index_for_child( + async fn session_index_for_child( &self, _: &cumulus_primitives_core::relay_chain::BlockId, - ) -> Result { + ) -> RelayChainResult { Ok(0) } - fn import_notification_stream(&self) -> sc_client_api::ImportNotifications { - self.relay_client.import_notification_stream() - } - - fn finality_notification_stream(&self) -> sc_client_api::FinalityNotifications { - self.relay_client.finality_notification_stream() + async fn import_notification_stream( + &self, + ) -> RelayChainResult + Send>>> { + Ok(Box::pin( + self.relay_client + .import_notification_stream() + .map(|notification| notification.header), + )) } - fn storage_changes_notification_stream( + async fn finality_notification_stream( &self, - filter_keys: Option<&[sc_client_api::StorageKey]>, - child_filter_keys: Option< - &[(sc_client_api::StorageKey, Option>)], - >, - ) -> sc_client_api::blockchain::Result> { - self.relay_client - .storage_changes_notification_stream(filter_keys, child_filter_keys) + ) -> RelayChainResult + Send>>> { + Ok(Box::pin( + self.relay_client + .finality_notification_stream() + .map(|notification| notification.header), + )) } - fn is_major_syncing(&self) -> bool { - false + async fn is_major_syncing(&self) -> RelayChainResult { + Ok(false) } - fn overseer_handle(&self) -> Option { + fn overseer_handle(&self) -> RelayChainResult> { unimplemented!("Not needed for test") } - fn get_storage_by_key( + async fn get_storage_by_key( &self, _: &polkadot_service::BlockId, _: &[u8], - ) -> Result, sp_blockchain::Error> { + ) -> RelayChainResult> { unimplemented!("Not needed for test") } - fn prove_read( + async fn prove_read( &self, _: &polkadot_service::BlockId, _: &Vec>, - ) -> Result, Box> { + ) -> RelayChainResult { unimplemented!("Not needed for test") } - async fn wait_for_block( - &self, - hash: PHash, - ) -> Result<(), cumulus_relay_chain_interface::WaitError> { + async fn wait_for_block(&self, hash: PHash) -> RelayChainResult<()> { let mut listener = match check_block_in_chain( self.relay_backend.clone(), self.relay_client.clone(), @@ -219,16 +224,32 @@ impl RelayChainInterface for DummyRelayChainInterface { loop { futures::select! { - _ = timeout => return Err(WaitError::Timeout(hash)), + _ = timeout => return Err(RelayChainError::WaitTimeout(hash)), evt = listener.next() => match evt { Some(evt) if evt.hash == hash => return Ok(()), // Not the event we waited on. Some(_) => continue, - None => return Err(WaitError::ImportListenerClosed(hash)), + None => return Err(RelayChainError::ImportListenerClosed(hash)), } } } } + + async fn new_best_notification_stream( + &self, + ) -> RelayChainResult + Send>>> { + let notifications_stream = + self.relay_client + .import_notification_stream() + .filter_map(|notification| async move { + if notification.is_new_best { + Some(notification.header) + } else { + None + } + }); + Ok(Box::pin(notifications_stream)) + } } fn make_validator_and_api( @@ -274,6 +295,7 @@ async fn make_gossip_message_and_header( .unwrap(); let session_index = relay_chain_interface .session_index_for_child(&BlockId::Hash(relay_parent)) + .await .unwrap(); let signing_context = SigningContext { parent_hash: relay_parent, session_index }; @@ -442,9 +464,9 @@ fn check_statement_is_correctly_signed() { assert_eq!(Validation::Failure { disconnect: true }, res.unwrap()); } -#[test] -fn check_statement_seconded() { - let (mut validator, api) = make_validator_and_api(); +#[tokio::test] +async fn check_statement_seconded() { + let (mut validator, relay_chain_interface) = make_validator_and_api(); let header = default_header(); let relay_parent = H256::from_low_u64_be(1); @@ -455,7 +477,10 @@ fn check_statement_seconded() { Some(&Sr25519Keyring::Alice.to_seed()), ) .unwrap(); - let session_index = api.session_index_for_child(&BlockId::Hash(relay_parent)).unwrap(); + let session_index = relay_chain_interface + .session_index_for_child(&BlockId::Hash(relay_parent)) + .await + .unwrap(); let signing_context = SigningContext { parent_hash: relay_parent, session_index }; let statement = Statement::Valid(Default::default()); diff --git a/client/pov-recovery/src/lib.rs b/client/pov-recovery/src/lib.rs index 4d3f67ea06e..d5d1a19b1d9 100644 --- a/client/pov-recovery/src/lib.rs +++ b/client/pov-recovery/src/lib.rs @@ -56,7 +56,7 @@ use polkadot_primitives::v1::{ }; use cumulus_primitives_core::ParachainBlockData; -use cumulus_relay_chain_interface::RelayChainInterface; +use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult}; use codec::Decode; use futures::{select, stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt}; @@ -381,7 +381,14 @@ where let mut imported_blocks = self.parachain_client.import_notification_stream().fuse(); let mut finalized_blocks = self.parachain_client.finality_notification_stream().fuse(); let pending_candidates = - pending_candidates(self.relay_chain_interface.clone(), self.para_id).fuse(); + match pending_candidates(self.relay_chain_interface.clone(), self.para_id).await { + Ok(pending_candidate_stream) => pending_candidate_stream.fuse(), + Err(err) => { + tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve pending candidate stream."); + return + }, + }; + futures::pin_mut!(pending_candidates); loop { @@ -435,28 +442,41 @@ where } /// Returns a stream over pending candidates for the parachain corresponding to `para_id`. -fn pending_candidates( - relay_chain_client: impl RelayChainInterface, +async fn pending_candidates( + relay_chain_client: impl RelayChainInterface + Clone, para_id: ParaId, -) -> impl Stream { - relay_chain_client.import_notification_stream().filter_map(move |n| { - let res = relay_chain_client - .candidate_pending_availability(&BlockId::hash(n.hash), para_id) - .and_then(|pa| { - relay_chain_client - .session_index_for_child(&BlockId::hash(n.hash)) - .map(|v| pa.map(|pa| (pa, v))) - }) - .map_err(|e| { - tracing::error!( - target: LOG_TARGET, - error = ?e, - "Failed fetch pending candidates.", - ) - }) - .ok() - .flatten(); - - async move { res } - }) +) -> RelayChainResult> { + let import_notification_stream = relay_chain_client.import_notification_stream().await?; + + let filtered_stream = import_notification_stream.filter_map(move |n| { + let client_for_closure = relay_chain_client.clone(); + async move { + let block_id = BlockId::hash(n.hash()); + let pending_availability_result = client_for_closure + .candidate_pending_availability(&block_id, para_id) + .await + .map_err(|e| { + tracing::error!( + target: LOG_TARGET, + error = ?e, + "Failed to fetch pending candidates.", + ) + }); + let session_index_result = + client_for_closure.session_index_for_child(&block_id).await.map_err(|e| { + tracing::error!( + target: LOG_TARGET, + error = ?e, + "Failed to fetch session index.", + ) + }); + + if let Ok(Some(candidate)) = pending_availability_result { + session_index_result.map(|session_index| (candidate, session_index)).ok() + } else { + None + } + } + }); + Ok(filtered_stream) } diff --git a/client/relay-chain-interface/Cargo.toml b/client/relay-chain-interface/Cargo.toml index a962155ed1e..b76ebcc3137 100644 --- a/client/relay-chain-interface/Cargo.toml +++ b/client/relay-chain-interface/Cargo.toml @@ -15,7 +15,10 @@ sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" } +sc-service = { git = "https://github.com/paritytech/substrate", branch = "master" } +futures = "0.3.1" parking_lot = "0.11.1" derive_more = "0.99.2" async-trait = "0.1.52" +thiserror = "1.0.30" diff --git a/client/relay-chain-interface/src/lib.rs b/client/relay-chain-interface/src/lib.rs index 185e9a6f0a3..13b0551b38c 100644 --- a/client/relay-chain-interface/src/lib.rs +++ b/client/relay-chain-interface/src/lib.rs @@ -14,136 +14,140 @@ // You should have received a copy of the GNU General Public License // along with Cumulus. If not, see . -use std::{collections::BTreeMap, sync::Arc}; +use std::{collections::BTreeMap, pin::Pin, sync::Arc}; use cumulus_primitives_core::{ relay_chain::{ v1::{CommittedCandidateReceipt, OccupiedCoreAssumption, SessionIndex, ValidatorId}, - Block as PBlock, BlockId, Hash as PHash, InboundHrmpMessage, + BlockId, Hash as PHash, Header as PHeader, InboundHrmpMessage, }, InboundDownwardMessage, ParaId, PersistedValidationData, }; use polkadot_overseer::Handle as OverseerHandle; use sc_client_api::{blockchain::BlockStatus, StorageProof}; +use futures::Stream; + +use async_trait::async_trait; use sp_api::ApiError; use sp_state_machine::StorageValue; -use async_trait::async_trait; +pub type RelayChainResult = Result; -#[derive(Debug, derive_more::Display)] -pub enum WaitError { - #[display(fmt = "Timeout while waiting for relay-chain block `{}` to be imported.", _0)] - Timeout(PHash), - #[display( - fmt = "Import listener closed while waiting for relay-chain block `{}` to be imported.", - _0 - )] +#[derive(thiserror::Error, Debug)] +pub enum RelayChainError { + #[error("Error occured while calling relay chain runtime: {0:?}")] + ApiError(#[from] ApiError), + #[error("Timeout while waiting for relay-chain block `{0}` to be imported.")] + WaitTimeout(PHash), + #[error("Import listener closed while waiting for relay-chain block `{0}` to be imported.")] ImportListenerClosed(PHash), - #[display( - fmt = "Blockchain returned an error while waiting for relay-chain block `{}` to be imported: {:?}", - _0, - _1 - )] - BlockchainError(PHash, sp_blockchain::Error), + #[error("Blockchain returned an error while waiting for relay-chain block `{0}` to be imported: {1:?}")] + WaitBlockchainError(PHash, sp_blockchain::Error), + #[error("Blockchain returned an error: {0:?}")] + BlockchainError(#[from] sp_blockchain::Error), + #[error("State machine error occured: {0:?}")] + StateMachineError(Box), + #[error("Unspecified error occured: {0:?}")] + GenericError(String), } /// Trait that provides all necessary methods for interaction between collator and relay chain. #[async_trait] pub trait RelayChainInterface: Send + Sync { /// Fetch a storage item by key. - fn get_storage_by_key( + async fn get_storage_by_key( &self, block_id: &BlockId, key: &[u8], - ) -> Result, sp_blockchain::Error>; + ) -> RelayChainResult>; /// Fetch a vector of current validators. - fn validators(&self, block_id: &BlockId) -> Result, ApiError>; + async fn validators(&self, block_id: &BlockId) -> RelayChainResult>; /// Get the status of a given block. - fn block_status(&self, block_id: BlockId) -> Result; + async fn block_status(&self, block_id: BlockId) -> RelayChainResult; /// Get the hash of the current best block. - fn best_block_hash(&self) -> PHash; + async fn best_block_hash(&self) -> RelayChainResult; /// Returns the whole contents of the downward message queue for the parachain we are collating /// for. /// /// Returns `None` in case of an error. - fn retrieve_dmq_contents( + async fn retrieve_dmq_contents( &self, para_id: ParaId, relay_parent: PHash, - ) -> Option>; + ) -> RelayChainResult>; /// Returns channels contents for each inbound HRMP channel addressed to the parachain we are /// collating for. /// /// Empty channels are also included. - fn retrieve_all_inbound_hrmp_channel_contents( + async fn retrieve_all_inbound_hrmp_channel_contents( &self, para_id: ParaId, relay_parent: PHash, - ) -> Option>>; + ) -> RelayChainResult>>; /// Yields the persisted validation data for the given `ParaId` along with an assumption that /// should be used if the para currently occupies a core. /// /// Returns `None` if either the para is not registered or the assumption is `Freed` /// and the para already occupies a core. - fn persisted_validation_data( + async fn persisted_validation_data( &self, block_id: &BlockId, para_id: ParaId, _: OccupiedCoreAssumption, - ) -> Result, ApiError>; + ) -> RelayChainResult>; /// Get the receipt of a candidate pending availability. This returns `Some` for any paras /// assigned to occupied cores in `availability_cores` and `None` otherwise. - fn candidate_pending_availability( + async fn candidate_pending_availability( &self, block_id: &BlockId, para_id: ParaId, - ) -> Result, ApiError>; + ) -> RelayChainResult>; /// Returns the session index expected at a child of the block. - fn session_index_for_child(&self, block_id: &BlockId) -> Result; + async fn session_index_for_child(&self, block_id: &BlockId) -> RelayChainResult; /// Get a stream of import block notifications. - fn import_notification_stream(&self) -> sc_client_api::ImportNotifications; + async fn import_notification_stream( + &self, + ) -> RelayChainResult + Send>>>; + + /// Get a stream of new best block notifications. + async fn new_best_notification_stream( + &self, + ) -> RelayChainResult + Send>>>; /// Wait for a block with a given hash in the relay chain. /// /// This method returns immediately on error or if the block is already /// reported to be in chain. Otherwise, it waits for the block to arrive. - async fn wait_for_block(&self, hash: PHash) -> Result<(), WaitError>; + async fn wait_for_block(&self, hash: PHash) -> RelayChainResult<()>; /// Get a stream of finality notifications. - fn finality_notification_stream(&self) -> sc_client_api::FinalityNotifications; - - /// Get a stream of storage change notifications. - fn storage_changes_notification_stream( + async fn finality_notification_stream( &self, - filter_keys: Option<&[sc_client_api::StorageKey]>, - child_filter_keys: Option< - &[(sc_client_api::StorageKey, Option>)], - >, - ) -> sc_client_api::blockchain::Result>; + ) -> RelayChainResult + Send>>>; /// Whether the synchronization service is undergoing major sync. /// Returns true if so. - fn is_major_syncing(&self) -> bool; + async fn is_major_syncing(&self) -> RelayChainResult; /// Get a handle to the overseer. - fn overseer_handle(&self) -> Option; + fn overseer_handle(&self) -> RelayChainResult>; /// Generate a storage read proof. - fn prove_read( + async fn prove_read( &self, block_id: &BlockId, relevant_keys: &Vec>, - ) -> Result, Box>; + ) -> RelayChainResult; } #[async_trait] @@ -151,98 +155,100 @@ impl RelayChainInterface for Arc where T: RelayChainInterface + ?Sized, { - fn retrieve_dmq_contents( + async fn retrieve_dmq_contents( &self, para_id: ParaId, relay_parent: PHash, - ) -> Option> { - (**self).retrieve_dmq_contents(para_id, relay_parent) + ) -> RelayChainResult> { + (**self).retrieve_dmq_contents(para_id, relay_parent).await } - fn retrieve_all_inbound_hrmp_channel_contents( + async fn retrieve_all_inbound_hrmp_channel_contents( &self, para_id: ParaId, relay_parent: PHash, - ) -> Option>> { - (**self).retrieve_all_inbound_hrmp_channel_contents(para_id, relay_parent) + ) -> RelayChainResult>> { + (**self).retrieve_all_inbound_hrmp_channel_contents(para_id, relay_parent).await } - fn persisted_validation_data( + async fn persisted_validation_data( &self, block_id: &BlockId, para_id: ParaId, occupied_core_assumption: OccupiedCoreAssumption, - ) -> Result, ApiError> { - (**self).persisted_validation_data(block_id, para_id, occupied_core_assumption) + ) -> RelayChainResult> { + (**self) + .persisted_validation_data(block_id, para_id, occupied_core_assumption) + .await } - fn candidate_pending_availability( + async fn candidate_pending_availability( &self, block_id: &BlockId, para_id: ParaId, - ) -> Result, ApiError> { - (**self).candidate_pending_availability(block_id, para_id) + ) -> RelayChainResult> { + (**self).candidate_pending_availability(block_id, para_id).await } - fn session_index_for_child(&self, block_id: &BlockId) -> Result { - (**self).session_index_for_child(block_id) + async fn session_index_for_child(&self, block_id: &BlockId) -> RelayChainResult { + (**self).session_index_for_child(block_id).await } - fn validators(&self, block_id: &BlockId) -> Result, ApiError> { - (**self).validators(block_id) + async fn validators(&self, block_id: &BlockId) -> RelayChainResult> { + (**self).validators(block_id).await } - fn import_notification_stream(&self) -> sc_client_api::ImportNotifications { - (**self).import_notification_stream() - } - - fn finality_notification_stream(&self) -> sc_client_api::FinalityNotifications { - (**self).finality_notification_stream() + async fn import_notification_stream( + &self, + ) -> RelayChainResult + Send>>> { + (**self).import_notification_stream().await } - fn storage_changes_notification_stream( + async fn finality_notification_stream( &self, - filter_keys: Option<&[sc_client_api::StorageKey]>, - child_filter_keys: Option< - &[(sc_client_api::StorageKey, Option>)], - >, - ) -> sc_client_api::blockchain::Result> { - (**self).storage_changes_notification_stream(filter_keys, child_filter_keys) + ) -> RelayChainResult + Send>>> { + (**self).finality_notification_stream().await } - fn best_block_hash(&self) -> PHash { - (**self).best_block_hash() + async fn best_block_hash(&self) -> RelayChainResult { + (**self).best_block_hash().await } - fn block_status(&self, block_id: BlockId) -> Result { - (**self).block_status(block_id) + async fn block_status(&self, block_id: BlockId) -> RelayChainResult { + (**self).block_status(block_id).await } - fn is_major_syncing(&self) -> bool { - (**self).is_major_syncing() + async fn is_major_syncing(&self) -> RelayChainResult { + (**self).is_major_syncing().await } - fn overseer_handle(&self) -> Option { + fn overseer_handle(&self) -> RelayChainResult> { (**self).overseer_handle() } - fn get_storage_by_key( + async fn get_storage_by_key( &self, block_id: &BlockId, key: &[u8], - ) -> Result, sp_blockchain::Error> { - (**self).get_storage_by_key(block_id, key) + ) -> RelayChainResult> { + (**self).get_storage_by_key(block_id, key).await } - fn prove_read( + async fn prove_read( &self, block_id: &BlockId, relevant_keys: &Vec>, - ) -> Result, Box> { - (**self).prove_read(block_id, relevant_keys) + ) -> RelayChainResult { + (**self).prove_read(block_id, relevant_keys).await } - async fn wait_for_block(&self, hash: PHash) -> Result<(), WaitError> { + async fn wait_for_block(&self, hash: PHash) -> RelayChainResult<()> { (**self).wait_for_block(hash).await } + + async fn new_best_notification_stream( + &self, + ) -> RelayChainResult + Send>>> { + (**self).new_best_notification_stream().await + } } diff --git a/client/relay-chain-local/src/lib.rs b/client/relay-chain-local/src/lib.rs index 5177d1f4afb..903a8ff3c66 100644 --- a/client/relay-chain-local/src/lib.rs +++ b/client/relay-chain-local/src/lib.rs @@ -14,19 +14,19 @@ // You should have received a copy of the GNU General Public License // along with Cumulus. If not, see . -use std::{sync::Arc, time::Duration}; +use std::{pin::Pin, sync::Arc, time::Duration}; use async_trait::async_trait; use cumulus_primitives_core::{ relay_chain::{ v1::{CommittedCandidateReceipt, OccupiedCoreAssumption, SessionIndex, ValidatorId}, v2::ParachainHost, - Block as PBlock, BlockId, Hash as PHash, InboundHrmpMessage, + Block as PBlock, BlockId, Hash as PHash, Header as PHeader, InboundHrmpMessage, }, InboundDownwardMessage, ParaId, PersistedValidationData, }; -use cumulus_relay_chain_interface::{RelayChainInterface, WaitError}; -use futures::{FutureExt, StreamExt}; +use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult}; +use futures::{FutureExt, Stream, StreamExt}; use parking_lot::Mutex; use polkadot_client::{ClientHandle, ExecuteWithClient, FullBackend}; use polkadot_service::{ @@ -37,12 +37,11 @@ use sc_client_api::{ StorageProof, UsageProvider, }; use sc_telemetry::TelemetryWorkerHandle; -use sp_api::{ApiError, ProvideRuntimeApi}; +use sp_api::ProvideRuntimeApi; use sp_consensus::SyncOracle; use sp_core::{sp_std::collections::btree_map::BTreeMap, Pair}; use sp_state_machine::{Backend as StateBackend, StorageValue}; -const LOG_TARGET: &str = "relay-chain-local"; /// The timeout in seconds after that the waiting for a block should be aborted. const TIMEOUT_IN_SECONDS: u64 = 6; @@ -88,158 +87,117 @@ where + Send, Client::Api: ParachainHost + BabeApi, { - fn retrieve_dmq_contents( + async fn retrieve_dmq_contents( &self, para_id: ParaId, relay_parent: PHash, - ) -> Option> { - self.full_client - .runtime_api() - .dmq_contents_with_context( - &BlockId::hash(relay_parent), - sp_core::ExecutionContext::Importing, - para_id, - ) - .map_err(|e| { - tracing::error!( - target: LOG_TARGET, - relay_parent = ?relay_parent, - error = ?e, - "An error occured during requesting the downward messages.", - ); - }) - .ok() + ) -> RelayChainResult> { + Ok(self.full_client.runtime_api().dmq_contents_with_context( + &BlockId::hash(relay_parent), + sp_core::ExecutionContext::Importing, + para_id, + )?) } - fn retrieve_all_inbound_hrmp_channel_contents( + async fn retrieve_all_inbound_hrmp_channel_contents( &self, para_id: ParaId, relay_parent: PHash, - ) -> Option>> { - self.full_client - .runtime_api() - .inbound_hrmp_channels_contents_with_context( - &BlockId::hash(relay_parent), - sp_core::ExecutionContext::Importing, - para_id, - ) - .map_err(|e| { - tracing::error!( - target: LOG_TARGET, - relay_parent = ?relay_parent, - error = ?e, - "An error occured during requesting the inbound HRMP messages.", - ); - }) - .ok() + ) -> RelayChainResult>> { + Ok(self.full_client.runtime_api().inbound_hrmp_channels_contents_with_context( + &BlockId::hash(relay_parent), + sp_core::ExecutionContext::Importing, + para_id, + )?) } - fn persisted_validation_data( + async fn persisted_validation_data( &self, block_id: &BlockId, para_id: ParaId, occupied_core_assumption: OccupiedCoreAssumption, - ) -> Result, ApiError> { - self.full_client.runtime_api().persisted_validation_data( + ) -> RelayChainResult> { + Ok(self.full_client.runtime_api().persisted_validation_data( block_id, para_id, occupied_core_assumption, - ) + )?) } - fn candidate_pending_availability( + async fn candidate_pending_availability( &self, block_id: &BlockId, para_id: ParaId, - ) -> Result, ApiError> { - self.full_client.runtime_api().candidate_pending_availability(block_id, para_id) - } - - fn session_index_for_child(&self, block_id: &BlockId) -> Result { - self.full_client.runtime_api().session_index_for_child(block_id) + ) -> RelayChainResult> { + Ok(self + .full_client + .runtime_api() + .candidate_pending_availability(block_id, para_id)?) } - fn validators(&self, block_id: &BlockId) -> Result, ApiError> { - self.full_client.runtime_api().validators(block_id) + async fn session_index_for_child(&self, block_id: &BlockId) -> RelayChainResult { + Ok(self.full_client.runtime_api().session_index_for_child(block_id)?) } - fn import_notification_stream(&self) -> sc_client_api::ImportNotifications { - self.full_client.import_notification_stream() + async fn validators(&self, block_id: &BlockId) -> RelayChainResult> { + Ok(self.full_client.runtime_api().validators(block_id)?) } - fn finality_notification_stream(&self) -> sc_client_api::FinalityNotifications { - self.full_client.finality_notification_stream() + async fn import_notification_stream( + &self, + ) -> RelayChainResult + Send>>> { + let notification_stream = self + .full_client + .import_notification_stream() + .map(|notification| notification.header); + Ok(Box::pin(notification_stream)) } - fn storage_changes_notification_stream( + async fn finality_notification_stream( &self, - filter_keys: Option<&[sc_client_api::StorageKey]>, - child_filter_keys: Option< - &[(sc_client_api::StorageKey, Option>)], - >, - ) -> sc_client_api::blockchain::Result> { - self.full_client - .storage_changes_notification_stream(filter_keys, child_filter_keys) + ) -> RelayChainResult + Send>>> { + let notification_stream = self + .full_client + .finality_notification_stream() + .map(|notification| notification.header); + Ok(Box::pin(notification_stream)) } - fn best_block_hash(&self) -> PHash { - self.backend.blockchain().info().best_hash + async fn best_block_hash(&self) -> RelayChainResult { + Ok(self.backend.blockchain().info().best_hash) } - fn block_status(&self, block_id: BlockId) -> Result { - self.backend.blockchain().status(block_id) + async fn block_status(&self, block_id: BlockId) -> RelayChainResult { + Ok(self.backend.blockchain().status(block_id)?) } - fn is_major_syncing(&self) -> bool { + async fn is_major_syncing(&self) -> RelayChainResult { let mut network = self.sync_oracle.lock(); - network.is_major_syncing() + Ok(network.is_major_syncing()) } - fn overseer_handle(&self) -> Option { - self.overseer_handle.clone() + fn overseer_handle(&self) -> RelayChainResult> { + Ok(self.overseer_handle.clone()) } - fn get_storage_by_key( + async fn get_storage_by_key( &self, block_id: &BlockId, key: &[u8], - ) -> Result, sp_blockchain::Error> { + ) -> RelayChainResult> { let state = self.backend.state_at(*block_id)?; - state.storage(key).map_err(sp_blockchain::Error::Storage) + state.storage(key).map_err(RelayChainError::GenericError) } - fn prove_read( + async fn prove_read( &self, block_id: &BlockId, relevant_keys: &Vec>, - ) -> Result, Box> { - let state_backend = self - .backend - .state_at(*block_id) - .map_err(|e| { - tracing::error!( - target: LOG_TARGET, - relay_parent = ?block_id, - error = ?e, - "Cannot obtain the state of the relay chain.", - ); - }) - .ok(); - - match state_backend { - Some(state) => sp_state_machine::prove_read(state, relevant_keys) - .map_err(|e| { - tracing::error!( - target: LOG_TARGET, - relay_parent = ?block_id, - error = ?e, - "Failed to collect required relay chain state storage proof.", - ); - e - }) - .map(Some), - None => Ok(None), - } + ) -> RelayChainResult { + let state_backend = self.backend.state_at(*block_id)?; + + sp_state_machine::prove_read(state_backend, relevant_keys) + .map_err(RelayChainError::StateMachineError) } /// Wait for a given relay chain block in an async way. @@ -259,7 +217,7 @@ where /// /// The timeout is set to 6 seconds. This should be enough time to import the block in the current /// round and if not, the new round of the relay chain already started anyway. - async fn wait_for_block(&self, hash: PHash) -> Result<(), WaitError> { + async fn wait_for_block(&self, hash: PHash) -> RelayChainResult<()> { let mut listener = match check_block_in_chain(self.backend.clone(), self.full_client.clone(), hash)? { BlockCheckStatus::InChain => return Ok(()), @@ -270,16 +228,28 @@ where loop { futures::select! { - _ = timeout => return Err(WaitError::Timeout(hash)), + _ = timeout => return Err(RelayChainError::WaitTimeout(hash)), evt = listener.next() => match evt { Some(evt) if evt.hash == hash => return Ok(()), // Not the event we waited on. Some(_) => continue, - None => return Err(WaitError::ImportListenerClosed(hash)), + None => return Err(RelayChainError::ImportListenerClosed(hash)), } } } } + + async fn new_best_notification_stream( + &self, + ) -> RelayChainResult + Send>>> { + let notifications_stream = + self.full_client + .import_notification_stream() + .filter_map(|notification| async move { + notification.is_new_best.then(|| notification.header) + }); + Ok(Box::pin(notifications_stream)) + } } pub enum BlockCheckStatus { @@ -294,16 +264,15 @@ pub fn check_block_in_chain( backend: Arc, client: Arc, hash: PHash, -) -> Result +) -> RelayChainResult where Client: BlockchainEvents, { let _lock = backend.get_import_lock().read(); let block_id = BlockId::Hash(hash); - match backend.blockchain().status(block_id) { - Ok(BlockStatus::InChain) => return Ok(BlockCheckStatus::InChain), - Err(err) => return Err(WaitError::BlockchainError(hash, err)), + match backend.blockchain().status(block_id)? { + BlockStatus::InChain => return Ok(BlockCheckStatus::InChain), _ => {}, } @@ -495,7 +464,7 @@ mod tests { assert!(matches!( block_on(relay_chain_interface.wait_for_block(hash)), - Err(WaitError::Timeout(_)) + Err(RelayChainError::WaitTimeout(_)) )); } diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 5b050e75aa5..08cd8584f22 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -107,10 +107,13 @@ where .spawn_essential_handle() .spawn("cumulus-consensus", None, consensus); + let overseer_handle = relay_chain_interface + .overseer_handle() + .map_err(|e| sc_service::Error::Application(Box::new(e)))? + .ok_or_else(|| "Polkadot full node did not provide an `OverseerHandle`!")?; + let pov_recovery = cumulus_client_pov_recovery::PoVRecovery::new( - relay_chain_interface - .overseer_handle() - .ok_or_else(|| "Polkadot full node did not provide an `OverseerHandle`!")?, + overseer_handle.clone(), // We want that collators wait at maximum the relay chain slot duration before starting // to recover blocks. cumulus_client_pov_recovery::RecoveryDelay::WithMax { max: relay_chain_slot_duration }, @@ -128,9 +131,7 @@ where runtime_api: client.clone(), block_status, announce_block, - overseer_handle: relay_chain_interface - .overseer_handle() - .ok_or_else(|| "Polkadot full node did not provide an `OverseerHandle`!")?, + overseer_handle, spawner, para_id, key: collator_key, @@ -192,10 +193,13 @@ where .spawn_essential_handle() .spawn("cumulus-consensus", None, consensus); + let overseer_handle = relay_chain_interface + .overseer_handle() + .map_err(|e| sc_service::Error::Application(Box::new(e)))? + .ok_or_else(|| "Polkadot full node did not provide an `OverseerHandle`!")?; + let pov_recovery = cumulus_client_pov_recovery::PoVRecovery::new( - relay_chain_interface - .overseer_handle() - .ok_or_else(|| "Polkadot full node did not provide an `OverseerHandle`!")?, + overseer_handle, // Full nodes should at least wait 2.5 minutes (assuming 6 seconds slot duration) and // in maximum 5 minutes before starting to recover blocks. Collators should already start // the recovery way before full nodes try to recover a certain block and then share the diff --git a/pallets/parachain-system/src/lib.rs b/pallets/parachain-system/src/lib.rs index b30d0777076..7c261e8583c 100644 --- a/pallets/parachain-system/src/lib.rs +++ b/pallets/parachain-system/src/lib.rs @@ -605,7 +605,7 @@ pub mod pallet { #[pallet::genesis_build] impl GenesisBuild for GenesisConfig { fn build(&self) { - //TODO: Remove after https://github.com/paritytech/cumulus/issues/479 + // TODO: Remove after https://github.com/paritytech/cumulus/issues/479 sp_io::storage::set(b":c", &[]); } } diff --git a/parachain-template/node/src/service.rs b/parachain-template/node/src/service.rs index d0c0826e32c..400daeaa67a 100644 --- a/parachain-template/node/src/service.rs +++ b/parachain-template/node/src/service.rs @@ -436,14 +436,15 @@ pub async fn start_parachain_node( BuildAuraConsensusParams { proposer_factory, create_inherent_data_providers: move |_, (relay_parent, validation_data)| { - let parachain_inherent = + let relay_chain_interface = relay_chain_interface.clone(); + async move { + let parachain_inherent = cumulus_primitives_parachain_inherent::ParachainInherentData::create_at( relay_parent, &relay_chain_interface, &validation_data, id, - ); - async move { + ).await; let time = sp_timestamp::InherentDataProvider::from_system_time(); let slot = diff --git a/polkadot-parachains/src/service.rs b/polkadot-parachains/src/service.rs index 9f81b9af0d7..a67b20c11e5 100644 --- a/polkadot-parachains/src/service.rs +++ b/polkadot-parachains/src/service.rs @@ -738,14 +738,15 @@ pub async fn start_rococo_parachain_node( >(BuildAuraConsensusParams { proposer_factory, create_inherent_data_providers: move |_, (relay_parent, validation_data)| { + let relay_chain_interface = relay_chain_interface.clone(); + async move { let parachain_inherent = cumulus_primitives_parachain_inherent::ParachainInherentData::create_at( relay_parent, &relay_chain_interface, &validation_data, id, - ); - async move { + ).await; let time = sp_timestamp::InherentDataProvider::from_system_time(); let slot = @@ -875,14 +876,15 @@ where block_import: client.clone(), relay_chain_interface: relay_chain_interface.clone(), create_inherent_data_providers: move |_, (relay_parent, validation_data)| { - let parachain_inherent = + let relay_chain_interface = relay_chain_interface.clone(); + async move { + let parachain_inherent = cumulus_primitives_parachain_inherent::ParachainInherentData::create_at( relay_parent, &relay_chain_interface, &validation_data, id, - ); - async move { + ).await; let parachain_inherent = parachain_inherent.ok_or_else(|| { Box::::from( "Failed to create parachain inherent", @@ -1157,14 +1159,15 @@ where proposer_factory, create_inherent_data_providers: move |_, (relay_parent, validation_data)| { - let parachain_inherent = + let relay_chain_for_aura = relay_chain_for_aura.clone(); + async move { + let parachain_inherent = cumulus_primitives_parachain_inherent::ParachainInherentData::create_at( relay_parent, &relay_chain_for_aura, &validation_data, id, - ); - async move { + ).await; let time = sp_timestamp::InherentDataProvider::from_system_time(); @@ -1216,14 +1219,15 @@ where relay_chain_interface: relay_chain_interface.clone(), create_inherent_data_providers: move |_, (relay_parent, validation_data)| { - let parachain_inherent = + let relay_chain_interface = relay_chain_interface.clone(); + async move { + let parachain_inherent = cumulus_primitives_parachain_inherent::ParachainInherentData::create_at( relay_parent, &relay_chain_interface, &validation_data, id, - ); - async move { + ).await; let parachain_inherent = parachain_inherent.ok_or_else(|| { Box::::from( diff --git a/primitives/parachain-inherent/src/client_side.rs b/primitives/parachain-inherent/src/client_side.rs index dab368dc6cd..b14c2257654 100644 --- a/primitives/parachain-inherent/src/client_side.rs +++ b/primitives/parachain-inherent/src/client_side.rs @@ -29,7 +29,7 @@ const LOG_TARGET: &str = "parachain-inherent"; /// Collect the relevant relay chain state in form of a proof for putting it into the validation /// data inherent. -fn collect_relay_storage_proof( +async fn collect_relay_storage_proof( relay_chain_interface: &impl RelayChainInterface, para_id: ParaId, relay_parent: PHash, @@ -42,6 +42,7 @@ fn collect_relay_storage_proof( &relay_parent_block_id, &relay_well_known_keys::hrmp_ingress_channel_index(para_id), ) + .await .map_err(|e| { tracing::error!( target: LOG_TARGET, @@ -70,6 +71,7 @@ fn collect_relay_storage_proof( &relay_parent_block_id, &relay_well_known_keys::hrmp_egress_channel_index(para_id), ) + .await .map_err(|e| { tracing::error!( target: LOG_TARGET, @@ -108,26 +110,57 @@ fn collect_relay_storage_proof( relay_well_known_keys::hrmp_channels(HrmpChannelId { sender: para_id, recipient }) })); - relay_chain_interface.prove_read(&relay_parent_block_id, &relevant_keys).ok()? + relay_chain_interface + .prove_read(&relay_parent_block_id, &relevant_keys) + .await + .map_err(|e| { + tracing::error!( + target: LOG_TARGET, + relay_parent = ?relay_parent_block_id, + error = ?e, + "Cannot obtain read proof from relay chain.", + ); + }) + .ok() } impl ParachainInherentData { /// Create the [`ParachainInherentData`] at the given `relay_parent`. /// /// Returns `None` if the creation failed. - pub fn create_at( + pub async fn create_at( relay_parent: PHash, relay_chain_interface: &impl RelayChainInterface, validation_data: &PersistedValidationData, para_id: ParaId, ) -> Option { let relay_chain_state = - collect_relay_storage_proof(relay_chain_interface, para_id, relay_parent)?; - - let downward_messages = - relay_chain_interface.retrieve_dmq_contents(para_id, relay_parent)?; + collect_relay_storage_proof(relay_chain_interface, para_id, relay_parent).await?; + + let downward_messages = relay_chain_interface + .retrieve_dmq_contents(para_id, relay_parent) + .await + .map_err(|e| { + tracing::error!( + target: LOG_TARGET, + relay_parent = ?relay_parent, + error = ?e, + "An error occured during requesting the downward messages.", + ); + }) + .ok()?; let horizontal_messages = relay_chain_interface - .retrieve_all_inbound_hrmp_channel_contents(para_id, relay_parent)?; + .retrieve_all_inbound_hrmp_channel_contents(para_id, relay_parent) + .await + .map_err(|e| { + tracing::error!( + target: LOG_TARGET, + relay_parent = ?relay_parent, + error = ?e, + "An error occured during requesting the inbound HRMP messages.", + ); + }) + .ok()?; Some(ParachainInherentData { downward_messages, diff --git a/test/service/src/lib.rs b/test/service/src/lib.rs index 319076be306..750cb7d881f 100644 --- a/test/service/src/lib.rs +++ b/test/service/src/lib.rs @@ -31,9 +31,9 @@ use cumulus_client_service::{ use cumulus_primitives_core::ParaId; use cumulus_relay_chain_local::RelayChainLocal; use cumulus_test_runtime::{Hash, Header, NodeBlock as Block, RuntimeApi}; +use parking_lot::Mutex; use frame_system_rpc_runtime_api::AccountNonceApi; -use parking_lot::Mutex; use polkadot_primitives::v1::{CollatorPair, Hash as PHash, PersistedValidationData}; use polkadot_service::ProvideRuntimeApi; use sc_client_api::execution_extensions::ExecutionStrategies; @@ -288,15 +288,16 @@ where para_id, proposer_factory, move |_, (relay_parent, validation_data)| { - let parachain_inherent = + let relay_chain_interface = relay_chain_interface_for_closure.clone(); + async move { + let parachain_inherent = cumulus_primitives_parachain_inherent::ParachainInherentData::create_at( relay_parent, - &relay_chain_interface_for_closure, + &relay_chain_interface, &validation_data, para_id, - ); + ).await; - async move { let time = sp_timestamp::InherentDataProvider::from_system_time(); let parachain_inherent = parachain_inherent.ok_or_else(|| {