Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

sc-consensus-beefy: graceful support for pallet-beefy reset #14217

Merged
merged 4 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions client/consensus/beefy/src/aux_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use sp_runtime::traits::Block as BlockT;
const VERSION_KEY: &[u8] = b"beefy_auxschema_version";
const WORKER_STATE_KEY: &[u8] = b"beefy_voter_state";

const CURRENT_VERSION: u32 = 3;
const CURRENT_VERSION: u32 = 4;

pub(crate) fn write_current_version<BE: AuxStore>(backend: &BE) -> ClientResult<()> {
info!(target: LOG_TARGET, "🥩 write aux schema version {:?}", CURRENT_VERSION);
Expand Down Expand Up @@ -63,8 +63,8 @@ where

match version {
None => (),
Some(1) | Some(2) => (), // versions 1 & 2 are obsolete and should be simply ignored
Some(3) => return load_decode::<_, PersistedState<B>>(backend, WORKER_STATE_KEY),
Some(1) | Some(2) | Some(3) => (), // versions 1, 2 & 3 are obsolete and should be ignored
Some(4) => return load_decode::<_, PersistedState<B>>(backend, WORKER_STATE_KEY),
acatangiu marked this conversation as resolved.
Show resolved Hide resolved
other =>
return Err(ClientError::Backend(format!("Unsupported BEEFY DB version: {:?}", other))),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

use codec::Decode;
use futures::{channel::oneshot, StreamExt};
use log::{debug, trace};
use log::{debug, error, trace};
use sc_client_api::BlockBackend;
use sc_network::{
config as netconfig, config::RequestResponseConfig, types::ProtocolName, PeerId,
Expand Down Expand Up @@ -215,5 +215,9 @@ where
},
}
}
error!(
target: crate::LOG_TARGET,
"🥩 On-demand requests receiver stream terminated, closing worker."
);
}
}
5 changes: 4 additions & 1 deletion client/consensus/beefy/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

//! BEEFY gadget specific errors
//!
//! Used for BEEFY gadget interal error handling only
//! Used for BEEFY gadget internal error handling only

use std::fmt::Debug;

Expand All @@ -34,6 +34,8 @@ pub enum Error {
Signature(String),
#[error("Session uninitialized")]
UninitSession,
#[error("pallet-beefy was reset, please restart voter")]
ConsensusReset,
}

