statement-distribution: fix filtering of statements for elastic parachains #3879

Merged · 19 commits · Apr 3, 2024
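For context, the diff below replaces the old single-group lookup (`group_for_para`, which assumed one core per para) with a per-relay-parent `groups_per_para: HashMap<ParaId, Vec<GroupIndex>>`, so that with elastic scaling a para occupying several cores is checked against the whole set of backing groups rather than a single expected group. A minimal sketch of that membership check, using simplified stand-in types instead of the real `polkadot_primitives` ones:

```rust
use std::collections::HashMap;

// Simplified stand-ins for polkadot_primitives::{Id as ParaId, GroupIndex}.
type ParaId = u32;
type GroupIndex = u32;

/// Old behaviour (sketch): one expected group per para, compared by equality.
fn valid_group_old(expected: Option<GroupIndex>, claimed: GroupIndex) -> bool {
    expected == Some(claimed)
}

/// New behaviour (sketch): a para may be scheduled on several cores, so the
/// claimed group only needs to be one of the expected groups.
fn valid_group_new(
    groups_per_para: &HashMap<ParaId, Vec<GroupIndex>>,
    para: ParaId,
    claimed: GroupIndex,
) -> bool {
    groups_per_para
        .get(&para)
        .map_or(false, |groups| groups.iter().any(|g| *g == claimed))
}

fn main() {
    // Elastic parachain 1000 assigned to two cores, hence two backing groups.
    let groups_per_para: HashMap<ParaId, Vec<GroupIndex>> =
        HashMap::from([(1000, vec![0, 1])]);

    // Under the old equality check a statement backed by the second group
    // would be rejected; the membership check accepts it.
    assert!(valid_group_new(&groups_per_para, 1000, 1));
    assert!(!valid_group_old(Some(0), 1));
}
```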
2 changes: 2 additions & 0 deletions polkadot/node/collation-generation/src/error.rs
@@ -27,6 +27,8 @@ pub enum Error {
#[error(transparent)]
Util(#[from] polkadot_node_subsystem_util::Error),
#[error(transparent)]
UtilRuntime(#[from] polkadot_node_subsystem_util::runtime::Error),
#[error(transparent)]
Erasure(#[from] polkadot_erasure_coding::Error),
#[error("Parachain backing state not available in runtime.")]
MissingParaBackingState,
54 changes: 10 additions & 44 deletions polkadot/node/collation-generation/src/lib.rs
@@ -38,25 +38,23 @@ use polkadot_node_primitives::{
SubmitCollationParams,
};
use polkadot_node_subsystem::{
messages::{CollationGenerationMessage, CollatorProtocolMessage, RuntimeApiRequest},
messages::{CollationGenerationMessage, CollatorProtocolMessage},
overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, RuntimeApiError, SpawnedSubsystem,
SubsystemContext, SubsystemError, SubsystemResult,
};
use polkadot_node_subsystem_util::{
has_required_runtime, request_async_backing_params, request_availability_cores,
request_claim_queue, request_para_backing_state, request_persisted_validation_data,
request_validation_code, request_validation_code_hash, request_validators,
request_async_backing_params, request_availability_cores, request_para_backing_state,
request_persisted_validation_data, request_validation_code, request_validation_code_hash,
request_validators,
vstaging::{fetch_claim_queue, fetch_next_scheduled_on_core},
};
use polkadot_primitives::{
collator_signature_payload, CandidateCommitments, CandidateDescriptor, CandidateReceipt,
CollatorPair, CoreIndex, CoreState, Hash, Id as ParaId, OccupiedCoreAssumption,
PersistedValidationData, ScheduledCore, ValidationCodeHash,
PersistedValidationData, ValidationCodeHash,
};
use sp_core::crypto::Pair;
use std::{
collections::{BTreeMap, VecDeque},
sync::Arc,
};
use std::sync::Arc;

mod error;

@@ -228,7 +226,9 @@ async fn handle_new_activations<Context>(
let availability_cores = availability_cores??;
let async_backing_params = async_backing_params?.ok();
let n_validators = validators??.len();
let maybe_claim_queue = fetch_claim_queue(ctx.sender(), relay_parent).await?;
let maybe_claim_queue = fetch_claim_queue(ctx.sender(), relay_parent)
.await
.map_err(crate::error::Error::UtilRuntime)?;

// The loop below will fill in cores that the para is allowed to build on.
let mut cores_to_build_on = Vec::new();
@@ -655,37 +655,3 @@ fn erasure_root(
let chunks = polkadot_erasure_coding::obtain_chunks_v1(n_validators, &available_data)?;
Ok(polkadot_erasure_coding::branches(&chunks).root())
}

// Checks if the runtime supports `request_claim_queue` and executes it. Returns `Ok(None)`
// otherwise. Any [`RuntimeApiError`]s are bubbled up to the caller.
async fn fetch_claim_queue(
sender: &mut impl overseer::CollationGenerationSenderTrait,
relay_parent: Hash,
) -> crate::error::Result<Option<BTreeMap<CoreIndex, VecDeque<ParaId>>>> {
if has_required_runtime(
sender,
relay_parent,
RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT,
)
.await
{
let res = request_claim_queue(relay_parent, sender).await.await??;
Ok(Some(res))
} else {
gum::trace!(target: LOG_TARGET, "Runtime doesn't support `request_claim_queue`");
Ok(None)
}
}

// Returns the next scheduled `ParaId` for a core in the claim queue, wrapped in `ScheduledCore`.
// This function is supposed to be used in `handle_new_activations` hence the return type.
fn fetch_next_scheduled_on_core(
claim_queue: &BTreeMap<CoreIndex, VecDeque<ParaId>>,
core_idx: CoreIndex,
) -> Option<ScheduledCore> {
claim_queue
.get(&core_idx)?
.front()
.cloned()
.map(|para_id| ScheduledCore { para_id, collator: None })
}
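The two helpers removed above now live in `polkadot_node_subsystem_util::vstaging` (see the updated imports at the top of the file). As a rough illustration of the claim-queue lookup they implement, here is a self-contained sketch with plain integers standing in for `CoreIndex` and `ParaId`:

```rust
use std::collections::{BTreeMap, VecDeque};

// Simplified stand-ins for the real primitives.
type CoreIndex = u32;
type ParaId = u32;

/// Sketch of the `fetch_next_scheduled_on_core` logic: the next scheduled
/// para for a core is simply the front of that core's claim queue, if any.
fn next_scheduled_on_core(
    claim_queue: &BTreeMap<CoreIndex, VecDeque<ParaId>>,
    core: CoreIndex,
) -> Option<ParaId> {
    claim_queue.get(&core)?.front().copied()
}

fn main() {
    let claim_queue = BTreeMap::from([
        (0, VecDeque::from([2000, 2001])),
        (1, VecDeque::from([2000])),
    ]);

    assert_eq!(next_scheduled_on_core(&claim_queue, 0), Some(2000));
    assert_eq!(next_scheduled_on_core(&claim_queue, 2), None);
}
```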
15 changes: 9 additions & 6 deletions polkadot/node/collation-generation/src/tests.rs
@@ -28,15 +28,18 @@ use polkadot_node_subsystem::{
ActivatedLeaf,
};
use polkadot_node_subsystem_test_helpers::{subsystem_test_harness, TestSubsystemContextHandle};
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_node_subsystem_util::{vstaging::ClaimQueueSnapshot, TimeoutExt};
use polkadot_primitives::{
async_backing::{BackingState, CandidatePendingAvailability},
AsyncBackingParams, BlockNumber, CollatorPair, HeadData, PersistedValidationData,
ScheduledCore, ValidationCode,
};
use rstest::rstest;
use sp_keyring::sr25519::Keyring as Sr25519Keyring;
use std::pin::Pin;
use std::{
collections::{BTreeMap, VecDeque},
pin::Pin,
};
use test_helpers::{
dummy_candidate_descriptor, dummy_hash, dummy_head_data, dummy_validator, make_candidate,
};
@@ -617,7 +620,7 @@ fn fallback_when_no_validation_code_hash_api(#[case] runtime_version: u32) {
_hash,
RuntimeApiRequest::ClaimQueue(tx),
))) if runtime_version >= RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT => {
let res = BTreeMap::<CoreIndex, VecDeque<ParaId>>::new();
let res = ClaimQueueSnapshot::new();
tx.send(Ok(res)).unwrap();
},
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
@@ -780,7 +783,7 @@ fn distribute_collation_for_occupied_core_with_async_backing_enabled(#[case] run
candidate_hash: Default::default(),
candidate_descriptor: dummy_candidate_descriptor(dummy_hash()),
})];
let claim_queue = BTreeMap::from([(CoreIndex::from(0), VecDeque::from([para_id]))]);
let claim_queue = ClaimQueueSnapshot::from([(CoreIndex::from(0), VecDeque::from([para_id]))]);

test_harness(|mut virtual_overseer| async move {
helpers::initialize_collator(&mut virtual_overseer, para_id).await;
@@ -962,7 +965,7 @@ fn no_collation_is_distributed_for_occupied_core_with_async_backing_disabled(
candidate_hash: Default::default(),
candidate_descriptor: dummy_candidate_descriptor(dummy_hash()),
})];
let claim_queue = BTreeMap::from([(CoreIndex::from(0), VecDeque::from([para_id]))]);
let claim_queue = ClaimQueueSnapshot::from([(CoreIndex::from(0), VecDeque::from([para_id]))]);

test_harness(|mut virtual_overseer| async move {
helpers::initialize_collator(&mut virtual_overseer, para_id).await;
@@ -1050,7 +1053,7 @@ mod helpers {
async_backing_params: AsyncBackingParams,
cores: Vec<CoreState>,
runtime_version: u32,
claim_queue: BTreeMap<CoreIndex, VecDeque<ParaId>>,
claim_queue: ClaimQueueSnapshot,
) {
assert_matches!(
overseer_recv(virtual_overseer).await,
3 changes: 3 additions & 0 deletions polkadot/node/network/statement-distribution/src/error.rs
@@ -81,6 +81,9 @@ pub enum Error {
#[error("Fetching validator groups failed {0:?}")]
FetchValidatorGroups(RuntimeApiError),

#[error("Fetching claim queue failed {0:?}")]
FetchClaimQueue(runtime::Error),

#[error("Attempted to share statement when not a validator or not assigned")]
InvalidShare,

131 changes: 95 additions & 36 deletions polkadot/node/network/statement-distribution/src/v2/mod.rs
@@ -46,6 +46,7 @@ use polkadot_node_subsystem_util::{
backing_implicit_view::View as ImplicitView,
reputation::ReputationAggregator,
runtime::{request_min_backing_votes, ProspectiveParachainsMode},
vstaging::fetch_claim_queue,
};
use polkadot_primitives::{
AuthorityDiscoveryId, CandidateHash, CompactStatement, CoreIndex, CoreState, GroupIndex,
@@ -149,10 +150,9 @@ pub(crate) const REQUEST_RETRY_DELAY: Duration = Duration::from_secs(1);
struct PerRelayParentState {
local_validator: Option<LocalValidatorState>,
statement_store: StatementStore,
availability_cores: Vec<CoreState>,
group_rotation_info: GroupRotationInfo,
seconding_limit: usize,
session: SessionIndex,
groups_per_para: HashMap<ParaId, Vec<GroupIndex>>,
}

impl PerRelayParentState {
@@ -563,11 +563,13 @@ pub(crate) async fn handle_active_leaves_update<Context>(
activated: &ActivatedLeaf,
leaf_mode: ProspectiveParachainsMode,
) -> JfyiErrorResult<()> {
let seconding_limit = match leaf_mode {
let max_candidate_depth = match leaf_mode {
ProspectiveParachainsMode::Disabled => return Ok(()),
ProspectiveParachainsMode::Enabled { max_candidate_depth, .. } => max_candidate_depth + 1,
ProspectiveParachainsMode::Enabled { max_candidate_depth, .. } => max_candidate_depth,
};

let seconding_limit = max_candidate_depth + 1;

state
.implicit_view
.activate_leaf(ctx.sender(), activated.hash)
@@ -693,15 +695,23 @@ pub(crate) async fn handle_active_leaves_update<Context>(
}
});

let groups_per_para = determine_groups_per_para(
ctx.sender(),
new_relay_parent,
availability_cores,
group_rotation_info,
max_candidate_depth,
)
.await;

state.per_relay_parent.insert(
new_relay_parent,
PerRelayParentState {
local_validator,
statement_store: StatementStore::new(&per_session.groups),
availability_cores,
group_rotation_info,
seconding_limit,
session: session_index,
groups_per_para,
},
);
}
@@ -2126,17 +2136,64 @@ async fn provide_candidate_to_grid<Context>(
}
}

fn group_for_para(
availability_cores: &[CoreState],
group_rotation_info: &GroupRotationInfo,
para_id: ParaId,
) -> Option<GroupIndex> {
// Note: this won't work well for on-demand parachains as it assumes that core assignments are
// fixed across blocks.
let core_index = availability_cores.iter().position(|c| c.para_id() == Some(para_id));
// Utility function to populate per relay parent `ParaId` to `GroupIndex` mappings.
async fn determine_groups_per_para(
sender: &mut impl overseer::StatementDistributionSenderTrait,
relay_parent: Hash,
availability_cores: Vec<CoreState>,
group_rotation_info: GroupRotationInfo,
max_candidate_depth: usize,
) -> HashMap<ParaId, Vec<GroupIndex>> {
let maybe_claim_queue = fetch_claim_queue(sender, relay_parent)
Reviewer (Member): Ok, my thinking was to move all fetches here, including availability_cores, for consistency. It's weird if we pass in one thing but fetch the other ourselves. I know this is used in one other place as well (wrongly), but given that we already pass by value, there is not much gain in avoiding the double fetch.

Author (Contributor): LocalValidatorState is built outside this fn and requires availability_cores.
.await
.unwrap_or_else(|err| {
gum::debug!(
target: LOG_TARGET,
?relay_parent,
?err,
"determine_groups_per_para: `claim_queue` API not available, falling back to iterating availability cores"
);
None
});

let n_cores = availability_cores.len();

// Determine the core indices occupied by each para at the current relay parent. To support
// on-demand parachains we also consider the core indices at the next block if a core has a
// candidate pending availability.
let para_core_indices: Vec<_> = if let Some(claim_queue) = maybe_claim_queue {
claim_queue
.into_iter()
.filter_map(|(core_index, paras)| Some((*paras.front()?, core_index)))
.collect()
} else {
availability_cores
.into_iter()
.enumerate()
.filter_map(|(index, core)| match core {
CoreState::Scheduled(scheduled_core) =>
Some((scheduled_core.para_id, CoreIndex(index as u32))),
CoreState::Occupied(occupied_core) =>
if max_candidate_depth >= 1 {
occupied_core
.next_up_on_available
.map(|scheduled_core| (scheduled_core.para_id, CoreIndex(index as u32)))
} else {
None
},
CoreState::Free => None,
})
.collect()
};

core_index
.map(|c| group_rotation_info.group_for_core(CoreIndex(c as _), availability_cores.len()))
let mut groups_per_para = HashMap::new();
// Map from `CoreIndex` to `GroupIndex` and collect as `HashMap`.
for (para, core_index) in para_core_indices {
let group_index = group_rotation_info.group_for_core(core_index, n_cores);
groups_per_para.entry(para).or_insert_with(Vec::new).push(group_index)
}

groups_per_para
}
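To make the new mapping concrete, here is a worked sketch of what `determine_groups_per_para` produces for a claim queue in which one para claims two cores (the elastic-scaling case). It uses simplified stand-in types and assumes an identity core-to-group rotation for readability; the real code uses `polkadot_primitives` types and `GroupRotationInfo::group_for_core`:

```rust
use std::collections::{BTreeMap, HashMap, VecDeque};

// Simplified stand-ins for the real primitives.
type CoreIndex = u32;
type ParaId = u32;
type GroupIndex = u32;

fn groups_per_para_from_claim_queue(
    claim_queue: &BTreeMap<CoreIndex, VecDeque<ParaId>>,
    // Identity rotation assumed here; the real implementation rotates the
    // validator groups across cores over time.
    group_for_core: impl Fn(CoreIndex) -> GroupIndex,
) -> HashMap<ParaId, Vec<GroupIndex>> {
    let mut out: HashMap<ParaId, Vec<GroupIndex>> = HashMap::new();
    // Only the front of each core's claim queue counts for the current relay parent.
    for (core, paras) in claim_queue {
        if let Some(para) = paras.front() {
            out.entry(*para).or_default().push(group_for_core(*core));
        }
    }
    out
}

fn main() {
    // Para 2000 claims cores 0 and 1; para 2001 claims core 2.
    let claim_queue = BTreeMap::from([
        (0, VecDeque::from([2000])),
        (1, VecDeque::from([2000])),
        (2, VecDeque::from([2001])),
    ]);

    let groups = groups_per_para_from_claim_queue(&claim_queue, |core| core);
    assert_eq!(groups.get(&2000), Some(&vec![0, 1]));
    assert_eq!(groups.get(&2001), Some(&vec![2]));
}
```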

#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
Expand Down Expand Up @@ -2192,18 +2249,23 @@ async fn fragment_tree_update_inner<Context>(
let confirmed_candidate = state.candidates.get_confirmed(&candidate_hash);
let prs = state.per_relay_parent.get_mut(&receipt.descriptor().relay_parent);
if let (Some(confirmed), Some(prs)) = (confirmed_candidate, prs) {
let group_index = group_for_para(
&prs.availability_cores,
&prs.group_rotation_info,
receipt.descriptor().para_id,
);

let per_session = state.per_session.get(&prs.session);
if let (Some(per_session), Some(group_index)) = (per_session, group_index) {
let group_index = confirmed.group_index();

// Sanity check if group_index is valid for this para at relay parent.
let Some(expected_groups) = prs.groups_per_para.get(&receipt.descriptor().para_id)
else {
continue
};
if !expected_groups.iter().any(|g| *g == group_index) {
continue
}

if let Some(per_session) = per_session {
send_backing_fresh_statements(
ctx,
candidate_hash,
group_index,
confirmed.group_index(),
&receipt.descriptor().relay_parent,
prs,
confirmed,
@@ -2311,13 +2373,12 @@ async fn handle_incoming_manifest_common<'a, Context>(
Some(x) => x,
};

let expected_group = group_for_para(
&relay_parent_state.availability_cores,
&relay_parent_state.group_rotation_info,
para_id,
);
let Some(expected_groups) = relay_parent_state.groups_per_para.get(&para_id) else {
modify_reputation(reputation, ctx.sender(), peer, COST_MALFORMED_MANIFEST).await;
return None
};

if expected_group != Some(manifest_summary.claimed_group_index) {
if !expected_groups.iter().any(|g| g == &manifest_summary.claimed_group_index) {
Reviewer (Member): Nice!
modify_reputation(reputation, ctx.sender(), peer, COST_MALFORMED_MANIFEST).await;
return None
}
@@ -3037,13 +3098,11 @@ pub(crate) async fn handle_response<Context>(
relay_parent_state.session,
|v| per_session.session_info.validators.get(v).map(|x| x.clone()),
|para, g_index| {
let expected_group = group_for_para(
&relay_parent_state.availability_cores,
&relay_parent_state.group_rotation_info,
para,
);
let Some(expected_groups) = relay_parent_state.groups_per_para.get(&para) else {
return false
};

Some(g_index) == expected_group
expected_groups.iter().any(|g| g == &g_index)
},
disabled_mask,
);