diff --git a/Cargo.lock b/Cargo.lock index 8d8d2d59a3c88..9c2a6e83eba37 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8859,6 +8859,7 @@ dependencies = [ "assert_matches", "async-trait", "futures", + "futures-timer", "jsonrpsee", "log", "parity-scale-codec", diff --git a/client/consensus/manual-seal/Cargo.toml b/client/consensus/manual-seal/Cargo.toml index 19c4b22247e0f..2ddcf0d772890 100644 --- a/client/consensus/manual-seal/Cargo.toml +++ b/client/consensus/manual-seal/Cargo.toml @@ -18,6 +18,7 @@ assert_matches = "1.3.0" async-trait = "0.1.57" codec = { package = "parity-scale-codec", version = "3.2.2" } futures = "0.3.21" +futures-timer = "3.0.1" log = "0.4.17" serde = { version = "1.0", features = ["derive"] } thiserror = "1.0" diff --git a/client/consensus/manual-seal/src/lib.rs b/client/consensus/manual-seal/src/lib.rs index b277b34366577..03c9418b5c560 100644 --- a/client/consensus/manual-seal/src/lib.rs +++ b/client/consensus/manual-seal/src/lib.rs @@ -20,17 +20,22 @@ //! This is suitable for a testing environment. use futures::prelude::*; +use futures_timer::Delay; use prometheus_endpoint::Registry; -use sc_client_api::backend::{Backend as ClientBackend, Finalizer}; +use sc_client_api::{ + backend::{Backend as ClientBackend, Finalizer}, + client::BlockchainEvents, +}; use sc_consensus::{ block_import::{BlockImport, BlockImportParams, ForkChoiceStrategy}, import_queue::{BasicQueue, BoxBlockImport, Verifier}, }; use sp_blockchain::HeaderBackend; use sp_consensus::{Environment, Proposer, SelectChain}; +use sp_core::traits::SpawnNamed; use sp_inherents::CreateInherentDataProviders; use sp_runtime::{traits::Block as BlockT, ConsensusEngineId}; -use std::{marker::PhantomData, sync::Arc}; +use std::{marker::PhantomData, sync::Arc, time::Duration}; mod error; mod finalize_block; @@ -84,7 +89,7 @@ where /// Params required to start the instant sealing authorship task. pub struct ManualSealParams, TP, SC, CS, CIDP, P> { - /// Block import instance for well. importing blocks. + /// Block import instance. pub block_import: BI, /// The environment we are producing blocks for. @@ -136,7 +141,19 @@ pub struct InstantSealParams, TP, SC, pub create_inherent_data_providers: CIDP, } -/// Creates the background authorship task for the manual seal engine. +/// Params required to start the delayed finalization task. +pub struct DelayedFinalizeParams { + /// Block import instance. + pub client: Arc, + + /// Handle for spawning delayed finalization tasks. + pub spawn_handle: S, + + /// The delay in seconds before a block is finalized. + pub delay_sec: u64, +} + +/// Creates the background authorship task for the manually seal engine. pub async fn run_manual_seal( ManualSealParams { mut block_import, @@ -303,6 +320,44 @@ pub async fn run_instant_seal_and_finalize( .await } +/// Creates a future for delayed finalization of manual sealed blocks. +/// +/// The future needs to be spawned in the background alongside the +/// [`run_manual_seal`]/[`run_instant_seal`] future. It is required that +/// [`EngineCommand::SealNewBlock`] is send with `finalize = false` to not finalize blocks directly +/// after building them. This also means that delayed finality can not be used with +/// [`run_instant_seal_and_finalize`]. +pub async fn run_delayed_finalize( + DelayedFinalizeParams { client, spawn_handle, delay_sec }: DelayedFinalizeParams, +) where + B: BlockT + 'static, + CB: ClientBackend + 'static, + C: HeaderBackend + Finalizer + ProvideRuntimeApi + BlockchainEvents + 'static, + S: SpawnNamed, +{ + let mut block_import_stream = client.import_notification_stream(); + + while let Some(notification) = block_import_stream.next().await { + let delay = Delay::new(Duration::from_secs(delay_sec)); + let cloned_client = client.clone(); + spawn_handle.spawn( + "delayed-finalize", + None, + Box::pin(async move { + delay.await; + finalize_block(FinalizeBlockParams { + hash: notification.hash, + sender: None, + justification: None, + finalizer: cloned_client, + _phantom: PhantomData, + }) + .await + }), + ); + } +} + #[cfg(test)] mod tests { use super::*; @@ -428,6 +483,101 @@ mod tests { assert_eq!(client.header(created_block.hash).unwrap().unwrap().number, 1) } + #[tokio::test] + async fn instant_seal_delayed_finalize() { + let builder = TestClientBuilder::new(); + let (client, select_chain) = builder.build_with_longest_chain(); + let client = Arc::new(client); + let spawner = sp_core::testing::TaskExecutor::new(); + let genesis_hash = client.info().genesis_hash; + let pool = Arc::new(BasicPool::with_revalidation_type( + Options::default(), + true.into(), + api(), + None, + RevalidationType::Full, + spawner.clone(), + 0, + genesis_hash, + genesis_hash, + )); + let env = ProposerFactory::new(spawner.clone(), client.clone(), pool.clone(), None, None); + // this test checks that blocks are created as soon as transactions are imported into the + // pool. + let (sender, receiver) = futures::channel::oneshot::channel(); + let mut sender = Arc::new(Some(sender)); + let commands_stream = + pool.pool().validated_pool().import_notification_stream().map(move |_| { + // we're only going to submit one tx so this fn will only be called once. + let mut_sender = Arc::get_mut(&mut sender).unwrap(); + let sender = std::mem::take(mut_sender); + EngineCommand::SealNewBlock { + create_empty: false, + // set to `false`, expecting to be finalized by delayed finalize + finalize: false, + parent_hash: None, + sender, + } + }); + + let future_instant_seal = run_manual_seal(ManualSealParams { + block_import: client.clone(), + commands_stream, + env, + client: client.clone(), + pool: pool.clone(), + select_chain, + create_inherent_data_providers: |_, _| async { Ok(()) }, + consensus_data_provider: None, + }); + std::thread::spawn(|| { + let rt = tokio::runtime::Runtime::new().unwrap(); + // spawn the background authorship task + rt.block_on(future_instant_seal); + }); + + let delay_sec = 5; + let future_delayed_finalize = run_delayed_finalize(DelayedFinalizeParams { + client: client.clone(), + delay_sec, + spawn_handle: spawner, + }); + std::thread::spawn(|| { + let rt = tokio::runtime::Runtime::new().unwrap(); + // spawn the background authorship task + rt.block_on(future_delayed_finalize); + }); + + let mut finality_stream = client.finality_notification_stream(); + // submit a transaction to pool. + let result = pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Alice, 0)).await; + // assert that it was successfully imported + assert!(result.is_ok()); + // assert that the background task returns ok + let created_block = receiver.await.unwrap().unwrap(); + assert_eq!( + created_block, + CreatedBlock { + hash: created_block.hash, + aux: ImportedAux { + header_only: false, + clear_justification_requests: false, + needs_justification: false, + bad_justification: false, + is_new_best: true, + } + } + ); + // assert that there's a new block in the db. + assert!(client.header(created_block.hash).unwrap().is_some()); + assert_eq!(client.header(created_block.hash).unwrap().unwrap().number, 1); + + assert_eq!(client.info().finalized_hash, client.info().genesis_hash); + + let finalized = finality_stream.select_next_some().await; + assert_eq!(finalized.hash, created_block.hash); + } + #[tokio::test] async fn manual_seal_and_finalization() { let builder = TestClientBuilder::new(); diff --git a/client/consensus/manual-seal/src/rpc.rs b/client/consensus/manual-seal/src/rpc.rs index db92b9fd2981a..85abcdc08574b 100644 --- a/client/consensus/manual-seal/src/rpc.rs +++ b/client/consensus/manual-seal/src/rpc.rs @@ -160,10 +160,11 @@ pub fn send_result( } } } else { - // instant seal doesn't report errors over rpc, simply log them. + // Sealing/Finalization with no RPC sender such as instant seal or delayed finalize doesn't + // report errors over rpc, simply log them. match result { - Ok(r) => log::info!("Instant Seal success: {:?}", r), - Err(e) => log::error!("Instant Seal encountered an error: {}", e), + Ok(r) => log::info!("Consensus with no RPC sender success: {:?}", r), + Err(e) => log::error!("Consensus with no RPC sender encountered an error: {}", e), } } }