From 01ee1c662109cd75713f4f07fc1a123527aa6927 Mon Sep 17 00:00:00 2001 From: jasl Date: Thu, 20 Apr 2023 18:11:23 +0800 Subject: [PATCH] Backport https://github.com/paritytech/cumulus/pull/2461 --- Cargo.lock | 51 +- Cargo.toml | 5 + cumulus/client/consensus/common/Cargo.toml | 40 + .../consensus/common/src/level_monitor.rs | 374 ++++++++ cumulus/client/consensus/common/src/lib.rs | 174 ++++ .../common/src/parachain_consensus.rs | 507 +++++++++++ cumulus/client/consensus/common/src/tests.rs | 801 ++++++++++++++++++ 7 files changed, 1926 insertions(+), 26 deletions(-) create mode 100644 cumulus/client/consensus/common/Cargo.toml create mode 100644 cumulus/client/consensus/common/src/level_monitor.rs create mode 100644 cumulus/client/consensus/common/src/lib.rs create mode 100644 cumulus/client/consensus/common/src/parachain_consensus.rs create mode 100644 cumulus/client/consensus/common/src/tests.rs diff --git a/Cargo.lock b/Cargo.lock index 401766b2..eb69b8d3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -458,13 +458,13 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.67" +version = "0.1.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86ea188f25f0255d8f92797797c97ebf5631fa88178beb1a46fdf5622c9a00e4" +checksum = "b9ccdd8f2a161be9bd5c023df56f1b2a0bd1d83872ae53b71a84a12c9bf6e842" dependencies = [ "proc-macro2", "quote", - "syn 2.0.8", + "syn 2.0.12", ] [[package]] @@ -1052,7 +1052,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.8", + "syn 2.0.12", ] [[package]] @@ -1524,7 +1524,6 @@ dependencies = [ [[package]] name = "cumulus-client-consensus-common" version = "0.1.0" -source = "git+https://github.com/paritytech/cumulus?branch=polkadot-v0.9.41#ae4e75b077c220bdf29b299b36a63b87ccb46b4c" dependencies = [ "async-trait", "cumulus-client-pov-recovery", @@ -3206,9 +3205,9 @@ checksum = "aa590387383a574eb0a02370ad4b29c72e6ddd6b0afc2f6e2890bdb4be6d3a92" [[package]] name = "futures" -version = "0.3.27" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "531ac96c6ff5fd7c62263c5e3c67a603af4fcaee2e1a0ae5565ba3a11e69e549" +checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" dependencies = [ "futures-channel", "futures-core", @@ -3221,9 +3220,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.27" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "164713a5a0dcc3e7b4b1ed7d3b433cabc18025386f9339346e8daf15963cf7ac" +checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" dependencies = [ "futures-core", "futures-sink", @@ -3231,15 +3230,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.27" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86d7a0c1aa76363dac491de0ee99faf6941128376f1cf96f07db7603b7de69dd" +checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" [[package]] name = "futures-executor" -version = "0.3.27" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1997dd9df74cdac935c76252744c1ed5794fac083242ea4fe77ef3ed60ba0f83" +checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0" dependencies = [ "futures-core", "futures-task", @@ -3249,9 +3248,9 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.27" +version = "0.3.28" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "89d422fa3cbe3b40dca574ab087abb5bc98258ea57eea3fd6f1fa7162c778b91" +checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" [[package]] name = "futures-lite" @@ -3270,13 +3269,13 @@ dependencies = [ [[package]] name = "futures-macro" -version = "0.3.27" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3eb14ed937631bd8b8b8977f2c198443447a8355b6e3ca599f38c975e5a963b6" +checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.12", ] [[package]] @@ -3292,15 +3291,15 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.27" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec93083a4aecafb2a80a885c9de1f0ccae9dbd32c2bb54b0c3a65690e0b8d2f2" +checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e" [[package]] name = "futures-task" -version = "0.3.27" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd65540d33b37b16542a0438c12e6aeead10d4ac5d05bd3f805b8f35ab592879" +checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" [[package]] name = "futures-timer" @@ -3310,9 +3309,9 @@ checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" [[package]] name = "futures-util" -version = "0.3.27" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ef6b17e481503ec85211fed8f39d1970f128935ca1f814cd32ac4a6842e84ab" +checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" dependencies = [ "futures-channel", "futures-core", @@ -12666,9 +12665,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.8" +version = "2.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bcc02725fd69ab9f26eab07fad303e2497fad6fb9eba4f96c4d1687bdf704ad9" +checksum = "79d9531f94112cfc3e4c8f5f02cb2b58f72c97b7efd85f70203cc6d8efda5927" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index edba0650..2ecbde8a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ exclude = [ "vendor/webpki", "vendor/ring", "polkadot/node/service", + "cumulus/client/consensus/common", ] members = [ @@ -54,3 +55,7 @@ members = [ [patch."https://github.com/paritytech/polkadot"] polkadot-service = { path = "polkadot/node/service" } + +# TODO: Remove after upgrade to Polkdaot v0.9.42 +[patch."https://github.com/paritytech/cumulus"] +cumulus-client-consensus-common = { path = "cumulus/client/consensus/common" } diff --git a/cumulus/client/consensus/common/Cargo.toml b/cumulus/client/consensus/common/Cargo.toml new file mode 100644 index 00000000..a90a252c --- /dev/null +++ b/cumulus/client/consensus/common/Cargo.toml @@ -0,0 +1,40 @@ +[package] +name = "cumulus-client-consensus-common" +description = "Cumulus specific common consensus implementations" +version = "0.1.0" +authors = ["Parity Technologies "] +edition = "2021" + +[dependencies] +async-trait = "0.1.68" +codec = { package = "parity-scale-codec", version = "3.0.0", features = [ "derive" ] } +dyn-clone = "1.0.11" +futures = "0.3.28" +log = "0.4.17" +tracing = "0.1.37" + +# Substrate +sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.41" } +sc-consensus = { git = "https://github.com/paritytech/substrate", branch = 
"polkadot-v0.9.41" } +sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.41" } +sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.41" } +sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.41" } +sp-trie = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.41" } + +# Polkadot +polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.41" } + +# Cumulus +cumulus-primitives-core = { git = "https://github.com/paritytech/cumulus", branch = "polkadot-v0.9.41" } +cumulus-relay-chain-interface = { git = "https://github.com/paritytech/cumulus", branch = "polkadot-v0.9.41" } +cumulus-client-pov-recovery = { git = "https://github.com/paritytech/cumulus", branch = "polkadot-v0.9.41" } +schnellru = "0.2.1" + +[dev-dependencies] +futures-timer = "3.0.2" + +# Substrate +sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.41" } + +# Cumulus +cumulus-test-client = { git = "https://github.com/paritytech/cumulus", branch = "polkadot-v0.9.41" } diff --git a/cumulus/client/consensus/common/src/level_monitor.rs b/cumulus/client/consensus/common/src/level_monitor.rs new file mode 100644 index 00000000..4f344b50 --- /dev/null +++ b/cumulus/client/consensus/common/src/level_monitor.rs @@ -0,0 +1,374 @@ +// Copyright 2022 Parity Technologies (UK) Ltd. +// This file is part of Cumulus. + +// Cumulus is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Cumulus is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Cumulus. If not, see . + +use sc_client_api::{blockchain::Backend as _, Backend, HeaderBackend as _}; +use sp_blockchain::{HashAndNumber, HeaderMetadata, TreeRoute}; +use sp_runtime::traits::{Block as BlockT, NumberFor, One, Saturating, UniqueSaturatedInto, Zero}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; + +/// Value good enough to be used with parachains using the current backend implementation +/// that ships with Substrate. This value may change in the future. +pub const MAX_LEAVES_PER_LEVEL_SENSIBLE_DEFAULT: usize = 32; + +// Counter threshold after which we are going to eventually cleanup our internal data. +const CLEANUP_THRESHOLD: u32 = 32; + +/// Upper bound to the number of leaves allowed for each level of the blockchain. +/// +/// If the limit is set and more leaves are detected on block import, then the older ones are +/// dropped to make space for the fresh blocks. +/// +/// In environments where blocks confirmations from the relay chain may be "slow", then +/// setting an upper bound helps keeping the chain health by dropping old (presumably) stale +/// leaves and prevents discarding new blocks because we've reached the backend max value. +pub enum LevelLimit { + /// Limit set to [`MAX_LEAVES_PER_LEVEL_SENSIBLE_DEFAULT`]. + Default, + /// No explicit limit, however a limit may be implicitly imposed by the backend implementation. + None, + /// Custom value. 
+ Some(usize), +} + +/// Support structure to constrain the number of leaves at each level. +pub struct LevelMonitor { + /// Max number of leaves for each level. + level_limit: usize, + /// Monotonic counter used to keep track of block freshness. + pub(crate) import_counter: NumberFor, + /// Map between blocks hashes and freshness. + pub(crate) freshness: HashMap>, + /// Blockchain levels cache. + pub(crate) levels: HashMap, HashSet>, + /// Lower level number stored by the levels map. + lowest_level: NumberFor, + /// Backend reference to remove blocks on level saturation. + backend: Arc, +} + +/// Contains information about the target scheduled for removal. +struct TargetInfo { + /// Index of freshest leaf in the leaves array. + freshest_leaf_idx: usize, + /// Route from target to its freshest leaf. + freshest_route: TreeRoute, +} + +impl LevelMonitor +where + Block: BlockT, + BE: Backend, +{ + /// Instance a new monitor structure. + pub fn new(level_limit: usize, backend: Arc) -> Self { + let mut monitor = LevelMonitor { + level_limit, + import_counter: Zero::zero(), + freshness: HashMap::new(), + levels: HashMap::new(), + lowest_level: Zero::zero(), + backend, + }; + monitor.restore(); + monitor + } + + /// Restore the structure using the backend. + /// + /// Blocks freshness values are inferred from the height and not from the effective import + /// moment. This is a not accurate but "good-enough" best effort solution. + /// + /// Level limits are not enforced during this phase. + fn restore(&mut self) { + const ERR_MSG: &str = "route from finalized to leaf should be available; qed"; + let info = self.backend.blockchain().info(); + + log::debug!( + target: "parachain", + "Restoring chain level monitor from last finalized block: {} {}", + info.finalized_number, info.finalized_hash + ); + + self.lowest_level = info.finalized_number; + self.import_counter = info.finalized_number; + + for leaf in self.backend.blockchain().leaves().unwrap_or_default() { + let mut meta = self.backend.blockchain().header_metadata(leaf).expect(ERR_MSG); + + self.import_counter = self.import_counter.max(meta.number); + + // Populate the monitor until we don't hit an already imported branch + while !self.freshness.contains_key(&meta.hash) { + self.freshness.insert(meta.hash, meta.number); + self.levels.entry(meta.number).or_default().insert(meta.hash); + if meta.number <= self.lowest_level { + break + } + meta = self.backend.blockchain().header_metadata(meta.parent).expect(ERR_MSG); + } + } + + log::debug!(target: "parachain", "Restored chain level monitor up to height {}", self.import_counter); + } + + /// Check and enforce the limit bound at the given height. + /// + /// In practice this will enforce the given height in having a number of blocks less than + /// the limit passed to the constructor. + /// + /// If the given level is found to have a number of blocks greater than or equal the limit + /// then the limit is enforced by chosing one (or more) blocks to remove. + /// + /// The removal strategy is driven by the block freshness. + /// + /// A block freshness is determined by the most recent leaf freshness descending from the block + /// itself. In other words its freshness is equal to its more "fresh" descendant. + /// + /// The least "fresh" blocks are eventually removed. 
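+	///
+	/// Illustrative sketch (editorial, not in the upstream patch): with
+	/// `level_limit = 3` and three blocks already present at height `n`,
+	/// a subsequent
+	///
+	/// ```ignore
+	/// monitor.enforce_limit(n);
+	/// ```
+	///
+	/// removes the least fresh block at that height, together with its
+	/// descendants, before the new sibling is imported.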
+ pub fn enforce_limit(&mut self, number: NumberFor) { + let level_len = self.levels.get(&number).map(|l| l.len()).unwrap_or_default(); + if level_len < self.level_limit { + return + } + + // Sort leaves by freshness only once (less fresh first) and keep track of + // leaves that were invalidated on removal. + let mut leaves = self.backend.blockchain().leaves().unwrap_or_default(); + leaves.sort_unstable_by(|a, b| self.freshness.get(a).cmp(&self.freshness.get(b))); + let mut invalidated_leaves = HashSet::new(); + + // This may not be the most efficient way to remove **multiple** entries, but is the easy + // one :-). Should be considered that in "normal" conditions the number of blocks to remove + // is 0 or 1, it is not worth to complicate the code too much. One condition that may + // trigger multiple removals (2+) is if we restart the node using an existing db and a + // smaller limit wrt the one previously used. + let remove_count = level_len - self.level_limit + 1; + + log::debug!( + target: "parachain", + "Detected leaves overflow at height {number}, removing {remove_count} obsolete blocks", + ); + + (0..remove_count).all(|_| { + self.find_target(number, &leaves, &invalidated_leaves).map_or(false, |target| { + self.remove_target(target, number, &leaves, &mut invalidated_leaves); + true + }) + }); + } + + // Helper function to find the best candidate to be removed. + // + // Given a set of blocks with height equal to `number` (potential candidates) + // 1. For each candidate fetch all the leaves that are descending from it. + // 2. Set the candidate freshness equal to the fresher of its descending leaves. + // 3. The target is set as the candidate that is less fresh. + // + // Input `leaves` are assumed to be already ordered by "freshness" (less fresh first). + // + // Returns the index of the target fresher leaf within `leaves` and the route from target to + // such leaf. + fn find_target( + &self, + number: NumberFor, + leaves: &[Block::Hash], + invalidated_leaves: &HashSet, + ) -> Option> { + let mut target_info: Option> = None; + let blockchain = self.backend.blockchain(); + let best_hash = blockchain.info().best_hash; + + // Leaves that where already assigned to some node and thus can be skipped + // during the search. 
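+		// Editorial note: `leaves` is sorted least fresh first, so the reverse
+		// iteration below visits the freshest leaves first; a candidate whose
+		// freshest leaf has index 0 can never be beaten, hence the early stop
+		// further down.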
+ let mut assigned_leaves = HashSet::new(); + + let level = self.levels.get(&number)?; + + for blk_hash in level.iter().filter(|hash| **hash != best_hash) { + // Search for the fresher leaf information for this block + let candidate_info = leaves + .iter() + .enumerate() + .filter(|(leaf_idx, _)| { + !assigned_leaves.contains(leaf_idx) && !invalidated_leaves.contains(leaf_idx) + }) + .rev() + .find_map(|(leaf_idx, leaf_hash)| { + if blk_hash == leaf_hash { + let entry = HashAndNumber { number, hash: *blk_hash }; + TreeRoute::new(vec![entry], 0).ok().map(|freshest_route| TargetInfo { + freshest_leaf_idx: leaf_idx, + freshest_route, + }) + } else { + match sp_blockchain::tree_route(blockchain, *blk_hash, *leaf_hash) { + Ok(route) if route.retracted().is_empty() => Some(TargetInfo { + freshest_leaf_idx: leaf_idx, + freshest_route: route, + }), + Err(err) => { + log::warn!( + target: "parachain", + "(Lookup) Unable getting route from {:?} to {:?}: {}", + blk_hash, leaf_hash, err, + ); + None + }, + _ => None, + } + } + }); + + let candidate_info = match candidate_info { + Some(candidate_info) => { + assigned_leaves.insert(candidate_info.freshest_leaf_idx); + candidate_info + }, + None => { + // This should never happen + log::error!( + target: "parachain", + "Unable getting route to any leaf from {:?} (this is a bug)", + blk_hash, + ); + continue + }, + }; + + // Found fresher leaf for this candidate. + // This candidate is set as the new target if: + // 1. its fresher leaf is less fresh than the previous target fresher leaf AND + // 2. best block is not in its route + + let is_less_fresh = || { + target_info + .as_ref() + .map(|ti| candidate_info.freshest_leaf_idx < ti.freshest_leaf_idx) + .unwrap_or(true) + }; + let not_contains_best = || { + candidate_info + .freshest_route + .enacted() + .iter() + .all(|entry| entry.hash != best_hash) + }; + + if is_less_fresh() && not_contains_best() { + let early_stop = candidate_info.freshest_leaf_idx == 0; + target_info = Some(candidate_info); + if early_stop { + // We will never find a candidate with an worst freshest leaf than this. + break + } + } + } + + target_info + } + + // Remove the target block and all its descendants. + // + // Leaves should have already been ordered by "freshness" (less fresh first). + fn remove_target( + &mut self, + target: TargetInfo, + number: NumberFor, + leaves: &[Block::Hash], + invalidated_leaves: &mut HashSet, + ) { + let mut remove_leaf = |number, hash| { + log::debug!(target: "parachain", "Removing block (@{}) {:?}", number, hash); + if let Err(err) = self.backend.remove_leaf_block(hash) { + log::debug!(target: "parachain", "Remove not possible for {}: {}", hash, err); + return false + } + self.levels.get_mut(&number).map(|level| level.remove(&hash)); + self.freshness.remove(&hash); + true + }; + + invalidated_leaves.insert(target.freshest_leaf_idx); + + // Takes care of route removal. Starts from the leaf and stops as soon as an error is + // encountered. In this case an error is interpreted as the block being not a leaf + // and it will be removed while removing another route from the same block but to a + // different leaf. 
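+		// Editorial note: `remove_leaf` returns `false` when the backend refuses
+		// the removal, so the `all(..)` inside `remove_route` short-circuits at
+		// the first block that is not currently a leaf.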
+ let mut remove_route = |route: TreeRoute| { + route.enacted().iter().rev().all(|elem| remove_leaf(elem.number, elem.hash)); + }; + + let target_hash = target.freshest_route.common_block().hash; + debug_assert_eq!( + target.freshest_route.common_block().number, + number, + "This is a bug in LevelMonitor::find_target() or the Backend is corrupted" + ); + + // Remove freshest (cached) route first. + remove_route(target.freshest_route); + + // Don't bother trying with leaves we already found to not be our descendants. + let to_skip = leaves.len() - target.freshest_leaf_idx; + leaves.iter().enumerate().rev().skip(to_skip).for_each(|(leaf_idx, leaf_hash)| { + if invalidated_leaves.contains(&leaf_idx) { + return + } + match sp_blockchain::tree_route(self.backend.blockchain(), target_hash, *leaf_hash) { + Ok(route) if route.retracted().is_empty() => { + invalidated_leaves.insert(leaf_idx); + remove_route(route); + }, + Err(err) => { + log::warn!( + target: "parachain", + "(Removal) unable getting route from {:?} to {:?}: {}", + target_hash, leaf_hash, err, + ); + }, + _ => (), + }; + }); + + remove_leaf(number, target_hash); + } + + /// Add a new imported block information to the monitor. + pub fn block_imported(&mut self, number: NumberFor, hash: Block::Hash) { + self.import_counter += One::one(); + self.freshness.insert(hash, self.import_counter); + self.levels.entry(number).or_default().insert(hash); + + // Do cleanup once in a while, we are allowed to have some obsolete information. + let finalized_num = self.backend.blockchain().info().finalized_number; + let delta: u32 = finalized_num.saturating_sub(self.lowest_level).unique_saturated_into(); + if delta >= CLEANUP_THRESHOLD { + for i in 0..delta { + let number = self.lowest_level + i.unique_saturated_into(); + self.levels.remove(&number).map(|level| { + level.iter().for_each(|hash| { + self.freshness.remove(hash); + }) + }); + } + + self.lowest_level = finalized_num; + } + } +} diff --git a/cumulus/client/consensus/common/src/lib.rs b/cumulus/client/consensus/common/src/lib.rs new file mode 100644 index 00000000..86a781ad --- /dev/null +++ b/cumulus/client/consensus/common/src/lib.rs @@ -0,0 +1,174 @@ +// Copyright 2019-2021 Parity Technologies (UK) Ltd. +// This file is part of Cumulus. + +// Cumulus is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Cumulus is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Cumulus. If not, see . + +use polkadot_primitives::{Hash as PHash, PersistedValidationData}; + +use sc_client_api::Backend; +use sc_consensus::{shared_data::SharedData, BlockImport, ImportResult}; +use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; + +use std::sync::Arc; + +mod level_monitor; +mod parachain_consensus; +#[cfg(test)] +mod tests; + +pub use parachain_consensus::run_parachain_consensus; + +use level_monitor::LevelMonitor; +pub use level_monitor::{LevelLimit, MAX_LEAVES_PER_LEVEL_SENSIBLE_DEFAULT}; + +/// The result of [`ParachainConsensus::produce_candidate`]. +pub struct ParachainCandidate { + /// The block that was built for this candidate. 
+ pub block: B, + /// The proof that was recorded while building the block. + pub proof: sp_trie::StorageProof, +} + +/// A specific parachain consensus implementation that can be used by a collator to produce candidates. +/// +/// The collator will call [`Self::produce_candidate`] every time there is a free core for the parachain +/// this collator is collating for. It is the job of the consensus implementation to decide if this +/// specific collator should build a candidate for the given relay chain block. The consensus +/// implementation could, for example, check whether this specific collator is part of a staked set. +#[async_trait::async_trait] +pub trait ParachainConsensus: Send + Sync + dyn_clone::DynClone { + /// Produce a new candidate at the given parent block and relay-parent blocks. + /// + /// Should return `None` if the consensus implementation decided that it shouldn't build a + /// candidate or if there occurred any error. + /// + /// # NOTE + /// + /// It is expected that the block is already imported when the future resolves. + async fn produce_candidate( + &mut self, + parent: &B::Header, + relay_parent: PHash, + validation_data: &PersistedValidationData, + ) -> Option>; +} + +dyn_clone::clone_trait_object!( ParachainConsensus where B: BlockT); + +#[async_trait::async_trait] +impl ParachainConsensus for Box + Send + Sync> { + async fn produce_candidate( + &mut self, + parent: &B::Header, + relay_parent: PHash, + validation_data: &PersistedValidationData, + ) -> Option> { + (*self).produce_candidate(parent, relay_parent, validation_data).await + } +} + +/// Parachain specific block import. +/// +/// This is used to set `block_import_params.fork_choice` to `false` as long as the block origin is +/// not `NetworkInitialSync`. The best block for parachains is determined by the relay chain. Meaning +/// we will update the best block, as it is included by the relay-chain. +pub struct ParachainBlockImport { + inner: BI, + monitor: Option>>, +} + +impl> ParachainBlockImport { + /// Create a new instance. + /// + /// The number of leaves per level limit is set to `LevelLimit::Default`. + pub fn new(inner: BI, backend: Arc) -> Self { + Self::new_with_limit(inner, backend, LevelLimit::Default) + } + + /// Create a new instance with an explicit limit to the number of leaves per level. + /// + /// This function alone doesn't enforce the limit on levels for old imported blocks, + /// the limit is eventually enforced only when new blocks are imported. 
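+	///
+	/// Hypothetical usage (editorial sketch; `block_import` and `backend` are
+	/// assumed to come from the surrounding node setup):
+	///
+	/// ```ignore
+	/// let para_import =
+	///     ParachainBlockImport::new_with_limit(block_import, backend, LevelLimit::Some(10));
+	/// ```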
+ pub fn new_with_limit(inner: BI, backend: Arc, level_leaves_max: LevelLimit) -> Self { + let level_limit = match level_leaves_max { + LevelLimit::None => None, + LevelLimit::Some(limit) => Some(limit), + LevelLimit::Default => Some(MAX_LEAVES_PER_LEVEL_SENSIBLE_DEFAULT), + }; + + let monitor = + level_limit.map(|level_limit| SharedData::new(LevelMonitor::new(level_limit, backend))); + + Self { inner, monitor } + } +} + +impl Clone for ParachainBlockImport { + fn clone(&self) -> Self { + ParachainBlockImport { inner: self.inner.clone(), monitor: self.monitor.clone() } + } +} + +#[async_trait::async_trait] +impl BlockImport for ParachainBlockImport +where + Block: BlockT, + BI: BlockImport + Send, + BE: Backend, +{ + type Error = BI::Error; + type Transaction = BI::Transaction; + + async fn check_block( + &mut self, + block: sc_consensus::BlockCheckParams, + ) -> Result { + self.inner.check_block(block).await + } + + async fn import_block( + &mut self, + mut params: sc_consensus::BlockImportParams, + ) -> Result { + // Blocks are stored within the backend by using POST hash. + let hash = params.post_hash(); + let number = *params.header.number(); + + // Best block is determined by the relay chain, or if we are doing the initial sync + // we import all blocks as new best. + params.fork_choice = Some(sc_consensus::ForkChoiceStrategy::Custom( + params.origin == sp_consensus::BlockOrigin::NetworkInitialSync, + )); + + let maybe_lock = self.monitor.as_ref().map(|monitor_lock| { + let mut monitor = monitor_lock.shared_data_locked(); + monitor.enforce_limit(number); + monitor.release_mutex() + }); + + let res = self.inner.import_block(params).await?; + + if let (Some(mut monitor_lock), ImportResult::Imported(_)) = (maybe_lock, &res) { + let mut monitor = monitor_lock.upgrade(); + monitor.block_imported(number, hash); + } + + Ok(res) + } +} + +/// Marker trait denoting a block import type that fits the parachain requirements. +pub trait ParachainBlockImportMarker {} + +impl ParachainBlockImportMarker for ParachainBlockImport {} diff --git a/cumulus/client/consensus/common/src/parachain_consensus.rs b/cumulus/client/consensus/common/src/parachain_consensus.rs new file mode 100644 index 00000000..734f682d --- /dev/null +++ b/cumulus/client/consensus/common/src/parachain_consensus.rs @@ -0,0 +1,507 @@ +// Copyright 2019-2021 Parity Technologies (UK) Ltd. +// This file is part of Cumulus. + +// Cumulus is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Cumulus is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Cumulus. If not, see . 
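+
+// Editorial note (illustrative, not part of the upstream change): the
+// `run_parachain_consensus` future defined in this file touches the parachain
+// backend and is expected to be spawned as a blocking essential task, e.g.:
+//
+//     task_manager.spawn_essential_handle().spawn_blocking(
+//         "cumulus-consensus",
+//         None,
+//         run_parachain_consensus(para_id, client, relay_chain, announce_block, None),
+//     );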
+ +use sc_client_api::{ + Backend, BlockBackend, BlockImportNotification, BlockchainEvents, Finalizer, UsageProvider, +}; +use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy}; +use schnellru::{ByLength, LruMap}; +use sp_blockchain::Error as ClientError; +use sp_consensus::{BlockOrigin, BlockStatus}; +use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; + +use cumulus_client_pov_recovery::{RecoveryKind, RecoveryRequest}; +use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult}; + +use polkadot_primitives::{Hash as PHash, Id as ParaId, OccupiedCoreAssumption}; + +use codec::Decode; +use futures::{channel::mpsc::Sender, pin_mut, select, FutureExt, Stream, StreamExt}; + +use std::sync::Arc; + +const LOG_TARGET: &str = "cumulus-consensus"; +const FINALIZATION_CACHE_SIZE: u32 = 40; + +fn handle_new_finalized_head( + parachain: &Arc
<P>
, + finalized_head: Vec, + last_seen_finalized_hashes: &mut LruMap, +) where + Block: BlockT, + B: Backend, + P: Finalizer + UsageProvider + BlockchainEvents, +{ + let header = match Block::Header::decode(&mut &finalized_head[..]) { + Ok(header) => header, + Err(err) => { + tracing::debug!( + target: LOG_TARGET, + error = ?err, + "Could not decode parachain header while following finalized heads.", + ); + return + }, + }; + + let hash = header.hash(); + + last_seen_finalized_hashes.insert(hash, ()); + + // Only finalize if we are below the incoming finalized parachain head + if parachain.usage_info().chain.finalized_number < *header.number() { + tracing::debug!( + target: LOG_TARGET, + block_hash = ?hash, + "Attempting to finalize header.", + ); + if let Err(e) = parachain.finalize_block(hash, None, true) { + match e { + ClientError::UnknownBlock(_) => tracing::debug!( + target: LOG_TARGET, + block_hash = ?hash, + "Could not finalize block because it is unknown.", + ), + _ => tracing::warn!( + target: LOG_TARGET, + error = ?e, + block_hash = ?hash, + "Failed to finalize block", + ), + } + } + } +} + +/// Follow the finalized head of the given parachain. +/// +/// For every finalized block of the relay chain, it will get the included parachain header +/// corresponding to `para_id` and will finalize it in the parachain. +async fn follow_finalized_head(para_id: ParaId, parachain: Arc
<P>
, relay_chain: R) +where + Block: BlockT, + P: Finalizer + UsageProvider + BlockchainEvents, + R: RelayChainInterface + Clone, + B: Backend, +{ + let finalized_heads = match finalized_heads(relay_chain, para_id).await { + Ok(finalized_heads_stream) => finalized_heads_stream.fuse(), + Err(err) => { + tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve finalized heads stream."); + return + }, + }; + + let mut imported_blocks = parachain.import_notification_stream().fuse(); + + pin_mut!(finalized_heads); + + // We use this cache to finalize blocks that are imported late. + // For example, a block that has been recovered via PoV-Recovery + // on a full node can have several minutes delay. With this cache + // we have some "memory" of recently finalized blocks. + let mut last_seen_finalized_hashes = LruMap::new(ByLength::new(FINALIZATION_CACHE_SIZE)); + + loop { + select! { + fin = finalized_heads.next() => { + match fin { + Some(finalized_head) => + handle_new_finalized_head(¶chain, finalized_head, &mut last_seen_finalized_hashes), + None => { + tracing::debug!(target: LOG_TARGET, "Stopping following finalized head."); + return + } + } + }, + imported = imported_blocks.next() => { + match imported { + Some(imported_block) => { + // When we see a block import that is already finalized, we immediately finalize it. + if last_seen_finalized_hashes.peek(&imported_block.hash).is_some() { + tracing::debug!( + target: LOG_TARGET, + block_hash = ?imported_block.hash, + "Setting newly imported block as finalized.", + ); + + if let Err(e) = parachain.finalize_block(imported_block.hash, None, true) { + match e { + ClientError::UnknownBlock(_) => tracing::debug!( + target: LOG_TARGET, + block_hash = ?imported_block.hash, + "Could not finalize block because it is unknown.", + ), + _ => tracing::warn!( + target: LOG_TARGET, + error = ?e, + block_hash = ?imported_block.hash, + "Failed to finalize block", + ), + } + } + } + }, + None => { + tracing::debug!( + target: LOG_TARGET, + "Stopping following imported blocks.", + ); + return + } + } + } + } + } +} + +/// Run the parachain consensus. +/// +/// This will follow the given `relay_chain` to act as consensus for the parachain that corresponds +/// to the given `para_id`. It will set the new best block of the parachain as it gets aware of it. +/// The same happens for the finalized block. +/// +/// # Note +/// +/// This will access the backend of the parachain and thus, this future should be spawned as blocking +/// task. +pub async fn run_parachain_consensus( + para_id: ParaId, + parachain: Arc
<P>
, + relay_chain: R, + announce_block: Arc>) + Send + Sync>, + recovery_chan_tx: Option>>, +) where + Block: BlockT, + P: Finalizer + + UsageProvider + + Send + + Sync + + BlockBackend + + BlockchainEvents, + for<'a> &'a P: BlockImport, + R: RelayChainInterface + Clone, + B: Backend, +{ + let follow_new_best = follow_new_best( + para_id, + parachain.clone(), + relay_chain.clone(), + announce_block, + recovery_chan_tx, + ); + let follow_finalized_head = follow_finalized_head(para_id, parachain, relay_chain); + select! { + _ = follow_new_best.fuse() => {}, + _ = follow_finalized_head.fuse() => {}, + } +} + +/// Follow the relay chain new best head, to update the Parachain new best head. +async fn follow_new_best( + para_id: ParaId, + parachain: Arc
<P>
, + relay_chain: R, + announce_block: Arc>) + Send + Sync>, + mut recovery_chan_tx: Option>>, +) where + Block: BlockT, + P: Finalizer + + UsageProvider + + Send + + Sync + + BlockBackend + + BlockchainEvents, + for<'a> &'a P: BlockImport, + R: RelayChainInterface + Clone, + B: Backend, +{ + let new_best_heads = match new_best_heads(relay_chain, para_id).await { + Ok(best_heads_stream) => best_heads_stream.fuse(), + Err(err) => { + tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve best heads stream."); + return + }, + }; + + pin_mut!(new_best_heads); + + let mut imported_blocks = parachain.import_notification_stream().fuse(); + // The unset best header of the parachain. Will be `Some(_)` when we have imported a relay chain + // block before the associated parachain block. In this case we need to wait for this block to + // be imported to set it as new best. + let mut unset_best_header = None; + + loop { + select! { + h = new_best_heads.next() => { + match h { + Some(h) => handle_new_best_parachain_head( + h, + &*parachain, + &mut unset_best_header, + recovery_chan_tx.as_mut(), + ).await, + None => { + tracing::debug!( + target: LOG_TARGET, + "Stopping following new best.", + ); + return + } + } + }, + i = imported_blocks.next() => { + match i { + Some(i) => handle_new_block_imported( + i, + &mut unset_best_header, + &*parachain, + &*announce_block, + ).await, + None => { + tracing::debug!( + target: LOG_TARGET, + "Stopping following imported blocks.", + ); + return + } + } + }, + } + } +} + +/// Handle a new import block of the parachain. +async fn handle_new_block_imported( + notification: BlockImportNotification, + unset_best_header_opt: &mut Option, + parachain: &P, + announce_block: &(dyn Fn(Block::Hash, Option>) + Send + Sync), +) where + Block: BlockT, + P: UsageProvider + Send + Sync + BlockBackend, + for<'a> &'a P: BlockImport, +{ + // HACK + // + // Remove after https://github.com/paritytech/substrate/pull/8052 or similar is merged + if notification.origin != BlockOrigin::Own { + announce_block(notification.hash, None); + } + + let unset_best_header = match (notification.is_new_best, &unset_best_header_opt) { + // If this is the new best block or we don't have any unset block, we can end it here. + (true, _) | (_, None) => return, + (false, Some(ref u)) => u, + }; + + let unset_hash = if notification.header.number() < unset_best_header.number() { + return + } else if notification.header.number() == unset_best_header.number() { + let unset_hash = unset_best_header.hash(); + + if unset_hash != notification.hash { + return + } else { + unset_hash + } + } else { + unset_best_header.hash() + }; + + match parachain.block_status(unset_hash) { + Ok(BlockStatus::InChainWithState) => { + drop(unset_best_header); + let unset_best_header = unset_best_header_opt + .take() + .expect("We checked above that the value is set; qed"); + tracing::debug!( + target: LOG_TARGET, + ?unset_hash, + "Importing block as new best for parachain.", + ); + import_block_as_new_best(unset_hash, unset_best_header, parachain).await; + }, + state => tracing::debug!( + target: LOG_TARGET, + ?unset_best_header, + ?notification.header, + ?state, + "Unexpected state for unset best header.", + ), + } +} + +/// Handle the new best parachain head as extracted from the new best relay chain. 
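+///
+/// If the announced head is not yet known locally, it is parked in
+/// `unset_best_header` and, when a recovery channel is available, a
+/// `RecoveryKind::Full` request is sent so the block can be actively
+/// recovered; `handle_new_block_imported` later promotes it to best.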
+async fn handle_new_best_parachain_head( + head: Vec, + parachain: &P, + unset_best_header: &mut Option, + mut recovery_chan_tx: Option<&mut Sender>>, +) where + Block: BlockT, + P: UsageProvider + Send + Sync + BlockBackend, + for<'a> &'a P: BlockImport, +{ + let parachain_head = match <::Header>::decode(&mut &head[..]) { + Ok(header) => header, + Err(err) => { + tracing::debug!( + target: LOG_TARGET, + error = ?err, + "Could not decode Parachain header while following best heads.", + ); + return + }, + }; + + let hash = parachain_head.hash(); + + if parachain.usage_info().chain.best_hash == hash { + tracing::debug!( + target: LOG_TARGET, + block_hash = ?hash, + "Skipping set new best block, because block is already the best.", + ) + } else { + // Make sure the block is already known or otherwise we skip setting new best. + match parachain.block_status(hash) { + Ok(BlockStatus::InChainWithState) => { + unset_best_header.take(); + tracing::debug!( + target: LOG_TARGET, + ?hash, + "Importing block as new best for parachain.", + ); + import_block_as_new_best(hash, parachain_head, parachain).await; + }, + Ok(BlockStatus::InChainPruned) => { + tracing::error!( + target: LOG_TARGET, + block_hash = ?hash, + "Trying to set pruned block as new best!", + ); + }, + Ok(BlockStatus::Unknown) => { + *unset_best_header = Some(parachain_head); + + tracing::debug!( + target: LOG_TARGET, + block_hash = ?hash, + "Parachain block not yet imported, waiting for import to enact as best block.", + ); + + if let Some(ref mut recovery_chan_tx) = recovery_chan_tx { + // Best effort channel to actively encourage block recovery. + // An error here is not fatal; the relay chain continuously re-announces + // the best block, thus we will have other opportunities to retry. + let req = RecoveryRequest { hash, kind: RecoveryKind::Full }; + if let Err(err) = recovery_chan_tx.try_send(req) { + tracing::warn!( + target: LOG_TARGET, + block_hash = ?hash, + error = ?err, + "Unable to notify block recovery subsystem" + ) + } + } + }, + Err(e) => { + tracing::error!( + target: LOG_TARGET, + block_hash = ?hash, + error = ?e, + "Failed to get block status of block.", + ); + }, + _ => {}, + } + } +} + +async fn import_block_as_new_best(hash: Block::Hash, header: Block::Header, parachain: &P) +where + Block: BlockT, + P: UsageProvider + Send + Sync + BlockBackend, + for<'a> &'a P: BlockImport, +{ + let best_number = parachain.usage_info().chain.best_number; + if *header.number() < best_number { + tracing::debug!( + target: LOG_TARGET, + %best_number, + block_number = %header.number(), + "Skipping importing block as new best block, because there already exists a \ + best block with an higher number", + ); + return + } + + // Make it the new best block + let mut block_import_params = BlockImportParams::new(BlockOrigin::ConsensusBroadcast, header); + block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(true)); + block_import_params.import_existing = true; + + if let Err(err) = (&*parachain).import_block(block_import_params).await { + tracing::warn!( + target: LOG_TARGET, + block_hash = ?hash, + error = ?err, + "Failed to set new best block.", + ); + } +} + +/// Returns a stream that will yield best heads for the given `para_id`. 
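+///
+/// Each relay chain "new best" notification is mapped, via
+/// `parachain_head_at`, to the parachain head included at that relay block;
+/// relay blocks for which no head is available are filtered out.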
+async fn new_best_heads( + relay_chain: impl RelayChainInterface + Clone, + para_id: ParaId, +) -> RelayChainResult>> { + let new_best_notification_stream = + relay_chain.new_best_notification_stream().await?.filter_map(move |n| { + let relay_chain = relay_chain.clone(); + async move { parachain_head_at(&relay_chain, n.hash(), para_id).await.ok().flatten() } + }); + + Ok(new_best_notification_stream) +} + +/// Returns a stream that will yield finalized heads for the given `para_id`. +async fn finalized_heads( + relay_chain: impl RelayChainInterface + Clone, + para_id: ParaId, +) -> RelayChainResult>> { + let finality_notification_stream = + relay_chain.finality_notification_stream().await?.filter_map(move |n| { + let relay_chain = relay_chain.clone(); + async move { parachain_head_at(&relay_chain, n.hash(), para_id).await.ok().flatten() } + }); + + Ok(finality_notification_stream) +} + +/// Returns head of the parachain at the given relay chain block. +async fn parachain_head_at( + relay_chain: &impl RelayChainInterface, + at: PHash, + para_id: ParaId, +) -> RelayChainResult>> { + relay_chain + .persisted_validation_data(at, para_id, OccupiedCoreAssumption::TimedOut) + .await + .map(|s| s.map(|s| s.parent_head.0)) +} diff --git a/cumulus/client/consensus/common/src/tests.rs b/cumulus/client/consensus/common/src/tests.rs new file mode 100644 index 00000000..23516d96 --- /dev/null +++ b/cumulus/client/consensus/common/src/tests.rs @@ -0,0 +1,801 @@ +// Copyright 2019-2021 Parity Technologies (UK) Ltd. +// This file is part of Cumulus. + +// Cumulus is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Cumulus is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Cumulus. If not, see . + +use crate::*; + +use async_trait::async_trait; +use codec::Encode; +use cumulus_client_pov_recovery::RecoveryKind; +use cumulus_primitives_core::{InboundDownwardMessage, InboundHrmpMessage}; +use cumulus_relay_chain_interface::{ + CommittedCandidateReceipt, OccupiedCoreAssumption, OverseerHandle, PHeader, ParaId, + RelayChainInterface, RelayChainResult, SessionIndex, StorageValue, ValidatorId, +}; +use cumulus_test_client::{ + runtime::{Block, Hash, Header}, + Backend, Client, InitBlockBuilder, TestClientBuilder, TestClientBuilderExt, +}; +use futures::{channel::mpsc, executor::block_on, select, FutureExt, Stream, StreamExt}; +use futures_timer::Delay; +use sc_client_api::{blockchain::Backend as _, Backend as _, UsageProvider}; +use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy}; +use sp_consensus::{BlockOrigin, BlockStatus}; +use std::{ + collections::{BTreeMap, HashMap}, + pin::Pin, + sync::{Arc, Mutex}, + time::Duration, +}; + +struct RelaychainInner { + new_best_heads: Option>, + finalized_heads: Option>, + new_best_heads_sender: mpsc::UnboundedSender
<Header>,
+    finalized_heads_sender: mpsc::UnboundedSender<Header>
, + relay_chain_hash_to_header: HashMap, +} + +impl RelaychainInner { + fn new() -> Self { + let (new_best_heads_sender, new_best_heads) = mpsc::unbounded(); + let (finalized_heads_sender, finalized_heads) = mpsc::unbounded(); + + Self { + new_best_heads_sender, + finalized_heads_sender, + new_best_heads: Some(new_best_heads), + finalized_heads: Some(finalized_heads), + relay_chain_hash_to_header: Default::default(), + } + } +} + +#[derive(Clone)] +struct Relaychain { + inner: Arc>, +} + +impl Relaychain { + fn new() -> Self { + Self { inner: Arc::new(Mutex::new(RelaychainInner::new())) } + } +} + +#[async_trait] +impl RelayChainInterface for Relaychain { + async fn validators(&self, _: PHash) -> RelayChainResult> { + unimplemented!("Not needed for test") + } + + async fn best_block_hash(&self) -> RelayChainResult { + unimplemented!("Not needed for test") + } + + async fn finalized_block_hash(&self) -> RelayChainResult { + unimplemented!("Not needed for test") + } + + async fn retrieve_dmq_contents( + &self, + _: ParaId, + _: PHash, + ) -> RelayChainResult> { + unimplemented!("Not needed for test") + } + + async fn retrieve_all_inbound_hrmp_channel_contents( + &self, + _: ParaId, + _: PHash, + ) -> RelayChainResult>> { + unimplemented!("Not needed for test") + } + + async fn persisted_validation_data( + &self, + hash: PHash, + _: ParaId, + _: OccupiedCoreAssumption, + ) -> RelayChainResult> { + Ok(Some(PersistedValidationData { + parent_head: self + .inner + .lock() + .unwrap() + .relay_chain_hash_to_header + .get(&hash) + .unwrap() + .encode() + .into(), + ..Default::default() + })) + } + + async fn candidate_pending_availability( + &self, + _: PHash, + _: ParaId, + ) -> RelayChainResult> { + unimplemented!("Not needed for test") + } + + async fn session_index_for_child(&self, _: PHash) -> RelayChainResult { + unimplemented!("Not needed for test") + } + + async fn import_notification_stream( + &self, + ) -> RelayChainResult + Send>>> { + unimplemented!("Not needed for test") + } + + async fn finality_notification_stream( + &self, + ) -> RelayChainResult + Send>>> { + let inner = self.inner.clone(); + Ok(self + .inner + .lock() + .unwrap() + .finalized_heads + .take() + .unwrap() + .map(move |h| { + // Let's abuse the "parachain header" directly as relay chain header. + inner.lock().unwrap().relay_chain_hash_to_header.insert(h.hash(), h.clone()); + h + }) + .boxed()) + } + + async fn is_major_syncing(&self) -> RelayChainResult { + Ok(false) + } + + fn overseer_handle(&self) -> RelayChainResult { + unimplemented!("Not needed for test") + } + + async fn get_storage_by_key( + &self, + _: PHash, + _: &[u8], + ) -> RelayChainResult> { + unimplemented!("Not needed for test") + } + + async fn prove_read( + &self, + _: PHash, + _: &Vec>, + ) -> RelayChainResult { + unimplemented!("Not needed for test") + } + + async fn wait_for_block(&self, _: PHash) -> RelayChainResult<()> { + unimplemented!("Not needed for test") + } + + async fn new_best_notification_stream( + &self, + ) -> RelayChainResult + Send>>> { + let inner = self.inner.clone(); + Ok(self + .inner + .lock() + .unwrap() + .new_best_heads + .take() + .unwrap() + .map(move |h| { + // Let's abuse the "parachain header" directly as relay chain header. 
+ inner.lock().unwrap().relay_chain_hash_to_header.insert(h.hash(), h.clone()); + h + }) + .boxed()) + } +} + +fn build_block( + builder: &B, + at: Option, + timestamp: Option, +) -> Block { + let builder = match at { + Some(at) => match timestamp { + Some(ts) => builder.init_block_builder_with_timestamp(at, None, Default::default(), ts), + None => builder.init_block_builder_at(at, None, Default::default()), + }, + None => builder.init_block_builder(None, Default::default()), + }; + + let mut block = builder.build().unwrap().block; + + // Simulate some form of post activity (like a Seal or Other generic things). + // This is mostly used to exercise the `LevelMonitor` correct behavior. + // (in practice we want that header post-hash != pre-hash) + block.header.digest.push(sp_runtime::DigestItem::Other(vec![1, 2, 3])); + + block +} + +async fn import_block>( + importer: &mut I, + block: Block, + origin: BlockOrigin, + import_as_best: bool, +) { + let (mut header, body) = block.deconstruct(); + + let post_digest = + header.digest.pop().expect("post digested is present in manually crafted block"); + + let mut block_import_params = BlockImportParams::new(origin, header); + block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(import_as_best)); + block_import_params.body = Some(body); + block_import_params.post_digests.push(post_digest); + + importer.import_block(block_import_params).await.unwrap(); +} + +fn import_block_sync>( + importer: &mut I, + block: Block, + origin: BlockOrigin, + import_as_best: bool, +) { + block_on(import_block(importer, block, origin, import_as_best)); +} + +fn build_and_import_block_ext>( + builder: &B, + origin: BlockOrigin, + import_as_best: bool, + importer: &mut I, + at: Option, + timestamp: Option, +) -> Block { + let block = build_block(builder, at, timestamp); + import_block_sync(importer, block.clone(), origin, import_as_best); + block +} + +fn build_and_import_block(mut client: Arc, import_as_best: bool) -> Block { + build_and_import_block_ext( + &*client.clone(), + BlockOrigin::Own, + import_as_best, + &mut client, + None, + None, + ) +} + +#[test] +fn follow_new_best_works() { + sp_tracing::try_init_simple(); + + let client = Arc::new(TestClientBuilder::default().build()); + + let block = build_and_import_block(client.clone(), false); + let relay_chain = Relaychain::new(); + let new_best_heads_sender = relay_chain.inner.lock().unwrap().new_best_heads_sender.clone(); + + let consensus = + run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}), None); + + let work = async move { + new_best_heads_sender.unbounded_send(block.header().clone()).unwrap(); + loop { + Delay::new(Duration::from_millis(100)).await; + if block.hash() == client.usage_info().chain.best_hash { + break + } + } + }; + + block_on(async move { + futures::pin_mut!(consensus); + futures::pin_mut!(work); + + select! 
{ + r = consensus.fuse() => panic!("Consensus should not end: {:?}", r), + _ = work.fuse() => {}, + } + }); +} + +#[test] +fn follow_new_best_with_dummy_recovery_works() { + sp_tracing::try_init_simple(); + + let client = Arc::new(TestClientBuilder::default().build()); + + let relay_chain = Relaychain::new(); + let new_best_heads_sender = relay_chain.inner.lock().unwrap().new_best_heads_sender.clone(); + + let (recovery_chan_tx, mut recovery_chan_rx) = futures::channel::mpsc::channel(3); + + let consensus = run_parachain_consensus( + 100.into(), + client.clone(), + relay_chain, + Arc::new(|_, _| {}), + Some(recovery_chan_tx), + ); + + let block = build_block(&*client.clone(), None, None); + let block_clone = block.clone(); + let client_clone = client.clone(); + + let work = async move { + new_best_heads_sender.unbounded_send(block.header().clone()).unwrap(); + loop { + Delay::new(Duration::from_millis(100)).await; + match client.block_status(block.hash()).unwrap() { + BlockStatus::Unknown => {}, + status => { + assert_eq!(block.hash(), client.usage_info().chain.best_hash); + assert_eq!(status, BlockStatus::InChainWithState); + break + }, + } + } + }; + + let dummy_block_recovery = async move { + loop { + if let Some(req) = recovery_chan_rx.next().await { + assert_eq!(req.hash, block_clone.hash()); + assert_eq!(req.kind, RecoveryKind::Full); + Delay::new(Duration::from_millis(500)).await; + import_block(&mut &*client_clone, block_clone.clone(), BlockOrigin::Own, true) + .await; + } + } + }; + + block_on(async move { + futures::pin_mut!(consensus); + futures::pin_mut!(work); + + select! { + r = consensus.fuse() => panic!("Consensus should not end: {:?}", r), + _ = dummy_block_recovery.fuse() => {}, + _ = work.fuse() => {}, + } + }); +} + +#[test] +fn follow_finalized_works() { + sp_tracing::try_init_simple(); + + let client = Arc::new(TestClientBuilder::default().build()); + + let block = build_and_import_block(client.clone(), false); + let relay_chain = Relaychain::new(); + let finalized_sender = relay_chain.inner.lock().unwrap().finalized_heads_sender.clone(); + + let consensus = + run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}), None); + + let work = async move { + finalized_sender.unbounded_send(block.header().clone()).unwrap(); + loop { + Delay::new(Duration::from_millis(100)).await; + if block.hash() == client.usage_info().chain.finalized_hash { + break + } + } + }; + + block_on(async move { + futures::pin_mut!(consensus); + futures::pin_mut!(work); + + select! 
{ + r = consensus.fuse() => panic!("Consensus should not end: {:?}", r), + _ = work.fuse() => {}, + } + }); +} + +#[test] +fn follow_finalized_does_not_stop_on_unknown_block() { + sp_tracing::try_init_simple(); + + let client = Arc::new(TestClientBuilder::default().build()); + + let block = build_and_import_block(client.clone(), false); + + let unknown_block = { + let block_builder = client.init_block_builder_at(block.hash(), None, Default::default()); + block_builder.build().unwrap().block + }; + + let relay_chain = Relaychain::new(); + let finalized_sender = relay_chain.inner.lock().unwrap().finalized_heads_sender.clone(); + + let consensus = + run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}), None); + + let work = async move { + for _ in 0..3usize { + finalized_sender.unbounded_send(unknown_block.header().clone()).unwrap(); + + Delay::new(Duration::from_millis(100)).await; + } + + finalized_sender.unbounded_send(block.header().clone()).unwrap(); + loop { + Delay::new(Duration::from_millis(100)).await; + if block.hash() == client.usage_info().chain.finalized_hash { + break + } + } + }; + + block_on(async move { + futures::pin_mut!(consensus); + futures::pin_mut!(work); + + select! { + r = consensus.fuse() => panic!("Consensus should not end: {:?}", r), + _ = work.fuse() => {}, + } + }); +} + +// It can happen that we first import a relay chain block, while not yet having the parachain +// block imported that would be set to the best block. We need to make sure to import this +// block as new best block in the moment it is imported. +#[test] +fn follow_new_best_sets_best_after_it_is_imported() { + sp_tracing::try_init_simple(); + + let mut client = Arc::new(TestClientBuilder::default().build()); + + let block = build_and_import_block(client.clone(), false); + + let unknown_block = { + let block_builder = client.init_block_builder_at(block.hash(), None, Default::default()); + block_builder.build().unwrap().block + }; + + let relay_chain = Relaychain::new(); + let new_best_heads_sender = relay_chain.inner.lock().unwrap().new_best_heads_sender.clone(); + + let consensus = + run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}), None); + + let work = async move { + new_best_heads_sender.unbounded_send(block.header().clone()).unwrap(); + + loop { + Delay::new(Duration::from_millis(100)).await; + if block.hash() == client.usage_info().chain.best_hash { + break + } + } + + // Announce the unknown block + new_best_heads_sender.unbounded_send(unknown_block.header().clone()).unwrap(); + + // Do some iterations. As this is a local task executor, only one task can run at a time. + // Meaning that it should already have processed the unknown block. + for _ in 0..3usize { + Delay::new(Duration::from_millis(100)).await; + } + + let (header, body) = unknown_block.clone().deconstruct(); + + let mut block_import_params = BlockImportParams::new(BlockOrigin::Own, header); + block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(false)); + block_import_params.body = Some(body); + + // Now import the unkown block to make it "known" + client.import_block(block_import_params).await.unwrap(); + + loop { + Delay::new(Duration::from_millis(100)).await; + if unknown_block.hash() == client.usage_info().chain.best_hash { + break + } + } + }; + + block_on(async move { + futures::pin_mut!(consensus); + futures::pin_mut!(work); + + select! 
{ + r = consensus.fuse() => panic!("Consensus should not end: {:?}", r), + _ = work.fuse() => {}, + } + }); +} + +/// When we import a new best relay chain block, we extract the best parachain block from it and set +/// it. This works when we follow the relay chain and parachain at the tip of each other, but there +/// can be race conditions when we are doing a full sync of both or just the relay chain. +/// The problem is that we import parachain blocks as best as long as we are in major sync. So, we +/// could import block 100 as best and then import a relay chain block that says that block 99 is +/// the best parachain block. This should not happen, we should never set the best block to a lower +/// block number. +#[test] +fn do_not_set_best_block_to_older_block() { + const NUM_BLOCKS: usize = 4; + + sp_tracing::try_init_simple(); + + let backend = Arc::new(Backend::new_test(1000, 1)); + + let client = Arc::new(TestClientBuilder::with_backend(backend).build()); + + let blocks = (0..NUM_BLOCKS) + .into_iter() + .map(|_| build_and_import_block(client.clone(), true)) + .collect::>(); + + assert_eq!(NUM_BLOCKS as u32, client.usage_info().chain.best_number); + + let relay_chain = Relaychain::new(); + let new_best_heads_sender = relay_chain.inner.lock().unwrap().new_best_heads_sender.clone(); + + let consensus = + run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}), None); + + let client2 = client.clone(); + let work = async move { + new_best_heads_sender + .unbounded_send(blocks[NUM_BLOCKS - 2].header().clone()) + .unwrap(); + // Wait for it to be processed. + Delay::new(Duration::from_millis(300)).await; + }; + + block_on(async move { + futures::pin_mut!(consensus); + futures::pin_mut!(work); + + select! { + r = consensus.fuse() => panic!("Consensus should not end: {:?}", r), + _ = work.fuse() => {}, + } + }); + + // Build and import a new best block. + build_and_import_block(client2.clone(), true); +} + +#[test] +fn prune_blocks_on_level_overflow() { + // Here we are using the timestamp value to generate blocks with different hashes. 
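+	// Editorial note: with `LEVEL_LIMIT = 3`, importing a fourth sibling at a
+	// given height must first evict the least fresh of the three already
+	// present, which is what the leaf assertions below verify.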
+ const LEVEL_LIMIT: usize = 3; + const TIMESTAMP_MULTIPLIER: u64 = 60000; + + let backend = Arc::new(Backend::new_test(1000, 3)); + let client = Arc::new(TestClientBuilder::with_backend(backend.clone()).build()); + let mut para_import = ParachainBlockImport::new_with_limit( + client.clone(), + backend.clone(), + LevelLimit::Some(LEVEL_LIMIT), + ); + + let block0 = build_and_import_block_ext( + &*client, + BlockOrigin::NetworkInitialSync, + true, + &mut para_import, + None, + None, + ); + let id0 = block0.header.hash(); + + let blocks1 = (0..LEVEL_LIMIT) + .into_iter() + .map(|i| { + build_and_import_block_ext( + &*client, + if i == 1 { BlockOrigin::NetworkInitialSync } else { BlockOrigin::Own }, + i == 1, + &mut para_import, + Some(id0), + Some(i as u64 * TIMESTAMP_MULTIPLIER), + ) + }) + .collect::>(); + let id10 = blocks1[0].header.hash(); + + let blocks2 = (0..2) + .into_iter() + .map(|i| { + build_and_import_block_ext( + &*client, + BlockOrigin::Own, + false, + &mut para_import, + Some(id10), + Some(i as u64 * TIMESTAMP_MULTIPLIER), + ) + }) + .collect::>(); + + // Initial scenario (with B11 imported as best) + // + // B0 --+-- B10 --+-- B20 + // +-- B11 +-- B21 + // +-- B12 + + let leaves = backend.blockchain().leaves().unwrap(); + let mut expected = vec![ + blocks2[0].header.hash(), + blocks2[1].header.hash(), + blocks1[1].header.hash(), + blocks1[2].header.hash(), + ]; + assert_eq!(leaves, expected); + let best = client.usage_info().chain.best_hash; + assert_eq!(best, blocks1[1].header.hash()); + + let block13 = build_and_import_block_ext( + &*client, + BlockOrigin::Own, + false, + &mut para_import, + Some(id0), + Some(LEVEL_LIMIT as u64 * TIMESTAMP_MULTIPLIER), + ); + + // Expected scenario + // + // B0 --+-- B10 --+-- B20 + // +-- B11 +-- B21 + // +--(B13) <-- B12 has been replaced + + let leaves = backend.blockchain().leaves().unwrap(); + expected[3] = block13.header.hash(); + assert_eq!(leaves, expected); + + let block14 = build_and_import_block_ext( + &*client, + BlockOrigin::Own, + false, + &mut para_import, + Some(id0), + Some(2 * LEVEL_LIMIT as u64 * TIMESTAMP_MULTIPLIER), + ); + + // Expected scenario + // + // B0 --+--(B14) <-- B10 has been replaced + // +-- B11 + // +--(B13) + + let leaves = backend.blockchain().leaves().unwrap(); + expected.remove(0); + expected.remove(0); + expected.push(block14.header.hash()); + assert_eq!(leaves, expected); +} + +#[test] +fn restore_limit_monitor() { + // Here we are using the timestamp value to generate blocks with different hashes. + const LEVEL_LIMIT: usize = 2; + const TIMESTAMP_MULTIPLIER: u64 = 60000; + + let backend = Arc::new(Backend::new_test(1000, 3)); + let client = Arc::new(TestClientBuilder::with_backend(backend.clone()).build()); + + // Start with a block import not enforcing any limit... 
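+	// `usize::MAX` effectively disables pruning at this stage: the test first
+	// builds an over-populated level, then restores a monitor with a real
+	// limit to exercise `LevelMonitor::restore`.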
+ let mut para_import = ParachainBlockImport::new_with_limit( + client.clone(), + backend.clone(), + LevelLimit::Some(usize::MAX), + ); + + let block00 = build_and_import_block_ext( + &*client, + BlockOrigin::NetworkInitialSync, + true, + &mut para_import, + None, + None, + ); + let id00 = block00.header.hash(); + + let blocks1 = (0..LEVEL_LIMIT + 1) + .into_iter() + .map(|i| { + build_and_import_block_ext( + &*client, + if i == 1 { BlockOrigin::NetworkInitialSync } else { BlockOrigin::Own }, + i == 1, + &mut para_import, + Some(id00), + Some(i as u64 * TIMESTAMP_MULTIPLIER), + ) + }) + .collect::>(); + let id10 = blocks1[0].header.hash(); + + let _ = (0..LEVEL_LIMIT) + .into_iter() + .map(|i| { + build_and_import_block_ext( + &*client, + BlockOrigin::Own, + false, + &mut para_import, + Some(id10), + Some(i as u64 * TIMESTAMP_MULTIPLIER), + ) + }) + .collect::>(); + + // Scenario before limit application (with B11 imported as best) + // Import order (freshess): B00, B10, B11, B12, B20, B21 + // + // B00 --+-- B10 --+-- B20 + // | +-- B21 + // +-- B11 + // | + // +-- B12 + + // Simulate a restart by forcing a new monitor structure instance + + let mut para_import = ParachainBlockImport::new_with_limit( + client.clone(), + backend.clone(), + LevelLimit::Some(LEVEL_LIMIT), + ); + + let monitor_sd = para_import.monitor.clone().unwrap(); + + let monitor = monitor_sd.shared_data(); + assert_eq!(monitor.import_counter, 3); + std::mem::drop(monitor); + + let block13 = build_and_import_block_ext( + &*client, + BlockOrigin::Own, + false, + &mut para_import, + Some(id00), + Some(LEVEL_LIMIT as u64 * TIMESTAMP_MULTIPLIER), + ); + + // Expected scenario + // + // B0 --+-- B11 + // +--(B13) + + let leaves = backend.blockchain().leaves().unwrap(); + let expected = vec![blocks1[1].header.hash(), block13.header.hash()]; + assert_eq!(leaves, expected); + + let monitor = monitor_sd.shared_data(); + assert_eq!(monitor.import_counter, 4); + assert!(monitor.levels.iter().all(|(number, hashes)| { + hashes + .iter() + .filter(|hash| **hash != block13.header.hash()) + .all(|hash| *number == *monitor.freshness.get(hash).unwrap()) + })); + assert_eq!(*monitor.freshness.get(&block13.header.hash()).unwrap(), monitor.import_counter); +}
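
Usage sketch (editorial, not part of the patch): wiring the backported block
import into a node, assuming `client`, `backend`, `relay_chain`,
`announce_block` and `para_id` come from the surrounding service setup:

    use cumulus_client_consensus_common::{
        run_parachain_consensus, LevelLimit, ParachainBlockImport,
    };

    // Wrap the client in the level-monitoring block import.
    let para_import = ParachainBlockImport::new_with_limit(
        client.clone(),
        backend.clone(),
        LevelLimit::Default,
    );

    // Follow the relay chain to drive best and finalized parachain blocks;
    // meant to be spawned as a blocking task.
    let consensus = run_parachain_consensus(
        para_id,
        client.clone(),
        relay_chain.clone(),
        announce_block,
        None, // no PoV recovery channel in this sketch
    );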