#[cfg(test)]
Expand All @@ -45,6 +47,7 @@ impl PartialEq for Error {
(Error::RuntimeApi(_), Error::RuntimeApi(_)) => true,
(Error::Signature(s1), Error::Signature(s2)) => s1 == s2,
(Error::UninitSession, Error::UninitSession) => true,
(Error::ConsensusReset, Error::ConsensusReset) => true,
_ => false,
}
}
Expand Down
86 changes: 49 additions & 37 deletions client/consensus/beefy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ use sp_blockchain::{
use sp_consensus::{Error as ConsensusError, SyncOracle};
use sp_consensus_beefy::{
crypto::AuthorityId, BeefyApi, MmrRootHash, PayloadProvider, ValidatorSet, BEEFY_ENGINE_ID,
GENESIS_AUTHORITY_SET_ID,
};
use sp_keystore::KeystorePtr;
use sp_mmr_primitives::MmrApi;
Expand Down Expand Up @@ -282,8 +281,14 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
let persisted_state =
match wait_for_runtime_pallet(&*runtime, &mut gossip_engine, &mut finality_notifications)
.await
.and_then(|best_grandpa| {
load_or_init_voter_state(&*backend, &*runtime, best_grandpa, min_block_delta)
.and_then(|(beefy_genesis, best_grandpa)| {
load_or_init_voter_state(
&*backend,
&*runtime,
beefy_genesis,
best_grandpa,
min_block_delta,
)
}) {
Ok(state) => state,
Err(e) => {
Expand Down Expand Up @@ -316,16 +321,17 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
persisted_state,
};

futures::future::join(
worker.run(block_import_justif, finality_notifications),
on_demand_justifications_handler.run(),
futures::future::select(
Box::pin(worker.run(block_import_justif, finality_notifications)),
Box::pin(on_demand_justifications_handler.run()),
)
.await;
}

fn load_or_init_voter_state<B, BE, R>(
backend: &BE,
runtime: &R,
beefy_genesis: NumberFor<B>,
best_grandpa: <B as Block>::Header,
min_block_delta: u32,
) -> ClientResult<PersistedState<B>>
Expand All @@ -335,17 +341,22 @@ where
R: ProvideRuntimeApi<B>,
R::Api: BeefyApi<B>,
{
// Initialize voter state from AUX DB or from pallet genesis.
if let Some(mut state) = crate::aux_schema::load_persistent(backend)? {
// Overwrite persisted state with current best GRANDPA block.
state.set_best_grandpa(best_grandpa);
// Overwrite persisted data with newly provided `min_block_delta`.
state.set_min_block_delta(min_block_delta);
info!(target: LOG_TARGET, "🥩 Loading BEEFY voter state from db: {:?}.", state);
Ok(state)
} else {
initialize_voter_state(backend, runtime, best_grandpa, min_block_delta)
}
// Initialize voter state from AUX DB if compatible.
crate::aux_schema::load_persistent(backend)?
// Verify state pallet genesis matches runtime.
.filter(|state| state.pallet_genesis() == beefy_genesis)
.and_then(|mut state| {
// Overwrite persisted state with current best GRANDPA block.
state.set_best_grandpa(best_grandpa.clone());
// Overwrite persisted data with newly provided `min_block_delta`.
state.set_min_block_delta(min_block_delta);
info!(target: LOG_TARGET, "🥩 Loading BEEFY voter state from db: {:?}.", state);
Some(Ok(state))
})
// No valid voter-state persisted, re-initialize from pallet genesis.
.unwrap_or_else(|| {
initialize_voter_state(backend, runtime, beefy_genesis, best_grandpa, min_block_delta)
})
}

// If no persisted state present, walk back the chain from first GRANDPA notification to either:
Expand All @@ -355,6 +366,7 @@ where
fn initialize_voter_state<B, BE, R>(
backend: &BE,
runtime: &R,
beefy_genesis: NumberFor<B>,
best_grandpa: <B as Block>::Header,
min_block_delta: u32,
) -> ClientResult<PersistedState<B>>
Expand All @@ -369,6 +381,7 @@ where
.beefy_genesis(best_grandpa.hash())
.ok()
.flatten()
.filter(|genesis| *genesis == beefy_genesis)
.ok_or_else(|| ClientError::Backend("BEEFY pallet expected to be active.".into()))?;
// Walk back the imported blocks and initialize voter either, at the last block with
// a BEEFY justification, or at pallet genesis block; voter will resume from there.
Expand Down Expand Up @@ -396,16 +409,20 @@ where
rounds.conclude(best_beefy);
sessions.push_front(rounds);
}
let state =
PersistedState::checked_new(best_grandpa, best_beefy, sessions, min_block_delta)
.ok_or_else(|| ClientError::Backend("Invalid BEEFY chain".into()))?;
let state = PersistedState::checked_new(
best_grandpa,
best_beefy,
sessions,
min_block_delta,
beefy_genesis,
)
.ok_or_else(|| ClientError::Backend("Invalid BEEFY chain".into()))?;
break state
}

if *header.number() == beefy_genesis {
// We've reached BEEFY genesis, initialize voter here.
let genesis_set =
expect_validator_set(runtime, header.hash()).and_then(genesis_set_sanity_check)?;
let genesis_set = expect_validator_set(runtime, header.hash())?;
info!(
target: LOG_TARGET,
"🥩 Loading BEEFY voter state from genesis on what appears to be first startup. \
Expand All @@ -415,8 +432,14 @@ where
);

sessions.push_front(Rounds::new(beefy_genesis, genesis_set));
break PersistedState::checked_new(best_grandpa, Zero::zero(), sessions, min_block_delta)
.ok_or_else(|| ClientError::Backend("Invalid BEEFY chain".into()))?
break PersistedState::checked_new(
best_grandpa,
Zero::zero(),
sessions,
min_block_delta,
beefy_genesis,
)
.ok_or_else(|| ClientError::Backend("Invalid BEEFY chain".into()))?
}

if let Some(active) = worker::find_authorities_change::<B>(&header) {
Expand Down Expand Up @@ -451,7 +474,7 @@ async fn wait_for_runtime_pallet<B, R>(
runtime: &R,
mut gossip_engine: &mut GossipEngine<B>,
finality: &mut Fuse<FinalityNotifications<B>>,
) -> ClientResult<<B as Block>::Header>
) -> ClientResult<(NumberFor<B>, <B as Block>::Header)>
where
B: Block,
R: ProvideRuntimeApi<B>,
Expand All @@ -474,7 +497,7 @@ where
"🥩 BEEFY pallet available: block {:?} beefy genesis {:?}",
notif.header.number(), start
);
return Ok(notif.header)
return Ok((start, notif.header))
}
}
},
Expand All @@ -488,17 +511,6 @@ where
Err(ClientError::Backend(err_msg))
}

