-
Notifications
You must be signed in to change notification settings - Fork 666
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
statement-distribution: fix filtering of statements for elastic parachains #3879
Changes from 18 commits
e3b5192
01ceb3b
5ed7841
5f0872f
034b61a
9c91193
f87a624
70213c5
4c66001
21ae63d
e923dac
27ee145
be1a2c7
b9f1afc
17c4025
a185390
39f480d
d14384c
9de1a04
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -46,6 +46,7 @@ use polkadot_node_subsystem_util::{ | |
backing_implicit_view::View as ImplicitView, | ||
reputation::ReputationAggregator, | ||
runtime::{request_min_backing_votes, ProspectiveParachainsMode}, | ||
vstaging::{fetch_claim_queue, fetch_next_scheduled_on_core, ClaimQueueSnapshot}, | ||
}; | ||
use polkadot_primitives::{ | ||
AuthorityDiscoveryId, CandidateHash, CompactStatement, CoreIndex, CoreState, GroupIndex, | ||
|
@@ -149,10 +150,9 @@ pub(crate) const REQUEST_RETRY_DELAY: Duration = Duration::from_secs(1); | |
struct PerRelayParentState { | ||
local_validator: Option<LocalValidatorState>, | ||
statement_store: StatementStore, | ||
availability_cores: Vec<CoreState>, | ||
group_rotation_info: GroupRotationInfo, | ||
seconding_limit: usize, | ||
session: SessionIndex, | ||
groups_per_para: HashMap<ParaId, Vec<GroupIndex>>, | ||
} | ||
|
||
impl PerRelayParentState { | ||
|
@@ -563,11 +563,13 @@ pub(crate) async fn handle_active_leaves_update<Context>( | |
activated: &ActivatedLeaf, | ||
leaf_mode: ProspectiveParachainsMode, | ||
) -> JfyiErrorResult<()> { | ||
let seconding_limit = match leaf_mode { | ||
let max_candidate_depth = match leaf_mode { | ||
ProspectiveParachainsMode::Disabled => return Ok(()), | ||
ProspectiveParachainsMode::Enabled { max_candidate_depth, .. } => max_candidate_depth + 1, | ||
ProspectiveParachainsMode::Enabled { max_candidate_depth, .. } => max_candidate_depth, | ||
}; | ||
|
||
let seconding_limit = max_candidate_depth + 1; | ||
|
||
state | ||
.implicit_view | ||
.activate_leaf(ctx.sender(), activated.hash) | ||
|
@@ -693,15 +695,24 @@ pub(crate) async fn handle_active_leaves_update<Context>( | |
} | ||
}); | ||
|
||
let maybe_claim_queue = fetch_claim_queue(ctx.sender(), new_relay_parent) | ||
.await | ||
.map_err(JfyiError::FetchClaimQueue)?; | ||
|
||
let groups_per_para = determine_groups_per_para( | ||
availability_cores, | ||
group_rotation_info, | ||
maybe_claim_queue, | ||
max_candidate_depth, | ||
); | ||
state.per_relay_parent.insert( | ||
new_relay_parent, | ||
PerRelayParentState { | ||
local_validator, | ||
statement_store: StatementStore::new(&per_session.groups), | ||
availability_cores, | ||
group_rotation_info, | ||
seconding_limit, | ||
session: session_index, | ||
groups_per_para, | ||
}, | ||
); | ||
} | ||
|
@@ -2126,17 +2137,58 @@ async fn provide_candidate_to_grid<Context>( | |
} | ||
} | ||
|
||
fn group_for_para( | ||
availability_cores: &[CoreState], | ||
group_rotation_info: &GroupRotationInfo, | ||
para_id: ParaId, | ||
) -> Option<GroupIndex> { | ||
// Note: this won't work well for on-demand parachains as it assumes that core assignments are | ||
// fixed across blocks. | ||
let core_index = availability_cores.iter().position(|c| c.para_id() == Some(para_id)); | ||
// Utility function to populate per relay parent `ParaId` to `GroupIndex` mappings. | ||
fn determine_groups_per_para( | ||
availability_cores: Vec<CoreState>, | ||
group_rotation_info: GroupRotationInfo, | ||
maybe_claim_queue: Option<ClaimQueueSnapshot>, | ||
sandreim marked this conversation as resolved.
Show resolved
Hide resolved
|
||
max_candidate_depth: usize, | ||
) -> HashMap<ParaId, Vec<GroupIndex>> { | ||
let n_cores = availability_cores.len(); | ||
|
||
// Determine the core indices occupied by each para at the current relay parent. To support | ||
// on-demand parachains we also consider the core indices at next block if core has a candidate | ||
// pending availability. | ||
let para_core_indices: Vec<_> = if let Some(claim_queue) = maybe_claim_queue { | ||
claim_queue | ||
.keys() | ||
.filter_map(|core_index| { | ||
let Some(scheduled_core) = fetch_next_scheduled_on_core(&claim_queue, *core_index) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Ok, first (not introduced here): I find the name of this function vastly misleading, as it does not fetch anything. Second, why are we iterating only the keys of the claim queue and then fetching the actual element for each key, instead of iterating key/value pairs to begin with? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I wanted to reuse the fn, but you are right, it is actually useless in this case. A wrapper for this BTreeMap with a method like this might be a better approach. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @tdimitrov wdyt? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. |
||
else { | ||
return None | ||
}; | ||
|
||
core_index | ||
.map(|c| group_rotation_info.group_for_core(CoreIndex(c as _), availability_cores.len())) | ||
Some((scheduled_core.para_id, *core_index)) | ||
}) | ||
.collect() | ||
} else { | ||
availability_cores | ||
.into_iter() | ||
.enumerate() | ||
.filter_map(|(index, core)| match core { | ||
CoreState::Scheduled(scheduled_core) => | ||
Some((scheduled_core.para_id, CoreIndex(index as u32))), | ||
CoreState::Occupied(occupied_core) => | ||
if max_candidate_depth >= 1 { | ||
occupied_core | ||
.next_up_on_available | ||
.map(|scheduled_core| (scheduled_core.para_id, CoreIndex(index as u32))) | ||
} else { | ||
None | ||
}, | ||
CoreState::Free => None, | ||
}) | ||
.collect() | ||
}; | ||
|
||
let mut groups_per_para = HashMap::new(); | ||
// Map from `CoreIndex` to `GroupIndex` and collect as `HashMap`. | ||
for (para, core_index) in para_core_indices { | ||
let group_index = group_rotation_info.group_for_core(core_index, n_cores); | ||
groups_per_para.entry(para).or_insert_with(Vec::new).push(group_index) | ||
} | ||
|
||
groups_per_para | ||
} | ||
|
||
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)] | ||
|
@@ -2192,18 +2244,14 @@ async fn fragment_tree_update_inner<Context>( | |
let confirmed_candidate = state.candidates.get_confirmed(&candidate_hash); | ||
let prs = state.per_relay_parent.get_mut(&receipt.descriptor().relay_parent); | ||
if let (Some(confirmed), Some(prs)) = (confirmed_candidate, prs) { | ||
let group_index = group_for_para( | ||
&prs.availability_cores, | ||
&prs.group_rotation_info, | ||
receipt.descriptor().para_id, | ||
); | ||
|
||
let per_session = state.per_session.get(&prs.session); | ||
if let (Some(per_session), Some(group_index)) = (per_session, group_index) { | ||
// TODO(maybe for sanity): perform an extra check on the candidate backing group | ||
alexggh marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// index all allowed | ||
if let Some(per_session) = per_session { | ||
send_backing_fresh_statements( | ||
ctx, | ||
candidate_hash, | ||
group_index, | ||
confirmed.group_index(), | ||
&receipt.descriptor().relay_parent, | ||
prs, | ||
confirmed, | ||
|
@@ -2311,13 +2359,14 @@ async fn handle_incoming_manifest_common<'a, Context>( | |
Some(x) => x, | ||
}; | ||
|
||
let expected_group = group_for_para( | ||
&relay_parent_state.availability_cores, | ||
&relay_parent_state.group_rotation_info, | ||
para_id, | ||
); | ||
let expected_groups = relay_parent_state.groups_per_para.get(¶_id); | ||
|
||
if expected_group != Some(manifest_summary.claimed_group_index) { | ||
if expected_groups.is_none() || | ||
!expected_groups | ||
.expect("checked is_some(); qed") | ||
sandreim marked this conversation as resolved.
Show resolved
Hide resolved
|
||
.iter() | ||
.any(|g| g == &manifest_summary.claimed_group_index) | ||
{ | ||
modify_reputation(reputation, ctx.sender(), peer, COST_MALFORMED_MANIFEST).await; | ||
return None | ||
} | ||
|
@@ -3037,13 +3086,11 @@ pub(crate) async fn handle_response<Context>( | |
relay_parent_state.session, | ||
|v| per_session.session_info.validators.get(v).map(|x| x.clone()), | ||
|para, g_index| { | ||
let expected_group = group_for_para( | ||
&relay_parent_state.availability_cores, | ||
&relay_parent_state.group_rotation_info, | ||
para, | ||
); | ||
let Some(expected_groups) = relay_parent_state.groups_per_para.get(¶) else { | ||
return false | ||
}; | ||
|
||
Some(g_index) == expected_group | ||
expected_groups.iter().any(|g| g == &g_index) | ||
}, | ||
disabled_mask, | ||
); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given that we pass it by value (we can't re-use the result), it might make sense to move that call into
determine_groups_per_para
. The result of request_availability_cores is used in
find_active_validator_state
, but wrongly. It uses para_id,
a function we should remove, as it makes no sense for it to return either the ParaId
of the occupying para or that of the scheduled one.*) Regardless, I would move the fetching code into the function that needs the data.
handle_active_leaves_update
is already pages of code. *) Note: This is obviously unrelated, so it can be a separate PR, but it should be fixed for Coretime asap. @tdimitrov
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#3948