Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Manual seal delayed finalize #13999

Merged
merged 14 commits into from
May 2, 2023
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions client/consensus/manual-seal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ assert_matches = "1.3.0"
async-trait = "0.1.57"
codec = { package = "parity-scale-codec", version = "3.2.2" }
futures = "0.3.21"
futures-timer = "3.0.1"
log = "0.4.17"
serde = { version = "1.0", features = ["derive"] }
thiserror = "1.0"
Expand Down
158 changes: 154 additions & 4 deletions client/consensus/manual-seal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,22 @@
//! This is suitable for a testing environment.

use futures::prelude::*;
use futures_timer::Delay;
use prometheus_endpoint::Registry;
use sc_client_api::backend::{Backend as ClientBackend, Finalizer};
use sc_client_api::{
backend::{Backend as ClientBackend, Finalizer},
client::BlockchainEvents,
};
use sc_consensus::{
block_import::{BlockImport, BlockImportParams, ForkChoiceStrategy},
import_queue::{BasicQueue, BoxBlockImport, Verifier},
};
use sp_blockchain::HeaderBackend;
use sp_consensus::{Environment, Proposer, SelectChain};
use sp_core::traits::SpawnNamed;
use sp_inherents::CreateInherentDataProviders;
use sp_runtime::{traits::Block as BlockT, ConsensusEngineId};
use std::{marker::PhantomData, sync::Arc};
use std::{marker::PhantomData, sync::Arc, time::Duration};

mod error;
mod finalize_block;
Expand Down Expand Up @@ -84,7 +89,7 @@ where

/// Params required to start the instant sealing authorship task.
pub struct ManualSealParams<B: BlockT, BI, E, C: ProvideRuntimeApi<B>, TP, SC, CS, CIDP, P> {
/// Block import instance for well. importing blocks.
/// Block import instance.
pub block_import: BI,

/// The environment we are producing blocks for.
Expand Down Expand Up @@ -136,7 +141,19 @@ pub struct InstantSealParams<B: BlockT, BI, E, C: ProvideRuntimeApi<B>, TP, SC,
pub create_inherent_data_providers: CIDP,
}

/// Creates the background authorship task for the manual seal engine.
/// Params required to start the delayed finalization task.
pub struct DelayedFinalizeParams<C, S> {
shunsukew marked this conversation as resolved.
Show resolved Hide resolved
/// Block import instance.
pub client: Arc<C>,

/// Handle for spawning delayed finalization tasks.
pub spawn_handle: S,
shunsukew marked this conversation as resolved.
Show resolved Hide resolved

/// The delay in seconds before a block is finalized.
pub delay_sec: u64,
}