fn genesis_set_sanity_check(
active: ValidatorSet<AuthorityId>,
) -> ClientResult<ValidatorSet<AuthorityId>> {
if active.id() == GENESIS_AUTHORITY_SET_ID {
Ok(active)
} else {
error!(target: LOG_TARGET, "🥩 Unexpected ID for genesis validator set {:?}.", active);
Err(ClientError::Backend("BEEFY Genesis sanity check failed.".into()))
}
}

fn expect_validator_set<B, R>(
runtime: &R,
at_hash: B::Hash,
Expand Down
44 changes: 35 additions & 9 deletions client/consensus/beefy/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,9 @@ async fn voter_init_setup(
gossip_validator,
None,
);
let best_grandpa = wait_for_runtime_pallet(api, &mut gossip_engine, finality).await.unwrap();
load_or_init_voter_state(&*backend, api, best_grandpa, 1)
let (beefy_genesis, best_grandpa) =
wait_for_runtime_pallet(api, &mut gossip_engine, finality).await.unwrap();
load_or_init_voter_state(&*backend, api, beefy_genesis, best_grandpa, 1)
}

// Spawns beefy voters. Returns a future to spawn on the runtime.
Expand Down Expand Up @@ -981,9 +982,7 @@ async fn should_initialize_voter_at_genesis() {

// push 15 blocks with `AuthorityChange` digests every 10 blocks
let hashes = net.generate_blocks_and_sync(15, 10, &validator_set, false).await;

let mut finality = net.peer(0).client().as_client().finality_notification_stream().fuse();

// finalize 13 without justifications
net.peer(0).client().as_client().finalize_block(hashes[13], None).unwrap();

Expand Down Expand Up @@ -1022,11 +1021,9 @@ async fn should_initialize_voter_at_custom_genesis() {
let custom_pallet_genesis = 7;
let api = TestApi::new(custom_pallet_genesis, &validator_set, GOOD_MMR_ROOT);

// push 15 blocks with `AuthorityChange` digests every 10 blocks
let hashes = net.generate_blocks_and_sync(15, 10, &validator_set, false).await;

// push 15 blocks with `AuthorityChange` digests every 15 blocks
let hashes = net.generate_blocks_and_sync(15, 15, &validator_set, false).await;
let mut finality = net.peer(0).client().as_client().finality_notification_stream().fuse();

// finalize 3, 5, 8 without justifications
net.peer(0).client().as_client().finalize_block(hashes[3], None).unwrap();
net.peer(0).client().as_client().finalize_block(hashes[5], None).unwrap();
Expand All @@ -1053,6 +1050,35 @@ async fn should_initialize_voter_at_custom_genesis() {
assert!(verify_persisted_version(&*backend));
let state = load_persistent(&*backend).unwrap().unwrap();
assert_eq!(state, persisted_state);

// now re-init after genesis changes

// should ignore existing aux db state and reinit at new genesis
let new_validator_set = ValidatorSet::new(make_beefy_ids(keys), 42).unwrap();
let new_pallet_genesis = 10;
let api = TestApi::new(new_pallet_genesis, &new_validator_set, GOOD_MMR_ROOT);

net.peer(0).client().as_client().finalize_block(hashes[10], None).unwrap();
// load persistent state - state preset in DB, but with different pallet genesis
let new_persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap();

// verify voter initialized with single session starting at block `new_pallet_genesis` (10)
let sessions = new_persisted_state.voting_oracle().sessions();
assert_eq!(sessions.len(), 1);
assert_eq!(sessions[0].session_start(), new_pallet_genesis);
let rounds = new_persisted_state.active_round().unwrap();
assert_eq!(rounds.session_start(), new_pallet_genesis);
assert_eq!(rounds.validator_set_id(), new_validator_set.id());

// verify next vote target is mandatory block 10
assert_eq!(new_persisted_state.best_beefy_block(), 0);
assert_eq!(new_persisted_state.best_grandpa_number(), 10);
assert_eq!(new_persisted_state.voting_oracle().voting_target(), Some(new_pallet_genesis));

// verify state also saved to db
assert!(verify_persisted_version(&*backend));
let state = load_persistent(&*backend).unwrap().unwrap();
assert_eq!(state, new_persisted_state);
}

#[tokio::test]
Expand Down Expand Up @@ -1166,7 +1192,7 @@ async fn beefy_finalizing_after_pallet_genesis() {
sp_tracing::try_init_simple();

let peers = [BeefyKeyring::Alice, BeefyKeyring::Bob];
let validator_set = ValidatorSet::new(make_beefy_ids(&peers), 0).unwrap();
let validator_set = ValidatorSet::new(make_beefy_ids(&peers), 14).unwrap();
let session_len = 10;
let min_block_delta = 1;
let pallet_genesis = 15;
Expand Down
Loading