Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Unify RelayChainInterface error handling and introduce async #909

Merged
merged 33 commits into from
Jan 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
3d7d222
Initial network interface preparations
skunert Dec 21, 2021
b23547c
Implement get_storage_by_key
skunert Jan 4, 2022
cfaa211
Implement `validators` and `session_index_for_child`
skunert Jan 6, 2022
af362b4
Implement persisted_validation_data and candidate_pending_availability
skunert Jan 7, 2022
a8d80d5
Fix method name for persisted_validation_data and add encoded params
skunert Jan 7, 2022
482ecd4
Implement `retrieve_dmq_contents` and `retrieve_all_inbound_hrmp_chan…
skunert Jan 10, 2022
c03ba54
Implement `prove_read`
skunert Jan 10, 2022
b4ca285
Introduce separate RPC client, expose JsonRpSee errors
skunert Jan 10, 2022
f0d6c18
Simplify closure in call_remote_runtime_function
skunert Jan 10, 2022
d4dada5
Implement import stream, upgrade JsonRpSee
skunert Jan 12, 2022
dc5de78
Implement finality stream
skunert Jan 12, 2022
2675a7d
Remove unused method from interface
skunert Jan 12, 2022
f9a228d
Implement `is_major_syncing`
skunert Jan 12, 2022
7a89c73
Implement `wait_on_block`
skunert Jan 13, 2022
b92f204
Merge branch 'master' into network-interface-master
skunert Jan 13, 2022
532ba1d
Fix tests
skunert Jan 13, 2022
d49e25b
Unify error handling `ApiError`
skunert Jan 13, 2022
051068f
Replace WaitError with RelayChainError
skunert Jan 13, 2022
4b664a7
Wrap BlockChainError in RelayChainError
skunert Jan 13, 2022
40432e8
Unify error handling in relay chain intefaces
skunert Jan 17, 2022
5e03777
Fix return type of proof method
skunert Jan 17, 2022
d916d69
Improve error handling of new methods
skunert Jan 17, 2022
ee60f31
Improve error handling and move logging outside of interface
skunert Jan 18, 2022
0a1dd44
Clean up
skunert Jan 18, 2022
68fa074
Remove unwanted changes, clean up
skunert Jan 18, 2022
2764ea2
Remove unused import
skunert Jan 18, 2022
3be8d65
Add format for StatemachineError and remove nused From trait
skunert Jan 19, 2022
4c9b8ea
Use 'thiserror' crate to simplify error handling
skunert Jan 24, 2022
f2caf4b
Expose error for overseer, further simplify error handling
skunert Jan 24, 2022
7c95a65
Merge branch 'master' into relay-chain-interface-error-handling
skunert Jan 24, 2022
4e827a7
Apply suggestions from code review
skunert Jan 25, 2022
254ff93
Improve readability in pov-recovery
skunert Jan 25, 2022
53d10eb
Make handle_block_announce_data async
skunert Jan 25, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion client/consensus/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ cumulus-relay-chain-interface = { path = "../../relay-chain-interface" }
futures = { version = "0.3.8", features = ["compat"] }
codec = { package = "parity-scale-codec", version = "2.3.0", features = [ "derive" ] }
tracing = "0.1.25"
async-trait = "0.1.42"
async-trait = "0.1.52"
dyn-clone = "1.0.4"

[dev-dependencies]
Expand Down
84 changes: 58 additions & 26 deletions client/consensus/common/src/parachain_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@
// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.

use cumulus_relay_chain_interface::RelayChainInterface;
use async_trait::async_trait;
use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
use sc_client_api::{
Backend, BlockBackend, BlockImportNotification, BlockchainEvents, Finalizer, UsageProvider,
};
use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy};
use sp_blockchain::{Error as ClientError, Result as ClientResult};
use sp_blockchain::Error as ClientError;
use sp_consensus::{BlockOrigin, BlockStatus};
use sp_runtime::{
generic::BlockId,
Expand All @@ -29,11 +30,14 @@ use sp_runtime::{
use polkadot_primitives::v1::{Block as PBlock, Id as ParaId, OccupiedCoreAssumption};

use codec::Decode;
use futures::{future, select, FutureExt, Stream, StreamExt};
use futures::{select, FutureExt, Stream, StreamExt};

use std::{pin::Pin, sync::Arc};

const LOG_TARGET: &str = "cumulus-consensus";

/// Helper for the relay chain client. This is expected to be a lightweight handle like an `Arc`.
#[async_trait]
pub trait RelaychainClient: Clone + 'static {
/// The error type for interacting with the Polkadot client.
type Error: std::fmt::Debug + Send;
Expand All @@ -42,17 +46,17 @@ pub trait RelaychainClient: Clone + 'static {
type HeadStream: Stream<Item = Vec<u8>> + Send + Unpin;

/// Get a stream of new best heads for the given parachain.
fn new_best_heads(&self, para_id: ParaId) -> Self::HeadStream;
async fn new_best_heads(&self, para_id: ParaId) -> RelayChainResult<Self::HeadStream>;

/// Get a stream of finalized heads for the given parachain.
fn finalized_heads(&self, para_id: ParaId) -> Self::HeadStream;
async fn finalized_heads(&self, para_id: ParaId) -> RelayChainResult<Self::HeadStream>;

/// Returns the parachain head for the given `para_id` at the given block id.
fn parachain_head_at(
async fn parachain_head_at(
&self,
at: &BlockId<PBlock>,
para_id: ParaId,
) -> ClientResult<Option<Vec<u8>>>;
) -> RelayChainResult<Option<Vec<u8>>>;
}

/// Follow the finalized head of the given parachain.
Expand All @@ -66,7 +70,13 @@ where
R: RelaychainClient,
B: Backend<Block>,
{
let mut finalized_heads = relay_chain.finalized_heads(para_id);
let mut finalized_heads = match relay_chain.finalized_heads(para_id).await {
Ok(finalized_heads_stream) => finalized_heads_stream,
Err(err) => {
tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve finalized heads stream.");
return
},
};

loop {
let finalized_head = if let Some(h) = finalized_heads.next().await {
Expand Down Expand Up @@ -165,7 +175,14 @@ async fn follow_new_best<P, R, Block, B>(
R: RelaychainClient,
B: Backend<Block>,
{
let mut new_best_heads = relay_chain.new_best_heads(para_id).fuse();
let mut new_best_heads = match relay_chain.new_best_heads(para_id).await {
Ok(best_heads_stream) => best_heads_stream.fuse(),
Err(err) => {
tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve best heads stream.");
return
},
};

let mut imported_blocks = parachain.import_notification_stream().fuse();
// The unset best header of the parachain. Will be `Some(_)` when we have imported a relay chain
// block before the parachain block it included. In this case we need to wait for this block to
Expand Down Expand Up @@ -368,6 +385,7 @@ where
}
}

#[async_trait]
impl<RCInterface> RelaychainClient for RCInterface
where
RCInterface: RelayChainInterface + Clone + 'static,
Expand All @@ -376,39 +394,53 @@ where

type HeadStream = Pin<Box<dyn Stream<Item = Vec<u8>> + Send>>;

fn new_best_heads(&self, para_id: ParaId) -> Self::HeadStream {
async fn new_best_heads(&self, para_id: ParaId) -> RelayChainResult<Self::HeadStream> {
let relay_chain = self.clone();

self.import_notification_stream()
let new_best_notification_stream = self
.new_best_notification_stream()
.await?
.filter_map(move |n| {
future::ready(if n.is_new_best {
relay_chain.parachain_head_at(&BlockId::hash(n.hash), para_id).ok().flatten()
} else {
None
})
let relay_chain = relay_chain.clone();
async move {
relay_chain
.parachain_head_at(&BlockId::hash(n.hash()), para_id)
.await
.ok()
.flatten()
}
})
.boxed()
.boxed();
Ok(new_best_notification_stream)
}

fn finalized_heads(&self, para_id: ParaId) -> Self::HeadStream {
async fn finalized_heads(&self, para_id: ParaId) -> RelayChainResult<Self::HeadStream> {
let relay_chain = self.clone();

self.finality_notification_stream()
let finality_notification_stream = self
.finality_notification_stream()
.await?
.filter_map(move |n| {
future::ready(
relay_chain.parachain_head_at(&BlockId::hash(n.hash), para_id).ok().flatten(),
)
let relay_chain = relay_chain.clone();
async move {
relay_chain
.parachain_head_at(&BlockId::hash(n.hash()), para_id)
.await
.ok()
.flatten()
}
})
.boxed()
.boxed();
Ok(finality_notification_stream)
}

fn parachain_head_at(
async fn parachain_head_at(
&self,
at: &BlockId<PBlock>,
para_id: ParaId,
) -> ClientResult<Option<Vec<u8>>> {
) -> RelayChainResult<Option<Vec<u8>>> {
self.persisted_validation_data(at, para_id, OccupiedCoreAssumption::TimedOut)
.await
.map(|s| s.map(|s| s.parent_head.0))
.map_err(Into::into)
}
}
19 changes: 13 additions & 6 deletions client/consensus/common/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

use crate::*;

use async_trait::async_trait;
use codec::Encode;
use cumulus_relay_chain_interface::RelayChainResult;
use cumulus_test_client::{
runtime::{Block, Header},
Backend, Client, InitBlockBuilder, TestClientBuilder, TestClientBuilderExt,
Expand All @@ -26,7 +28,7 @@ use futures_timer::Delay;
use polkadot_primitives::v1::{Block as PBlock, Id as ParaId};
use sc_client_api::UsageProvider;
use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy};
use sp_blockchain::{Error as ClientError, Result as ClientResult};
use sp_blockchain::Error as ClientError;
use sp_consensus::BlockOrigin;
use sp_runtime::generic::BlockId;
use std::{
Expand Down Expand Up @@ -66,12 +68,13 @@ impl Relaychain {
}
}

#[async_trait]
impl crate::parachain_consensus::RelaychainClient for Relaychain {
type Error = ClientError;

type HeadStream = Box<dyn Stream<Item = Vec<u8>> + Send + Unpin>;

fn new_best_heads(&self, _: ParaId) -> Self::HeadStream {
async fn new_best_heads(&self, _: ParaId) -> RelayChainResult<Self::HeadStream> {
let stream = self
.inner
.lock()
Expand All @@ -80,10 +83,10 @@ impl crate::parachain_consensus::RelaychainClient for Relaychain {
.take()
.expect("Should only be called once");

Box::new(stream.map(|v| v.encode()))
Ok(Box::new(stream.map(|v| v.encode())))
}

fn finalized_heads(&self, _: ParaId) -> Self::HeadStream {
async fn finalized_heads(&self, _: ParaId) -> RelayChainResult<Self::HeadStream> {
let stream = self
.inner
.lock()
Expand All @@ -92,10 +95,14 @@ impl crate::parachain_consensus::RelaychainClient for Relaychain {
.take()
.expect("Should only be called once");

Box::new(stream.map(|v| v.encode()))
Ok(Box::new(stream.map(|v| v.encode())))
}

fn parachain_head_at(&self, _: &BlockId<PBlock>, _: ParaId) -> ClientResult<Option<Vec<u8>>> {
async fn parachain_head_at(
&self,
_: &BlockId<PBlock>,
_: ParaId,
) -> RelayChainResult<Option<Vec<u8>>> {
unimplemented!("Not required for tests")
}
}
Expand Down
2 changes: 1 addition & 1 deletion client/consensus/relay-chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ where
.propose(
inherent_data,
Default::default(),
//TODO: Fix this.
// TODO: Fix this.
Duration::from_millis(500),
// Set the block limit to 50% of the maximum PoV size.
//
Expand Down
Loading