/// Creates the background authorship task for the manually seal engine.
pub async fn run_manual_seal<B, BI, CB, E, C, TP, SC, CS, CIDP, P>(
ManualSealParams {
mut block_import,
Expand Down Expand Up @@ -303,6 +320,44 @@ pub async fn run_instant_seal_and_finalize<B, BI, CB, E, C, TP, SC, CIDP, P>(
.await
}

/// Creates a future for delayed finalization of manual sealed blocks.
shunsukew marked this conversation as resolved.
Show resolved Hide resolved
///
/// The future needs to be spawned in the background alongside the
/// [`run_manual_seal`]/[`run_instant_seal`] future. It is required that
/// [`EngineCommand::SealNewBlock`] is send with `finalize = false` to not finalize blocks directly
/// after building them. This also means that delayed finality can not be used with
/// [`run_instant_seal_and_finalize`].
pub async fn run_delayed_finalize<B, CB, C, S>(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some docs on how to use it. A "real" example would be really nice, but at least saying that you don't set finalize in the command stream etc.

DelayedFinalizeParams { client, spawn_handle, delay_sec }: DelayedFinalizeParams<C, S>,
) where
B: BlockT + 'static,
CB: ClientBackend<B> + 'static,
C: HeaderBackend<B> + Finalizer<B, CB> + ProvideRuntimeApi<B> + BlockchainEvents<B> + 'static,
S: SpawnNamed,
{
let mut block_import_stream = client.import_notification_stream();

while let Some(notification) = block_import_stream.next().await {
let delay = Delay::new(Duration::from_secs(delay_sec));
let cloned_client = client.clone();
spawn_handle.spawn(
"delayed-finalize",
None,
Box::pin(async move {
delay.await;
finalize_block(FinalizeBlockParams {
hash: notification.hash,
sender: None,
justification: None,
finalizer: cloned_client,
_phantom: PhantomData,
})
.await
}),
);
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -428,6 +483,101 @@ mod tests {
assert_eq!(client.header(created_block.hash).unwrap().unwrap().number, 1)
}

#[tokio::test]
async fn instant_seal_delayed_finalize() {
let builder = TestClientBuilder::new();
let (client, select_chain) = builder.build_with_longest_chain();
let client = Arc::new(client);
let spawner = sp_core::testing::TaskExecutor::new();
let genesis_hash = client.info().genesis_hash;
let pool = Arc::new(BasicPool::with_revalidation_type(
Options::default(),
true.into(),
api(),
None,
RevalidationType::Full,
spawner.clone(),
0,
genesis_hash,
genesis_hash,
));
let env = ProposerFactory::new(spawner.clone(), client.clone(), pool.clone(), None, None);
// this test checks that blocks are created as soon as transactions are imported into the
// pool.
let (sender, receiver) = futures::channel::oneshot::channel();
let mut sender = Arc::new(Some(sender));
let commands_stream =
pool.pool().validated_pool().import_notification_stream().map(move |_| {
// we're only going to submit one tx so this fn will only be called once.
let mut_sender = Arc::get_mut(&mut sender).unwrap();
let sender = std::mem::take(mut_sender);
EngineCommand::SealNewBlock {
create_empty: false,
// set to `false`, expecting to be finalized by delayed finalize
finalize: false,
parent_hash: None,
sender,
}
});

let future_instant_seal = run_manual_seal(ManualSealParams {
block_import: client.clone(),
commands_stream,
env,
client: client.clone(),
pool: pool.clone(),
select_chain,
create_inherent_data_providers: |_, _| async { Ok(()) },
consensus_data_provider: None,
});
std::thread::spawn(|| {
let rt = tokio::runtime::Runtime::new().unwrap();
// spawn the background authorship task
rt.block_on(future_instant_seal);
});

let delay_sec = 5;
let future_delayed_finalize = run_delayed_finalize(DelayedFinalizeParams {
client: client.clone(),
delay_sec,
spawn_handle: spawner,
});
std::thread::spawn(|| {
let rt = tokio::runtime::Runtime::new().unwrap();
// spawn the background authorship task
rt.block_on(future_delayed_finalize);
});
shunsukew marked this conversation as resolved.
Show resolved Hide resolved

let mut finality_stream = client.finality_notification_stream();
// submit a transaction to pool.
let result = pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Alice, 0)).await;
// assert that it was successfully imported
assert!(result.is_ok());
// assert that the background task returns ok
let created_block = receiver.await.unwrap().unwrap();
assert_eq!(
created_block,
CreatedBlock {
hash: created_block.hash,
aux: ImportedAux {
header_only: false,
clear_justification_requests: false,
needs_justification: false,
bad_justification: false,
is_new_best: true,
}
}
);
// assert that there's a new block in the db.
assert!(client.header(created_block.hash).unwrap().is_some());
assert_eq!(client.header(created_block.hash).unwrap().unwrap().number, 1);

assert_eq!(client.info().finalized_hash, client.info().genesis_hash);

let finalized = finality_stream.select_next_some().await;
assert_eq!(finalized.hash, created_block.hash);
}

#[tokio::test]
async fn manual_seal_and_finalization() {
let builder = TestClientBuilder::new();
Expand Down
7 changes: 4 additions & 3 deletions client/consensus/manual-seal/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,11 @@ pub fn send_result<T: std::fmt::Debug>(
}
}
} else {
// instant seal doesn't report errors over rpc, simply log them.
// Sealing/Finalization with no RPC sender such as instant seal or delayed finalize doesn't
// report errors over rpc, simply log them.
match result {
Ok(r) => log::info!("Instant Seal success: {:?}", r),
Err(e) => log::error!("Instant Seal encountered an error: {}", e),
Ok(r) => log::info!("Consensus with no RPC sender success: {:?}", r),
Err(e) => log::error!("Consensus with no RPC sender encountered an error: {}", e),
}
}
}