Elastic scaling: add e2e test #3929

Merged Apr 5, 2024 · 27 commits (changes shown from 19 commits)

Commits
e3b5192 refactor: move claim queue fetch utils to utils crate (sandreim, Mar 28, 2024)
01ceb3b cache all groups per para and remove para:group 1:1 assumption (sandreim, Mar 28, 2024)
5ed7841 Merge branch 'master' of github.com:paritytech/polkadot-sdk into sand… (sandreim, Mar 28, 2024)
5f0872f typo (sandreim, Mar 28, 2024)
034b61a fix test build (sandreim, Mar 28, 2024)
9c91193 testing some local test run weirdness (sandreim, Mar 28, 2024)
f87a624 fix doctest (sandreim, Mar 28, 2024)
70213c5 remove panic (sandreim, Mar 28, 2024)
4c66001 add new cluster test (sandreim, Mar 29, 2024)
21ae63d add grid test (sandreim, Mar 29, 2024)
e923dac Merge branch 'master' of github.com:paritytech/polkadot-sdk into sand… (sandreim, Mar 29, 2024)
27ee145 fmt (sandreim, Mar 29, 2024)
be1a2c7 explicit max_candidate_depth (sandreim, Apr 1, 2024)
b9f1afc fix test comment (sandreim, Apr 1, 2024)
6340e32 Add elastic scaling zombienet test (sandreim, Apr 1, 2024)
8495e6f js script (sandreim, Apr 1, 2024)
04dc5fd move mvp test (sandreim, Apr 1, 2024)
bd431cb fix cmd (sandreim, Apr 1, 2024)
1713be1 fix path (sandreim, Apr 1, 2024)
14fcb4b rename mvp test (sandreim, Apr 2, 2024)
07b58a3 proper js args (sandreim, Apr 2, 2024)
2b2d2ab non elast scaling para assert (sandreim, Apr 2, 2024)
332fda9 some tolerance (sandreim, Apr 2, 2024)
86c22a4 properly call assign-core.js (sandreim, Apr 2, 2024)
a32c2c7 Merge branch 'master' of github.com:paritytech/polkadot-sdk into sand… (sandreim, Apr 3, 2024)
7bde9d8 minor script change (sandreim, Apr 3, 2024)
a453123 Update polkadot/zombienet_tests/elastic_scaling/0001-basic-3cores-6s-… (pepoviola, Apr 4, 2024)
14 changes: 11 additions & 3 deletions .gitlab/pipeline/zombienet/polkadot.yml
@@ -158,13 +158,21 @@ zombienet-polkadot-functional-0011-async-backing-6-seconds-rate:
--local-dir="${LOCAL_DIR}/functional"
--test="0011-async-backing-6-seconds-rate.zndsl"

zombienet-polkadot-functional-0012-elastic-scaling-mvp:
zombienet-polkadot-elastic-scaling-0001-basic-3cores-6s-blocks:
extends:
- .zombienet-polkadot-common
script:
- /home/nonroot/zombie-net/scripts/ci/run-test-local-env-manager.sh
--local-dir="${LOCAL_DIR}/functional"
--test="0012-elastic-scaling-mvp.zndsl"
--local-dir="${LOCAL_DIR}/elastic_scaling"
--test="0001-basic-3cores-6s-blocks.zndsl"

zombienet-polkadot-elastic-scaling-0002-elastic-scaling-mvp:
extends:
- .zombienet-polkadot-common
script:
- /home/nonroot/zombie-net/scripts/ci/run-test-local-env-manager.sh
--local-dir="${LOCAL_DIR}/elastic_scaling"
--test="0002-elastic-scaling-mvp.zndsl"

zombienet-polkadot-smoke-0001-parachains-smoke-test:
extends:
2 changes: 2 additions & 0 deletions polkadot/node/collation-generation/src/error.rs
@@ -27,6 +27,8 @@ pub enum Error {
#[error(transparent)]
Util(#[from] polkadot_node_subsystem_util::Error),
#[error(transparent)]
UtilRuntime(#[from] polkadot_node_subsystem_util::runtime::Error),
#[error(transparent)]
Erasure(#[from] polkadot_erasure_coding::Error),
#[error("Parachain backing state not available in runtime.")]
MissingParaBackingState,
54 changes: 10 additions & 44 deletions polkadot/node/collation-generation/src/lib.rs
@@ -38,25 +38,23 @@ use polkadot_node_primitives::{
SubmitCollationParams,
};
use polkadot_node_subsystem::{
messages::{CollationGenerationMessage, CollatorProtocolMessage, RuntimeApiRequest},
messages::{CollationGenerationMessage, CollatorProtocolMessage},
overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, RuntimeApiError, SpawnedSubsystem,
SubsystemContext, SubsystemError, SubsystemResult,
};
use polkadot_node_subsystem_util::{
has_required_runtime, request_async_backing_params, request_availability_cores,
request_claim_queue, request_para_backing_state, request_persisted_validation_data,
request_validation_code, request_validation_code_hash, request_validators,
request_async_backing_params, request_availability_cores, request_para_backing_state,
request_persisted_validation_data, request_validation_code, request_validation_code_hash,
request_validators,
vstaging::{fetch_claim_queue, fetch_next_scheduled_on_core},
};
use polkadot_primitives::{
collator_signature_payload, CandidateCommitments, CandidateDescriptor, CandidateReceipt,
CollatorPair, CoreIndex, CoreState, Hash, Id as ParaId, OccupiedCoreAssumption,
PersistedValidationData, ScheduledCore, ValidationCodeHash,
PersistedValidationData, ValidationCodeHash,
};
use sp_core::crypto::Pair;
use std::{
collections::{BTreeMap, VecDeque},
sync::Arc,
};
use std::sync::Arc;

mod error;

@@ -228,7 +226,9 @@ async fn handle_new_activations<Context>(
let availability_cores = availability_cores??;
let async_backing_params = async_backing_params?.ok();
let n_validators = validators??.len();
let maybe_claim_queue = fetch_claim_queue(ctx.sender(), relay_parent).await?;
let maybe_claim_queue = fetch_claim_queue(ctx.sender(), relay_parent)
.await
.map_err(crate::error::Error::UtilRuntime)?;

// The loop bellow will fill in cores that the para is allowed to build on.
let mut cores_to_build_on = Vec::new();
@@ -655,37 +655,3 @@ fn erasure_root(
let chunks = polkadot_erasure_coding::obtain_chunks_v1(n_validators, &available_data)?;
Ok(polkadot_erasure_coding::branches(&chunks).root())
}

// Checks if the runtime supports `request_claim_queue` and executes it. Returns `Ok(None)`
// otherwise. Any [`RuntimeApiError`]s are bubbled up to the caller.
async fn fetch_claim_queue(
sender: &mut impl overseer::CollationGenerationSenderTrait,
relay_parent: Hash,
) -> crate::error::Result<Option<BTreeMap<CoreIndex, VecDeque<ParaId>>>> {
if has_required_runtime(
sender,
relay_parent,
RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT,
)
.await
{
let res = request_claim_queue(relay_parent, sender).await.await??;
Ok(Some(res))
} else {
gum::trace!(target: LOG_TARGET, "Runtime doesn't support `request_claim_queue`");
Ok(None)
}
}

// Returns the next scheduled `ParaId` for a core in the claim queue, wrapped in `ScheduledCore`.
// This function is supposed to be used in `handle_new_activations` hence the return type.
fn fetch_next_scheduled_on_core(
claim_queue: &BTreeMap<CoreIndex, VecDeque<ParaId>>,
core_idx: CoreIndex,
) -> Option<ScheduledCore> {
claim_queue
.get(&core_idx)?
.front()
.cloned()
.map(|para_id| ScheduledCore { para_id, collator: None })
}
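The `fetch_claim_queue` and `fetch_next_scheduled_on_core` helpers deleted above are not gone: as the updated imports show, they now live in `polkadot_node_subsystem_util::vstaging`, so statement distribution can reuse them. As a minimal sketch of what the relocated `fetch_next_scheduled_on_core` computes over a claim-queue snapshot — using simplified stand-in types rather than the real `polkadot_primitives` definitions, and returning a bare para id instead of a `ScheduledCore`:

```rust
use std::collections::{BTreeMap, VecDeque};

// Simplified stand-ins; the real `CoreIndex`/`ParaId` live in `polkadot_primitives`.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct CoreIndex(u32);
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct ParaId(u32);

// Peek the next para scheduled on `core_idx`, mirroring what
// `fetch_next_scheduled_on_core` does over a claim queue snapshot.
fn next_scheduled_on_core(
    claim_queue: &BTreeMap<CoreIndex, VecDeque<ParaId>>,
    core_idx: CoreIndex,
) -> Option<ParaId> {
    claim_queue.get(&core_idx)?.front().copied()
}

fn main() {
    // Para 2000 claims two cores, the elastic-scaling case the new test exercises.
    let claim_queue = BTreeMap::from([
        (CoreIndex(0), VecDeque::from([ParaId(2000)])),
        (CoreIndex(1), VecDeque::from([ParaId(2000)])),
        (CoreIndex(2), VecDeque::from([ParaId(2001)])),
    ]);
    assert_eq!(next_scheduled_on_core(&claim_queue, CoreIndex(0)), Some(ParaId(2000)));
    assert_eq!(next_scheduled_on_core(&claim_queue, CoreIndex(2)), Some(ParaId(2001)));
}
```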
15 changes: 9 additions & 6 deletions polkadot/node/collation-generation/src/tests.rs
@@ -28,15 +28,18 @@ use polkadot_node_subsystem::{
ActivatedLeaf,
};
use polkadot_node_subsystem_test_helpers::{subsystem_test_harness, TestSubsystemContextHandle};
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_node_subsystem_util::{vstaging::ClaimQueueSnapshot, TimeoutExt};
use polkadot_primitives::{
async_backing::{BackingState, CandidatePendingAvailability},
AsyncBackingParams, BlockNumber, CollatorPair, HeadData, PersistedValidationData,
ScheduledCore, ValidationCode,
};
use rstest::rstest;
use sp_keyring::sr25519::Keyring as Sr25519Keyring;
use std::pin::Pin;
use std::{
collections::{BTreeMap, VecDeque},
pin::Pin,
};
use test_helpers::{
dummy_candidate_descriptor, dummy_hash, dummy_head_data, dummy_validator, make_candidate,
};
@@ -617,7 +620,7 @@ fn fallback_when_no_validation_code_hash_api(#[case] runtime_version: u32) {
_hash,
RuntimeApiRequest::ClaimQueue(tx),
))) if runtime_version >= RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT => {
let res = BTreeMap::<CoreIndex, VecDeque<ParaId>>::new();
let res = ClaimQueueSnapshot::new();
tx.send(Ok(res)).unwrap();
},
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
@@ -780,7 +783,7 @@ fn distribute_collation_for_occupied_core_with_async_backing_enabled(#[case] run
candidate_hash: Default::default(),
candidate_descriptor: dummy_candidate_descriptor(dummy_hash()),
})];
let claim_queue = BTreeMap::from([(CoreIndex::from(0), VecDeque::from([para_id]))]);
let claim_queue = ClaimQueueSnapshot::from([(CoreIndex::from(0), VecDeque::from([para_id]))]);

test_harness(|mut virtual_overseer| async move {
helpers::initialize_collator(&mut virtual_overseer, para_id).await;
@@ -962,7 +965,7 @@ fn no_collation_is_distributed_for_occupied_core_with_async_backing_disabled(
candidate_hash: Default::default(),
candidate_descriptor: dummy_candidate_descriptor(dummy_hash()),
})];
let claim_queue = BTreeMap::from([(CoreIndex::from(0), VecDeque::from([para_id]))]);
let claim_queue = ClaimQueueSnapshot::from([(CoreIndex::from(0), VecDeque::from([para_id]))]);

test_harness(|mut virtual_overseer| async move {
helpers::initialize_collator(&mut virtual_overseer, para_id).await;
@@ -1050,7 +1053,7 @@ mod helpers {
async_backing_params: AsyncBackingParams,
cores: Vec<CoreState>,
runtime_version: u32,
claim_queue: BTreeMap<CoreIndex, VecDeque<ParaId>>,
claim_queue: ClaimQueueSnapshot,
) {
assert_matches!(
overseer_recv(virtual_overseer).await,
3 changes: 3 additions & 0 deletions polkadot/node/network/statement-distribution/src/error.rs
@@ -81,6 +81,9 @@ pub enum Error {
#[error("Fetching validator groups failed {0:?}")]
FetchValidatorGroups(RuntimeApiError),

#[error("Fetching claim queue failed {0:?}")]
FetchClaimQueue(runtime::Error),

#[error("Attempted to share statement when not a validator or not assigned")]
InvalidShare,

113 changes: 79 additions & 34 deletions polkadot/node/network/statement-distribution/src/v2/mod.rs
@@ -46,6 +46,7 @@ use polkadot_node_subsystem_util::{
backing_implicit_view::View as ImplicitView,
reputation::ReputationAggregator,
runtime::{request_min_backing_votes, ProspectiveParachainsMode},
vstaging::{fetch_claim_queue, fetch_next_scheduled_on_core, ClaimQueueSnapshot},
};
use polkadot_primitives::{
AuthorityDiscoveryId, CandidateHash, CompactStatement, CoreIndex, CoreState, GroupIndex,
@@ -149,10 +150,9 @@ pub(crate) const REQUEST_RETRY_DELAY: Duration = Duration::from_secs(1);
struct PerRelayParentState {
local_validator: Option<LocalValidatorState>,
statement_store: StatementStore,
availability_cores: Vec<CoreState>,
group_rotation_info: GroupRotationInfo,
seconding_limit: usize,
session: SessionIndex,
groups_per_para: HashMap<ParaId, Vec<GroupIndex>>,
}

impl PerRelayParentState {
Expand Down Expand Up @@ -563,11 +563,13 @@ pub(crate) async fn handle_active_leaves_update<Context>(
activated: &ActivatedLeaf,
leaf_mode: ProspectiveParachainsMode,
) -> JfyiErrorResult<()> {
let seconding_limit = match leaf_mode {
let max_candidate_depth = match leaf_mode {
ProspectiveParachainsMode::Disabled => return Ok(()),
ProspectiveParachainsMode::Enabled { max_candidate_depth, .. } => max_candidate_depth + 1,
ProspectiveParachainsMode::Enabled { max_candidate_depth, .. } => max_candidate_depth,
};

let seconding_limit = max_candidate_depth + 1;

state
.implicit_view
.activate_leaf(ctx.sender(), activated.hash)
@@ -693,15 +695,24 @@ pub(crate) async fn handle_active_leaves_update<Context>(
}
});

let maybe_claim_queue = fetch_claim_queue(ctx.sender(), new_relay_parent)
.await
.map_err(JfyiError::FetchClaimQueue)?;

let groups_per_para = determine_groups_per_para(
&availability_cores,
&group_rotation_info,
&maybe_claim_queue,
max_candidate_depth,
);
state.per_relay_parent.insert(
new_relay_parent,
PerRelayParentState {
local_validator,
statement_store: StatementStore::new(&per_session.groups),
availability_cores,
group_rotation_info,
seconding_limit,
session: session_index,
groups_per_para,
},
);
}
@@ -2126,17 +2137,54 @@ async fn provide_candidate_to_grid<Context>(
}
}

fn group_for_para(
// Utility function to populate per relay parent `ParaId` to `GroupIndex` mappings.
fn determine_groups_per_para(
availability_cores: &[CoreState],
group_rotation_info: &GroupRotationInfo,
para_id: ParaId,
) -> Option<GroupIndex> {
// Note: this won't work well for on-demand parachains as it assumes that core assignments are
// fixed across blocks.
let core_index = availability_cores.iter().position(|c| c.para_id() == Some(para_id));
maybe_claim_queue: &Option<ClaimQueueSnapshot>,
max_candidate_depth: usize,
) -> HashMap<ParaId, Vec<GroupIndex>> {
// Determine the core indices occupied by each para at the current relay parent. To support
// on-demand parachains we also consider the core indices at next block if core has a candidate
// pending availability.
let para_core_indices = availability_cores.iter().enumerate().filter_map(|(index, core)| {
match core {
CoreState::Scheduled(scheduled_core) =>
Some((scheduled_core.para_id, CoreIndex(index as u32))),
CoreState::Occupied(occupied_core) => {
if max_candidate_depth >= 1 {
// Use claim queue if available, or fallback to `next_up_on_available`
let maybe_scheduled_core = match maybe_claim_queue {
Some(claim_queue) => {
// What's up next on this core ?
fetch_next_scheduled_on_core(claim_queue, CoreIndex(index as u32))
},
None => {
// Runtime doesn't support claim queue runtime api. Fallback to
// `next_up_on_available`
occupied_core.next_up_on_available.clone()
},
};

maybe_scheduled_core
.filter(|scheduled_core| scheduled_core.para_id == occupied_core.para_id())
.map(|scheduled_core| (scheduled_core.para_id, CoreIndex(index as u32)))
} else {
None
}
},
CoreState::Free => None,
}
});

core_index
.map(|c| group_rotation_info.group_for_core(CoreIndex(c as _), availability_cores.len()))
let mut groups_per_para = HashMap::new();
// Map from `CoreIndex` to `GroupIndex` and collect as `HashMap`.
for (para, core_index) in para_core_indices {
let group_index = group_rotation_info.group_for_core(core_index, availability_cores.len());
groups_per_para.entry(para).or_insert_with(Vec::new).push(group_index)
}

groups_per_para
}

#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
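For orientation, here is a minimal, self-contained sketch of the mapping `determine_groups_per_para` builds: every core a para is scheduled on contributes one backing group, so an elastic-scaling para occupying several cores ends up with several groups rather than the old single expected group. The type aliases and the rotation formula are simplified stand-ins, not the real `polkadot_primitives` or `GroupRotationInfo` definitions:

```rust
use std::collections::HashMap;

// Hypothetical simplified aliases; the PR uses `ParaId`, `CoreIndex` and
// `GroupIndex` from `polkadot_primitives`.
type ParaId = u32;
type CoreIndex = usize;
type GroupIndex = usize;

// Collect every backing group assigned to each para instead of assuming a
// 1:1 para:group relationship, which is the point of `determine_groups_per_para`.
fn groups_per_para(
    scheduled: &[(ParaId, CoreIndex)],
    n_cores: usize,
    rotation_offset: usize,
) -> HashMap<ParaId, Vec<GroupIndex>> {
    let mut out: HashMap<ParaId, Vec<GroupIndex>> = HashMap::new();
    for &(para, core) in scheduled {
        // Stand-in for `GroupRotationInfo::group_for_core`.
        let group = (core + rotation_offset) % n_cores;
        out.entry(para).or_default().push(group);
    }
    out
}

fn main() {
    // Para 2000 is scheduled on cores 0 and 1, so it gets two backing groups.
    let map = groups_per_para(&[(2000, 0), (2000, 1), (2001, 2)], 3, 0);
    assert_eq!(map[&2000], vec![0, 1]);
    assert_eq!(map[&2001], vec![2]);
}
```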
@@ -2192,18 +2240,14 @@ async fn fragment_tree_update_inner<Context>(
let confirmed_candidate = state.candidates.get_confirmed(&candidate_hash);
let prs = state.per_relay_parent.get_mut(&receipt.descriptor().relay_parent);
if let (Some(confirmed), Some(prs)) = (confirmed_candidate, prs) {
let group_index = group_for_para(
&prs.availability_cores,
&prs.group_rotation_info,
receipt.descriptor().para_id,
);

let per_session = state.per_session.get(&prs.session);
if let (Some(per_session), Some(group_index)) = (per_session, group_index) {
// TODO(maybe for sanity): perform an extra check on the candidate backing group
// index all allowed
if let Some(per_session) = per_session {
send_backing_fresh_statements(
ctx,
candidate_hash,
group_index,
confirmed.group_index(),
&receipt.descriptor().relay_parent,
prs,
confirmed,
@@ -2311,13 +2355,14 @@ async fn handle_incoming_manifest_common<'a, Context>(
Some(x) => x,
};

let expected_group = group_for_para(
&relay_parent_state.availability_cores,
&relay_parent_state.group_rotation_info,
para_id,
);
let expected_groups = relay_parent_state.groups_per_para.get(&para_id);

if expected_group != Some(manifest_summary.claimed_group_index) {
if expected_groups.is_none() ||
!expected_groups
.expect("checked is_some(); qed")
.iter()
.any(|g| g == &manifest_summary.claimed_group_index)
{
modify_reputation(reputation, ctx.sender(), peer, COST_MALFORMED_MANIFEST).await;
return None
}
@@ -3037,13 +3082,13 @@ pub(crate) async fn handle_response<Context>(
relay_parent_state.session,
|v| per_session.session_info.validators.get(v).map(|x| x.clone()),
|para, g_index| {
let expected_group = group_for_para(
&relay_parent_state.availability_cores,
&relay_parent_state.group_rotation_info,
para,
);
let expected_groups = relay_parent_state.groups_per_para.get(&para);

Some(g_index) == expected_group
expected_groups.is_some() &&
expected_groups
.expect("checked is_some(); qed")
.iter()
.any(|g| g == &g_index)
},
disabled_mask,
);
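Both the manifest path and the response path above now validate a claimed group index against the set of groups recorded for the para at that relay parent, rather than against a single expected group. A compact, self-contained sketch of that membership check, with simplified type aliases and a hypothetical helper name:

```rust
use std::collections::HashMap;

// Simplified aliases; the real types come from `polkadot_primitives`.
type ParaId = u32;
type GroupIndex = usize;

// A claimed backing group is only accepted if it is one of the groups recorded
// for that para at this relay parent (the `groups_per_para` map in `PerRelayParentState`).
fn group_is_valid_for_para(
    groups_per_para: &HashMap<ParaId, Vec<GroupIndex>>,
    para: ParaId,
    claimed_group: GroupIndex,
) -> bool {
    groups_per_para
        .get(&para)
        .map_or(false, |groups| groups.contains(&claimed_group))
}

fn main() {
    let groups = HashMap::from([(2000, vec![0, 1]), (2001, vec![2])]);
    assert!(group_is_valid_for_para(&groups, 2000, 1));
    assert!(!group_is_valid_for_para(&groups, 2001, 0));
}
```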