diff --git a/Cargo.lock b/Cargo.lock index 34e30d79e98b..f26252520ff1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14312,6 +14312,7 @@ dependencies = [ "polkadot-node-subsystem", "polkadot-node-subsystem-test-helpers", "polkadot-node-subsystem-util", + "polkadot-parachain-primitives", "polkadot-primitives", "polkadot-primitives-test-helpers", "polkadot-statement-table", @@ -14532,6 +14533,7 @@ dependencies = [ "slotmap", "sp-core 28.0.0", "sp-maybe-compressed-blob 11.0.0", + "strum 0.26.2", "tempfile", "test-parachain-adder", "test-parachain-halt", @@ -14784,6 +14786,7 @@ dependencies = [ "sp-blockchain", "sp-consensus-babe", "sp-runtime 31.0.1", + "strum 0.26.2", "substrate-prometheus-endpoint", "thiserror", ] diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs index 835098293050..0cb977c58021 100644 --- a/polkadot/node/core/approval-voting/src/lib.rs +++ b/polkadot/node/core/approval-voting/src/lib.rs @@ -38,7 +38,7 @@ use polkadot_node_subsystem::{ ApprovalVotingMessage, AssignmentCheckError, AssignmentCheckResult, AvailabilityRecoveryMessage, BlockDescription, CandidateValidationMessage, ChainApiMessage, ChainSelectionMessage, CheckedIndirectAssignment, CheckedIndirectSignedApprovalVote, - DisputeCoordinatorMessage, HighestApprovedAncestorBlock, RuntimeApiMessage, + DisputeCoordinatorMessage, HighestApprovedAncestorBlock, PvfExecKind, RuntimeApiMessage, RuntimeApiRequest, }, overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, SubsystemResult, @@ -53,8 +53,8 @@ use polkadot_node_subsystem_util::{ }; use polkadot_primitives::{ ApprovalVoteMultipleCandidates, ApprovalVotingParams, BlockNumber, CandidateHash, - CandidateIndex, CandidateReceipt, CoreIndex, ExecutorParams, GroupIndex, Hash, PvfExecKind, - SessionIndex, SessionInfo, ValidatorId, ValidatorIndex, ValidatorPair, ValidatorSignature, + CandidateIndex, CandidateReceipt, CoreIndex, ExecutorParams, GroupIndex, Hash, SessionIndex, + SessionInfo, ValidatorId, ValidatorIndex, ValidatorPair, ValidatorSignature, }; use sc_keystore::LocalKeystore; use sp_application_crypto::Pair; diff --git a/polkadot/node/core/backing/Cargo.toml b/polkadot/node/core/backing/Cargo.toml index 1b52afc309bc..bd56a3ad693b 100644 --- a/polkadot/node/core/backing/Cargo.toml +++ b/polkadot/node/core/backing/Cargo.toml @@ -14,6 +14,7 @@ futures = { workspace = true } sp-keystore = { workspace = true, default-features = true } polkadot-primitives = { workspace = true, default-features = true } polkadot-node-primitives = { workspace = true, default-features = true } +polkadot-parachain-primitives = { workspace = true, default-features = true } polkadot-node-subsystem = { workspace = true, default-features = true } polkadot-node-subsystem-util = { workspace = true, default-features = true } polkadot-erasure-coding = { workspace = true, default-features = true } diff --git a/polkadot/node/core/backing/src/lib.rs b/polkadot/node/core/backing/src/lib.rs index f276321c87ed..4463fb34b510 100644 --- a/polkadot/node/core/backing/src/lib.rs +++ b/polkadot/node/core/backing/src/lib.rs @@ -89,8 +89,9 @@ use polkadot_node_subsystem::{ AvailabilityDistributionMessage, AvailabilityStoreMessage, CanSecondRequest, CandidateBackingMessage, CandidateValidationMessage, CollatorProtocolMessage, HypotheticalCandidate, HypotheticalMembershipRequest, IntroduceSecondedCandidateRequest, - ProspectiveParachainsMessage, ProvisionableData, ProvisionerMessage, RuntimeApiMessage, - RuntimeApiRequest, 
StatementDistributionMessage, StoreAvailableDataError, + ProspectiveParachainsMessage, ProvisionableData, ProvisionerMessage, PvfExecKind, + RuntimeApiMessage, RuntimeApiRequest, StatementDistributionMessage, + StoreAvailableDataError, }, overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, }; @@ -105,12 +106,13 @@ use polkadot_node_subsystem_util::{ }, Validator, }; +use polkadot_parachain_primitives::primitives::IsSystem; use polkadot_primitives::{ node_features::FeatureIndex, BackedCandidate, CandidateCommitments, CandidateHash, CandidateReceipt, CommittedCandidateReceipt, CoreIndex, CoreState, ExecutorParams, GroupIndex, GroupRotationInfo, Hash, Id as ParaId, IndexedVec, NodeFeatures, PersistedValidationData, - PvfExecKind, SessionIndex, SigningContext, ValidationCode, ValidatorId, ValidatorIndex, - ValidatorSignature, ValidityAttestation, + SessionIndex, SigningContext, ValidationCode, ValidatorId, ValidatorIndex, ValidatorSignature, + ValidityAttestation, }; use polkadot_statement_table::{ generic::AttestedCandidate as TableAttestedCandidate, @@ -625,6 +627,7 @@ async fn request_candidate_validation( executor_params: ExecutorParams, ) -> Result { let (tx, rx) = oneshot::channel(); + let is_system = candidate_receipt.descriptor.para_id.is_system(); sender .send_message(CandidateValidationMessage::ValidateFromExhaustive { @@ -633,7 +636,11 @@ async fn request_candidate_validation( candidate_receipt, pov, executor_params, - exec_kind: PvfExecKind::Backing, + exec_kind: if is_system { + PvfExecKind::BackingSystemParas + } else { + PvfExecKind::Backing + }, response_sender: tx, }) .await; diff --git a/polkadot/node/core/backing/src/tests/mod.rs b/polkadot/node/core/backing/src/tests/mod.rs index 10eb45b82d12..d9c1fc9499e5 100644 --- a/polkadot/node/core/backing/src/tests/mod.rs +++ b/polkadot/node/core/backing/src/tests/mod.rs @@ -30,7 +30,7 @@ use polkadot_node_subsystem::{ use polkadot_node_subsystem_test_helpers as test_helpers; use polkadot_primitives::{ node_features, CandidateDescriptor, GroupRotationInfo, HeadData, PersistedValidationData, - PvfExecKind, ScheduledCore, SessionIndex, LEGACY_MIN_BACKING_VOTES, + ScheduledCore, SessionIndex, LEGACY_MIN_BACKING_VOTES, }; use polkadot_primitives_test_helpers::{ dummy_candidate_receipt_bad_sig, dummy_collator, dummy_collator_signature, @@ -434,7 +434,7 @@ async fn assert_validate_from_exhaustive( ) if validation_data == *assert_pvd && validation_code == *assert_validation_code && *pov == *assert_pov && &candidate_receipt.descriptor == assert_candidate.descriptor() && - exec_kind == PvfExecKind::Backing && + exec_kind == PvfExecKind::BackingSystemParas && candidate_receipt.commitments_hash == assert_candidate.commitments.hash() => { response_sender.send(Ok(ValidationResult::Valid( @@ -651,7 +651,7 @@ fn backing_works(#[case] elastic_scaling_mvp: bool) { ) if validation_data == pvd_ab && validation_code == validation_code_ab && *pov == pov_ab && &candidate_receipt.descriptor == candidate_a.descriptor() && - exec_kind == PvfExecKind::Backing && + exec_kind == PvfExecKind::BackingSystemParas && candidate_receipt.commitments_hash == candidate_a_commitments_hash => { response_sender.send(Ok( @@ -1287,7 +1287,7 @@ fn backing_works_while_validation_ongoing() { ) if validation_data == pvd_abc && validation_code == validation_code_abc && *pov == pov_abc && &candidate_receipt.descriptor == candidate_a.descriptor() && - exec_kind == PvfExecKind::Backing && + exec_kind == PvfExecKind::BackingSystemParas && 
candidate_a_commitments_hash == candidate_receipt.commitments_hash => { // we never validate the candidate. our local node @@ -1454,7 +1454,7 @@ fn backing_misbehavior_works() { ) if validation_data == pvd_a && validation_code == validation_code_a && *pov == pov_a && &candidate_receipt.descriptor == candidate_a.descriptor() && - exec_kind == PvfExecKind::Backing && + exec_kind == PvfExecKind::BackingSystemParas && candidate_a_commitments_hash == candidate_receipt.commitments_hash => { response_sender.send(Ok( @@ -1621,7 +1621,7 @@ fn backing_dont_second_invalid() { ) if validation_data == pvd_a && validation_code == validation_code_a && *pov == pov_block_a && &candidate_receipt.descriptor == candidate_a.descriptor() && - exec_kind == PvfExecKind::Backing && + exec_kind == PvfExecKind::BackingSystemParas && candidate_a.commitments.hash() == candidate_receipt.commitments_hash => { response_sender.send(Ok(ValidationResult::Invalid(InvalidCandidate::BadReturn))).unwrap(); @@ -1661,7 +1661,7 @@ fn backing_dont_second_invalid() { ) if validation_data == pvd_b && validation_code == validation_code_b && *pov == pov_block_b && &candidate_receipt.descriptor == candidate_b.descriptor() && - exec_kind == PvfExecKind::Backing && + exec_kind == PvfExecKind::BackingSystemParas && candidate_b.commitments.hash() == candidate_receipt.commitments_hash => { response_sender.send(Ok( @@ -1788,7 +1788,7 @@ fn backing_second_after_first_fails_works() { ) if validation_data == pvd_a && validation_code == validation_code_a && *pov == pov_a && &candidate_receipt.descriptor == candidate.descriptor() && - exec_kind == PvfExecKind::Backing && + exec_kind == PvfExecKind::BackingSystemParas && candidate.commitments.hash() == candidate_receipt.commitments_hash => { response_sender.send(Ok(ValidationResult::Invalid(InvalidCandidate::BadReturn))).unwrap(); @@ -1932,7 +1932,7 @@ fn backing_works_after_failed_validation() { ) if validation_data == pvd_a && validation_code == validation_code_a && *pov == pov_a && &candidate_receipt.descriptor == candidate.descriptor() && - exec_kind == PvfExecKind::Backing && + exec_kind == PvfExecKind::BackingSystemParas && candidate.commitments.hash() == candidate_receipt.commitments_hash => { response_sender.send(Err(ValidationFailed("Internal test error".into()))).unwrap(); @@ -2211,7 +2211,7 @@ fn retry_works() { ) if validation_data == pvd_a && validation_code == validation_code_a && *pov == pov_a && &candidate_receipt.descriptor == candidate.descriptor() && - exec_kind == PvfExecKind::Backing && + exec_kind == PvfExecKind::BackingSystemParas && candidate.commitments.hash() == candidate_receipt.commitments_hash ); virtual_overseer @@ -2753,7 +2753,7 @@ fn validator_ignores_statements_from_disabled_validators() { ) if validation_data == pvd && validation_code == expected_validation_code && *pov == expected_pov && &candidate_receipt.descriptor == candidate.descriptor() && - exec_kind == PvfExecKind::Backing && + exec_kind == PvfExecKind::BackingSystemParas && candidate_commitments_hash == candidate_receipt.commitments_hash => { response_sender.send(Ok( diff --git a/polkadot/node/core/backing/src/tests/prospective_parachains.rs b/polkadot/node/core/backing/src/tests/prospective_parachains.rs index 15bc0b4a1139..57b2fabd43b0 100644 --- a/polkadot/node/core/backing/src/tests/prospective_parachains.rs +++ b/polkadot/node/core/backing/src/tests/prospective_parachains.rs @@ -276,7 +276,7 @@ async fn assert_validate_seconded_candidate( &validation_code == assert_validation_code && &*pov == 
assert_pov && &candidate_receipt.descriptor == candidate.descriptor() && - exec_kind == PvfExecKind::Backing && + exec_kind == PvfExecKind::BackingSystemParas && candidate.commitments.hash() == candidate_receipt.commitments_hash => { response_sender.send(Ok(ValidationResult::Valid( diff --git a/polkadot/node/core/candidate-validation/src/lib.rs b/polkadot/node/core/candidate-validation/src/lib.rs index 50505d733916..e875be9b5df9 100644 --- a/polkadot/node/core/candidate-validation/src/lib.rs +++ b/polkadot/node/core/candidate-validation/src/lib.rs @@ -31,8 +31,8 @@ use polkadot_node_primitives::{InvalidCandidate, PoV, ValidationResult}; use polkadot_node_subsystem::{ errors::RuntimeApiError, messages::{ - CandidateValidationMessage, PreCheckOutcome, RuntimeApiMessage, RuntimeApiRequest, - ValidationFailed, + CandidateValidationMessage, PreCheckOutcome, PvfExecKind, RuntimeApiMessage, + RuntimeApiRequest, ValidationFailed, }, overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, SubsystemResult, SubsystemSender, @@ -46,8 +46,9 @@ use polkadot_primitives::{ DEFAULT_LENIENT_PREPARATION_TIMEOUT, DEFAULT_PRECHECK_PREPARATION_TIMEOUT, }, AuthorityDiscoveryId, CandidateCommitments, CandidateDescriptor, CandidateEvent, - CandidateReceipt, ExecutorParams, Hash, PersistedValidationData, PvfExecKind, PvfPrepKind, - SessionIndex, ValidationCode, ValidationCodeHash, ValidatorId, + CandidateReceipt, ExecutorParams, Hash, PersistedValidationData, + PvfExecKind as RuntimePvfExecKind, PvfPrepKind, SessionIndex, ValidationCode, + ValidationCodeHash, ValidatorId, }; use sp_application_crypto::{AppCrypto, ByteArray}; use sp_keystore::KeystorePtr; @@ -667,9 +668,9 @@ async fn validate_candidate_exhaustive( let result = match exec_kind { // Retry is disabled to reduce the chance of nondeterministic blocks getting backed and // honest backers getting slashed. - PvfExecKind::Backing => { + PvfExecKind::Backing | PvfExecKind::BackingSystemParas => { let prep_timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Prepare); - let exec_timeout = pvf_exec_timeout(&executor_params, exec_kind); + let exec_timeout = pvf_exec_timeout(&executor_params, exec_kind.into()); let pvf = PvfPrepData::from_code( validation_code.0, executor_params, @@ -683,20 +684,22 @@ async fn validate_candidate_exhaustive( exec_timeout, persisted_validation_data.clone(), pov, - polkadot_node_core_pvf::Priority::Normal, + exec_kind.into(), + exec_kind, ) .await }, - PvfExecKind::Approval => + PvfExecKind::Approval | PvfExecKind::Dispute => validation_backend .validate_candidate_with_retry( validation_code.0, - pvf_exec_timeout(&executor_params, exec_kind), + pvf_exec_timeout(&executor_params, exec_kind.into()), persisted_validation_data.clone(), pov, executor_params, PVF_APPROVAL_EXECUTION_RETRY_DELAY, - polkadot_node_core_pvf::Priority::Critical, + exec_kind.into(), + exec_kind, ) .await, }; @@ -784,6 +787,8 @@ trait ValidationBackend { pov: Arc, // The priority for the preparation job. prepare_priority: polkadot_node_core_pvf::Priority, + // The kind for the execution job. + exec_kind: PvfExecKind, ) -> Result; /// Tries executing a PVF. Will retry once if an error is encountered that may have @@ -804,6 +809,8 @@ trait ValidationBackend { retry_delay: Duration, // The priority for the preparation job. prepare_priority: polkadot_node_core_pvf::Priority, + // The kind for the execution job. 
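+	// Passed on to the validation host so the execution queue can schedule the job by its kind.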
+ exec_kind: PvfExecKind, ) -> Result { let prep_timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Prepare); // Construct the PVF a single time, since it is an expensive operation. Cloning it is cheap. @@ -825,6 +832,7 @@ trait ValidationBackend { pvd.clone(), pov.clone(), prepare_priority, + exec_kind, ) .await; if validation_result.is_ok() { @@ -905,6 +913,7 @@ trait ValidationBackend { pvd.clone(), pov.clone(), prepare_priority, + exec_kind, ) .await; } @@ -929,9 +938,13 @@ impl ValidationBackend for ValidationHost { pov: Arc, // The priority for the preparation job. prepare_priority: polkadot_node_core_pvf::Priority, + // The kind for the execution job. + exec_kind: PvfExecKind, ) -> Result { let (tx, rx) = oneshot::channel(); - if let Err(err) = self.execute_pvf(pvf, exec_timeout, pvd, pov, prepare_priority, tx).await + if let Err(err) = self + .execute_pvf(pvf, exec_timeout, pvd, pov, prepare_priority, exec_kind, tx) + .await { return Err(InternalValidationError::HostCommunication(format!( "cannot send pvf to the validation host, it might have shut down: {:?}", @@ -1023,12 +1036,12 @@ fn pvf_prep_timeout(executor_params: &ExecutorParams, kind: PvfPrepKind) -> Dura /// This should be much longer than the backing execution timeout to ensure that in the /// absence of extremely large disparities between hardware, blocks that pass backing are /// considered executable by approval checkers or dispute participants. -fn pvf_exec_timeout(executor_params: &ExecutorParams, kind: PvfExecKind) -> Duration { +fn pvf_exec_timeout(executor_params: &ExecutorParams, kind: RuntimePvfExecKind) -> Duration { if let Some(timeout) = executor_params.pvf_exec_timeout(kind) { return timeout } match kind { - PvfExecKind::Backing => DEFAULT_BACKING_EXECUTION_TIMEOUT, - PvfExecKind::Approval => DEFAULT_APPROVAL_EXECUTION_TIMEOUT, + RuntimePvfExecKind::Backing => DEFAULT_BACKING_EXECUTION_TIMEOUT, + RuntimePvfExecKind::Approval => DEFAULT_APPROVAL_EXECUTION_TIMEOUT, } } diff --git a/polkadot/node/core/candidate-validation/src/tests.rs b/polkadot/node/core/candidate-validation/src/tests.rs index 46de55e4836e..2f7baf4abb61 100644 --- a/polkadot/node/core/candidate-validation/src/tests.rs +++ b/polkadot/node/core/candidate-validation/src/tests.rs @@ -17,6 +17,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use super::*; +use crate::PvfExecKind; use assert_matches::assert_matches; use futures::executor; use polkadot_node_core_pvf::PrepareError; @@ -441,6 +442,7 @@ impl ValidationBackend for MockValidateCandidateBackend { _pvd: Arc, _pov: Arc, _prepare_priority: polkadot_node_core_pvf::Priority, + _exec_kind: PvfExecKind, ) -> Result { // This is expected to panic if called more times than expected, indicating an error in the // test. 
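Both `exec_kind.into()` call sites above rely on two conversions introduced later in this diff: one into the preparation `Priority` (pvf/src/priority.rs) and one into the runtime `PvfExecKind` used for the timeout lookup (subsystem messages). A compact recap of how each subsystem-level kind resolves, written as a sketch rather than taken verbatim from the patch:

use polkadot_node_core_pvf::Priority;
use polkadot_node_subsystem::messages::PvfExecKind;
use polkadot_primitives::PvfExecKind as RuntimePvfExecKind;

// Mirrors the two `From` impls added in this diff: the pair is
// (preparation priority, runtime execution kind used for the timeout).
fn resolve(kind: PvfExecKind) -> (Priority, RuntimePvfExecKind) {
	match kind {
		PvfExecKind::Dispute => (Priority::Critical, RuntimePvfExecKind::Approval),
		PvfExecKind::Approval => (Priority::Critical, RuntimePvfExecKind::Approval),
		PvfExecKind::BackingSystemParas => (Priority::Normal, RuntimePvfExecKind::Backing),
		PvfExecKind::Backing => (Priority::Normal, RuntimePvfExecKind::Backing),
	}
}

Disputes reuse the approval timeout because the runtime-level enum only distinguishes backing from approval execution.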
@@ -1023,6 +1025,7 @@ impl ValidationBackend for MockPreCheckBackend { _pvd: Arc, _pov: Arc, _prepare_priority: polkadot_node_core_pvf::Priority, + _exec_kind: PvfExecKind, ) -> Result { unreachable!() } @@ -1177,6 +1180,7 @@ impl ValidationBackend for MockHeadsUp { _pvd: Arc, _pov: Arc, _prepare_priority: polkadot_node_core_pvf::Priority, + _exec_kind: PvfExecKind, ) -> Result { unreachable!() } diff --git a/polkadot/node/core/dispute-coordinator/src/participation/mod.rs b/polkadot/node/core/dispute-coordinator/src/participation/mod.rs index b58ce570f8ff..2220f65e20a7 100644 --- a/polkadot/node/core/dispute-coordinator/src/participation/mod.rs +++ b/polkadot/node/core/dispute-coordinator/src/participation/mod.rs @@ -27,13 +27,11 @@ use futures_timer::Delay; use polkadot_node_primitives::ValidationResult; use polkadot_node_subsystem::{ - messages::{AvailabilityRecoveryMessage, CandidateValidationMessage}, + messages::{AvailabilityRecoveryMessage, CandidateValidationMessage, PvfExecKind}, overseer, ActiveLeavesUpdate, RecoveryError, }; use polkadot_node_subsystem_util::runtime::get_validation_code_by_hash; -use polkadot_primitives::{ - BlockNumber, CandidateHash, CandidateReceipt, Hash, PvfExecKind, SessionIndex, -}; +use polkadot_primitives::{BlockNumber, CandidateHash, CandidateReceipt, Hash, SessionIndex}; use crate::LOG_TARGET; @@ -387,7 +385,7 @@ async fn participate( candidate_receipt: req.candidate_receipt().clone(), pov: available_data.pov, executor_params: req.executor_params(), - exec_kind: PvfExecKind::Approval, + exec_kind: PvfExecKind::Dispute, response_sender: validation_tx, }) .await; diff --git a/polkadot/node/core/dispute-coordinator/src/participation/tests.rs b/polkadot/node/core/dispute-coordinator/src/participation/tests.rs index a80553828ac6..a6ab6f16df05 100644 --- a/polkadot/node/core/dispute-coordinator/src/participation/tests.rs +++ b/polkadot/node/core/dispute-coordinator/src/participation/tests.rs @@ -26,7 +26,7 @@ use codec::Encode; use polkadot_node_primitives::{AvailableData, BlockData, InvalidCandidate, PoV}; use polkadot_node_subsystem::{ messages::{ - AllMessages, ChainApiMessage, DisputeCoordinatorMessage, RuntimeApiMessage, + AllMessages, ChainApiMessage, DisputeCoordinatorMessage, PvfExecKind, RuntimeApiMessage, RuntimeApiRequest, }, ActiveLeavesUpdate, SpawnGlue, @@ -116,7 +116,7 @@ pub async fn participation_full_happy_path( ctx_handle.recv().await, AllMessages::CandidateValidation( CandidateValidationMessage::ValidateFromExhaustive { candidate_receipt, exec_kind, response_sender, .. } - ) if exec_kind == PvfExecKind::Approval => { + ) if exec_kind == PvfExecKind::Dispute => { if expected_commitments_hash != candidate_receipt.commitments_hash { response_sender.send(Ok(ValidationResult::Invalid(InvalidCandidate::CommitmentsHashMismatch))).unwrap(); } else { @@ -450,7 +450,7 @@ fn cast_invalid_vote_if_validation_fails_or_is_invalid() { ctx_handle.recv().await, AllMessages::CandidateValidation( CandidateValidationMessage::ValidateFromExhaustive { exec_kind, response_sender, .. } - ) if exec_kind == PvfExecKind::Approval => { + ) if exec_kind == PvfExecKind::Dispute => { response_sender.send(Ok(ValidationResult::Invalid(InvalidCandidate::Timeout))).unwrap(); }, "overseer did not receive candidate validation message", @@ -487,7 +487,7 @@ fn cast_invalid_vote_if_commitments_dont_match() { ctx_handle.recv().await, AllMessages::CandidateValidation( CandidateValidationMessage::ValidateFromExhaustive { exec_kind, response_sender, .. 
} - ) if exec_kind == PvfExecKind::Approval => { + ) if exec_kind == PvfExecKind::Dispute => { response_sender.send(Ok(ValidationResult::Invalid(InvalidCandidate::CommitmentsHashMismatch))).unwrap(); }, "overseer did not receive candidate validation message", @@ -524,7 +524,7 @@ fn cast_valid_vote_if_validation_passes() { ctx_handle.recv().await, AllMessages::CandidateValidation( CandidateValidationMessage::ValidateFromExhaustive { exec_kind, response_sender, .. } - ) if exec_kind == PvfExecKind::Approval => { + ) if exec_kind == PvfExecKind::Dispute => { response_sender.send(Ok(ValidationResult::Valid(dummy_candidate_commitments(None), PersistedValidationData::default()))).unwrap(); }, "overseer did not receive candidate validation message", diff --git a/polkadot/node/core/pvf/Cargo.toml b/polkadot/node/core/pvf/Cargo.toml index d603af04bf06..13fcdc69a99a 100644 --- a/polkadot/node/core/pvf/Cargo.toml +++ b/polkadot/node/core/pvf/Cargo.toml @@ -24,6 +24,7 @@ slotmap = { workspace = true } tempfile = { workspace = true } thiserror = { workspace = true } tokio = { features = ["fs", "process"], workspace = true, default-features = true } +strum = { features = ["derive"], workspace = true, default-features = true } codec = { features = [ "derive", diff --git a/polkadot/node/core/pvf/src/execute/queue.rs b/polkadot/node/core/pvf/src/execute/queue.rs index 11031bf1074a..096cec3501b8 100644 --- a/polkadot/node/core/pvf/src/execute/queue.rs +++ b/polkadot/node/core/pvf/src/execute/queue.rs @@ -35,15 +35,17 @@ use polkadot_node_core_pvf_common::{ SecurityStatus, }; use polkadot_node_primitives::PoV; +use polkadot_node_subsystem::messages::PvfExecKind; use polkadot_primitives::{ExecutorParams, ExecutorParamsHash, PersistedValidationData}; use slotmap::HopSlotMap; use std::{ - collections::VecDeque, + collections::{HashMap, VecDeque}, fmt, path::PathBuf, sync::Arc, time::{Duration, Instant}, }; +use strum::IntoEnumIterator; /// The amount of time a job for which the queue does not have a compatible worker may wait in the /// queue. After that time passes, the queue will kill the first worker which becomes idle to @@ -74,6 +76,7 @@ pub struct PendingExecutionRequest { pub pov: Arc, pub executor_params: ExecutorParams, pub result_tx: ResultSender, + pub exec_kind: PvfExecKind, } struct ExecuteJob { @@ -166,7 +169,7 @@ struct Queue { security_status: SecurityStatus, /// The queue of jobs that are waiting for a worker to pick up. - queue: VecDeque, + unscheduled: Unscheduled, workers: Workers, mux: Mux, } @@ -192,7 +195,7 @@ impl Queue { security_status, to_queue_rx, from_queue_tx, - queue: VecDeque::new(), + unscheduled: Unscheduled::new(), mux: Mux::new(), workers: Workers { running: HopSlotMap::with_capacity_and_key(10), @@ -226,9 +229,13 @@ impl Queue { /// If all the workers are busy or the queue is empty, it does nothing. /// Should be called every time a new job arrives to the queue or a job finishes. fn try_assign_next_job(&mut self, finished_worker: Option) { - // New jobs are always pushed to the tail of the queue; the one at its head is always - // the eldest one. - let eldest = if let Some(eldest) = self.queue.get(0) { eldest } else { return }; + // We always work at the same priority level + let priority = self.unscheduled.select_next_priority(); + let Some(queue) = self.unscheduled.get_mut(priority) else { return }; + + // New jobs are always pushed to the tail of the queue based on their priority; + // the one at its head of each queue is always the eldest one. 
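+	// `select_next_priority` walks the kinds from most to least important and returns the first
+	// one that both has pending jobs and has not exhausted its share of the scheduling window;
+	// if every pending kind is saturated, the most important pending one is picked anyway.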
+ let eldest = if let Some(eldest) = queue.get(0) { eldest } else { return }; // By default, we're going to execute the eldest job on any worker slot available, even if // we have to kill and re-spawn a worker @@ -240,7 +247,7 @@ impl Queue { if eldest.waiting_since.elapsed() < MAX_KEEP_WAITING { if let Some(finished_worker) = finished_worker { if let Some(worker_data) = self.workers.running.get(finished_worker) { - for (i, job) in self.queue.iter().enumerate() { + for (i, job) in queue.iter().enumerate() { if worker_data.executor_params_hash == job.executor_params.hash() { (worker, job_index) = (Some(finished_worker), i); break @@ -252,7 +259,7 @@ impl Queue { if worker.is_none() { // Try to obtain a worker for the job - worker = self.workers.find_available(self.queue[job_index].executor_params.hash()); + worker = self.workers.find_available(queue[job_index].executor_params.hash()); } if worker.is_none() { @@ -270,13 +277,15 @@ impl Queue { return } - let job = self.queue.remove(job_index).expect("Job is just checked to be in queue; qed"); + let job = queue.remove(job_index).expect("Job is just checked to be in queue; qed"); if let Some(worker) = worker { assign(self, worker, job); } else { spawn_extra_worker(self, job); } + self.metrics.on_execute_kind(priority); + self.unscheduled.mark_scheduled(priority); } } @@ -297,7 +306,7 @@ async fn purge_dead(metrics: &Metrics, workers: &mut Workers) { fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) { let ToQueue::Enqueue { artifact, pending_execution_request } = to_queue; - let PendingExecutionRequest { exec_timeout, pvd, pov, executor_params, result_tx } = + let PendingExecutionRequest { exec_timeout, pvd, pov, executor_params, result_tx, exec_kind } = pending_execution_request; gum::debug!( target: LOG_TARGET, @@ -315,7 +324,7 @@ fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) { result_tx, waiting_since: Instant::now(), }; - queue.queue.push_back(job); + queue.unscheduled.add(job, exec_kind); queue.try_assign_next_job(None); } @@ -638,3 +647,297 @@ pub fn start( .run(); (to_queue_tx, from_queue_rx, run) } + +struct Unscheduled { + unscheduled: HashMap>, + counter: HashMap, +} + +impl Unscheduled { + /// We keep track of every scheduled job in the `counter`, but reset it if the total number of + /// counted jobs reaches the threshold. This number is set as the maximum amount of jobs per + /// relay chain block possible with 4 CPU cores and 2 seconds of execution time. Under normal + /// conditions, the maximum expected queue size is at least vrf_module_samples(6) + 1 for + /// backing a parachain candidate. A buffer is added to cover situations where more work + /// arrives in the queue. + const SCHEDULING_WINDOW_SIZE: usize = 12; + + /// A threshold in percentages indicates how much time a current priority can "steal" from lower + /// priorities. Given the `SCHEDULING_WINDOW_SIZE` is 12 and all job priorities are present: + /// - Disputes consume 70% or 8 jobs in a row. + /// - The remaining 30% of original 100% is allocated for approval and all backing jobs. + /// - 80% or 3 jobs of the remaining goes to approvals. + /// - The remaining 6% of original 100% is allocated for all backing jobs. + /// - 100% or 1 job of the remaining goes to backing system parachains. + /// - Nothing is left for backing. + /// - The counter is restarted and the distribution starts from the beginning. 
+	///
+	/// This system might seem complex, but we operate with the remaining percentages because:
+	/// - Not all job types are present in each block. If we used parts of the original 100%,
+	///   approvals could not exceed 24%, even if there are no disputes.
+	/// - We cannot fully prioritize backing system parachains over backing other parachains based
+	///   on the distribution of the original 100%.
+	const PRIORITY_ALLOCATION_THRESHOLDS: &'static [(PvfExecKind, usize)] = &[
+		(PvfExecKind::Dispute, 70),
+		(PvfExecKind::Approval, 80),
+		(PvfExecKind::BackingSystemParas, 100),
+		(PvfExecKind::Backing, 100),
+	];
+
+	fn new() -> Self {
+		Self {
+			unscheduled: PvfExecKind::iter().map(|priority| (priority, VecDeque::new())).collect(),
+			counter: PvfExecKind::iter().map(|priority| (priority, 0)).collect(),
+		}
+	}
+
+	fn select_next_priority(&self) -> PvfExecKind {
+		gum::debug!(
+			target: LOG_TARGET,
+			unscheduled = ?self.unscheduled.iter().map(|(p, q)| (*p, q.len())).collect::<Vec<_>>(),
+			counter = ?self.counter,
+			"Selecting next execution priority...",
+		);
+
+		let priority = PvfExecKind::iter()
+			.find(|priority| self.has_pending(priority) && !self.has_reached_threshold(priority))
+			.unwrap_or_else(|| {
+				PvfExecKind::iter()
+					.find(|priority| self.has_pending(priority))
+					.unwrap_or(PvfExecKind::Backing)
+			});
+
+		gum::debug!(
+			target: LOG_TARGET,
+			?priority,
+			"Selected next execution priority",
+		);
+
+		priority
+	}
+
+	fn get_mut(&mut self, priority: PvfExecKind) -> Option<&mut VecDeque<ExecuteJob>> {
+		self.unscheduled.get_mut(&priority)
+	}
+
+	fn add(&mut self, job: ExecuteJob, priority: PvfExecKind) {
+		self.unscheduled.entry(priority).or_default().push_back(job);
+	}
+
+	fn has_pending(&self, priority: &PvfExecKind) -> bool {
+		!self.unscheduled.get(priority).unwrap_or(&VecDeque::new()).is_empty()
+	}
+
+	fn priority_allocation_threshold(priority: &PvfExecKind) -> Option<usize> {
+		Self::PRIORITY_ALLOCATION_THRESHOLDS.iter().find_map(|&(p, value)| {
+			if p == *priority {
+				Some(value)
+			} else {
+				None
+			}
+		})
+	}
+
+	/// Checks if a given priority has reached its allocated threshold.
+	/// The thresholds are defined in `PRIORITY_ALLOCATION_THRESHOLDS`.
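+	/// A priority counts as saturated once
+	/// `count * 100 / total_scheduled_at_priority_or_lower >= threshold`. For example, with jobs
+	/// of all four kinds constantly pending, a window of 12 jobs is served as 8 disputes,
+	/// 3 approvals and 1 system-parachain backing before the counter resets
+	/// (see `test_unscheduled_priority_distribution` below).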
+	fn has_reached_threshold(&self, priority: &PvfExecKind) -> bool {
+		let Some(threshold) = Self::priority_allocation_threshold(priority) else { return false };
+		let Some(count) = self.counter.get(priority) else { return false };
+		// Sum everything scheduled at this priority level or at less important ones.
+		let total_scheduled_at_priority_or_lower: usize = self
+			.counter
+			.iter()
+			.filter_map(|(p, c)| if *p >= *priority { Some(c) } else { None })
+			.sum();
+		if total_scheduled_at_priority_or_lower == 0 {
+			return false
+		}
+
+		let has_reached_threshold = count * 100 / total_scheduled_at_priority_or_lower >= threshold;
+
+		gum::debug!(
+			target: LOG_TARGET,
+			?priority,
+			?count,
+			?total_scheduled_at_priority_or_lower,
+			"Execution priority has {}reached threshold: {}/{}%",
+			if has_reached_threshold {""} else {"not "},
+			count * 100 / total_scheduled_at_priority_or_lower,
+			threshold
+		);
+
+		has_reached_threshold
+	}
+
+	fn mark_scheduled(&mut self, priority: PvfExecKind) {
+		*self.counter.entry(priority).or_default() += 1;
+
+		if self.counter.values().sum::<usize>() >= Self::SCHEDULING_WINDOW_SIZE {
+			self.reset_counter();
+		}
+	}
+
+	fn reset_counter(&mut self) {
+		self.counter = PvfExecKind::iter().map(|kind| (kind, 0)).collect();
+	}
+}
+
+#[cfg(test)]
+mod tests {
+	use polkadot_node_primitives::BlockData;
+	use sp_core::H256;
+
+	use super::*;
+	use crate::testing::artifact_id;
+	use std::time::Duration;
+
+	fn create_execution_job() -> ExecuteJob {
+		let (result_tx, _result_rx) = oneshot::channel();
+		let pvd = Arc::new(PersistedValidationData {
+			parent_head: Default::default(),
+			relay_parent_number: 1u32,
+			relay_parent_storage_root: H256::default(),
+			max_pov_size: 4096 * 1024,
+		});
+		let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) });
+		ExecuteJob {
+			artifact: ArtifactPathId { id: artifact_id(0), path: PathBuf::new() },
+			exec_timeout: Duration::from_secs(10),
+			pvd,
+			pov,
+			executor_params: ExecutorParams::default(),
+			result_tx,
+			waiting_since: Instant::now(),
+		}
+	}
+
+	#[test]
+	fn test_unscheduled_add() {
+		let mut unscheduled = Unscheduled::new();
+
+		PvfExecKind::iter().for_each(|priority| {
+			unscheduled.add(create_execution_job(), priority);
+		});
+
+		PvfExecKind::iter().for_each(|priority| {
+			let queue = unscheduled.unscheduled.get(&priority).unwrap();
+			assert_eq!(queue.len(), 1);
+		});
+	}
+
+	#[test]
+	fn test_unscheduled_priority_distribution() {
+		use PvfExecKind::*;
+
+		let mut priorities = vec![];
+
+		let mut unscheduled = Unscheduled::new();
+		for _ in 0..Unscheduled::SCHEDULING_WINDOW_SIZE {
+			unscheduled.add(create_execution_job(), Dispute);
+			unscheduled.add(create_execution_job(), Approval);
+			unscheduled.add(create_execution_job(), BackingSystemParas);
+			unscheduled.add(create_execution_job(), Backing);
+		}
+
+		for _ in 0..Unscheduled::SCHEDULING_WINDOW_SIZE {
+			let priority = unscheduled.select_next_priority();
+			priorities.push(priority);
+			unscheduled.mark_scheduled(priority);
+		}
+
+		assert_eq!(priorities.iter().filter(|v| **v == Dispute).count(), 8);
+		assert_eq!(priorities.iter().filter(|v| **v == Approval).count(), 3);
+		assert_eq!(priorities.iter().filter(|v| **v == BackingSystemParas).count(), 1);
+	}
+
+	#[test]
+	fn test_unscheduled_priority_distribution_without_backing_system_paras() {
+		use PvfExecKind::*;
+
+		let mut priorities = vec![];
+
+		let mut unscheduled = Unscheduled::new();
+		for _ in 0..Unscheduled::SCHEDULING_WINDOW_SIZE {
+			unscheduled.add(create_execution_job(), Dispute);
+			unscheduled.add(create_execution_job(), Approval);
unscheduled.add(create_execution_job(), Backing); + } + + for _ in 0..Unscheduled::SCHEDULING_WINDOW_SIZE { + let priority = unscheduled.select_next_priority(); + priorities.push(priority); + unscheduled.mark_scheduled(priority); + } + + assert_eq!(priorities.iter().filter(|v| **v == Dispute).count(), 8); + assert_eq!(priorities.iter().filter(|v| **v == Approval).count(), 3); + assert_eq!(priorities.iter().filter(|v| **v == Backing).count(), 1); + } + + #[test] + fn test_unscheduled_priority_distribution_without_disputes() { + use PvfExecKind::*; + + let mut priorities = vec![]; + + let mut unscheduled = Unscheduled::new(); + for _ in 0..Unscheduled::SCHEDULING_WINDOW_SIZE { + unscheduled.add(create_execution_job(), Approval); + unscheduled.add(create_execution_job(), BackingSystemParas); + unscheduled.add(create_execution_job(), Backing); + } + + for _ in 0..Unscheduled::SCHEDULING_WINDOW_SIZE { + let priority = unscheduled.select_next_priority(); + priorities.push(priority); + unscheduled.mark_scheduled(priority); + } + + assert_eq!(priorities.iter().filter(|v| **v == Approval).count(), 9); + assert_eq!(priorities.iter().filter(|v| **v == BackingSystemParas).count(), 2); + assert_eq!(priorities.iter().filter(|v| **v == Backing).count(), 1); + } + + #[test] + fn test_unscheduled_priority_distribution_without_disputes_and_only_one_backing() { + use PvfExecKind::*; + + let mut priorities = vec![]; + + let mut unscheduled = Unscheduled::new(); + for _ in 0..Unscheduled::SCHEDULING_WINDOW_SIZE { + unscheduled.add(create_execution_job(), Approval); + } + unscheduled.add(create_execution_job(), Backing); + + for _ in 0..Unscheduled::SCHEDULING_WINDOW_SIZE { + let priority = unscheduled.select_next_priority(); + priorities.push(priority); + unscheduled.mark_scheduled(priority); + } + + assert_eq!(priorities.iter().filter(|v| **v == Approval).count(), 11); + assert_eq!(priorities.iter().filter(|v| **v == Backing).count(), 1); + } + + #[test] + fn test_unscheduled_does_not_postpone_backing() { + use PvfExecKind::*; + + let mut priorities = vec![]; + + let mut unscheduled = Unscheduled::new(); + for _ in 0..Unscheduled::SCHEDULING_WINDOW_SIZE { + unscheduled.add(create_execution_job(), Approval); + } + unscheduled.add(create_execution_job(), Backing); + + for _ in 0..Unscheduled::SCHEDULING_WINDOW_SIZE { + let priority = unscheduled.select_next_priority(); + priorities.push(priority); + unscheduled.mark_scheduled(priority); + } + + assert_eq!(&priorities[..4], &[Approval, Backing, Approval, Approval]); + } +} diff --git a/polkadot/node/core/pvf/src/host.rs b/polkadot/node/core/pvf/src/host.rs index 22943a06c43c..37cd6fcbf74a 100644 --- a/polkadot/node/core/pvf/src/host.rs +++ b/polkadot/node/core/pvf/src/host.rs @@ -37,7 +37,7 @@ use polkadot_node_core_pvf_common::{ pvf::PvfPrepData, }; use polkadot_node_primitives::PoV; -use polkadot_node_subsystem::{SubsystemError, SubsystemResult}; +use polkadot_node_subsystem::{messages::PvfExecKind, SubsystemError, SubsystemResult}; use polkadot_parachain_primitives::primitives::ValidationResult; use polkadot_primitives::PersistedValidationData; use std::{ @@ -114,6 +114,7 @@ impl ValidationHost { pvd: Arc, pov: Arc, priority: Priority, + exec_kind: PvfExecKind, result_tx: ResultSender, ) -> Result<(), String> { self.to_host_tx @@ -123,6 +124,7 @@ impl ValidationHost { pvd, pov, priority, + exec_kind, result_tx, })) .await @@ -155,6 +157,7 @@ struct ExecutePvfInputs { pvd: Arc, pov: Arc, priority: Priority, + exec_kind: PvfExecKind, result_tx: ResultSender, } 
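With `ExecutePvfInputs` gaining an `exec_kind` field above, every submission to the validation host now names its execution-queue lane in addition to the preparation priority. A minimal caller sketch, mirroring the updated host tests and assuming `PvfPrepData` is re-exported at the crate root as the existing imports suggest (construction of `pvf`, `pvd` and `pov` is elided):

use std::{sync::Arc, time::Duration};
use futures::channel::oneshot;
use polkadot_node_core_pvf::{Priority, PvfPrepData, ValidationHost};
use polkadot_node_primitives::PoV;
use polkadot_node_subsystem::messages::PvfExecKind;
use polkadot_primitives::PersistedValidationData;

async fn submit_backing_job(
	host: &mut ValidationHost,
	pvf: PvfPrepData,
	pvd: Arc<PersistedValidationData>,
	pov: Arc<PoV>,
) -> Result<(), String> {
	let (result_tx, _result_rx) = oneshot::channel();
	host.execute_pvf(
		pvf,
		Duration::from_secs(2), // exec_timeout
		pvd,
		pov,
		Priority::Normal,       // how urgently to *prepare* the artifact
		PvfExecKind::Backing,   // which execution-queue lane the job lands in
		result_tx,
	)
	.await
}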
@@ -545,7 +548,7 @@ async fn handle_execute_pvf( awaiting_prepare: &mut AwaitingPrepare, inputs: ExecutePvfInputs, ) -> Result<(), Fatal> { - let ExecutePvfInputs { pvf, exec_timeout, pvd, pov, priority, result_tx } = inputs; + let ExecutePvfInputs { pvf, exec_timeout, pvd, pov, priority, exec_kind, result_tx } = inputs; let artifact_id = ArtifactId::from_pvf_prep_data(&pvf); let executor_params = (*pvf.executor_params()).clone(); @@ -567,6 +570,7 @@ async fn handle_execute_pvf( pvd, pov, executor_params, + exec_kind, result_tx, }, }, @@ -597,6 +601,7 @@ async fn handle_execute_pvf( pvd, pov, executor_params, + exec_kind, result_tx, }, ) @@ -606,7 +611,14 @@ async fn handle_execute_pvf( ArtifactState::Preparing { .. } => { awaiting_prepare.add( artifact_id, - PendingExecutionRequest { exec_timeout, pvd, pov, executor_params, result_tx }, + PendingExecutionRequest { + exec_timeout, + pvd, + pov, + executor_params, + result_tx, + exec_kind, + }, ); }, ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => { @@ -638,6 +650,7 @@ async fn handle_execute_pvf( pvd, pov, executor_params, + exec_kind, result_tx, }, ) @@ -657,7 +670,14 @@ async fn handle_execute_pvf( pvf, priority, artifact_id, - PendingExecutionRequest { exec_timeout, pvd, pov, executor_params, result_tx }, + PendingExecutionRequest { + exec_timeout, + pvd, + pov, + executor_params, + result_tx, + exec_kind, + }, ) .await?; } @@ -779,7 +799,7 @@ async fn handle_prepare_done( // It's finally time to dispatch all the execution requests that were waiting for this artifact // to be prepared. let pending_requests = awaiting_prepare.take(&artifact_id); - for PendingExecutionRequest { exec_timeout, pvd, pov, executor_params, result_tx } in + for PendingExecutionRequest { exec_timeout, pvd, pov, executor_params, result_tx, exec_kind } in pending_requests { if result_tx.is_canceled() { @@ -805,6 +825,7 @@ async fn handle_prepare_done( pvd, pov, executor_params, + exec_kind, result_tx, }, }, @@ -1234,6 +1255,7 @@ pub(crate) mod tests { pvd.clone(), pov1.clone(), Priority::Normal, + PvfExecKind::Backing, result_tx, ) .await @@ -1246,6 +1268,7 @@ pub(crate) mod tests { pvd.clone(), pov1, Priority::Critical, + PvfExecKind::Backing, result_tx, ) .await @@ -1258,6 +1281,7 @@ pub(crate) mod tests { pvd, pov2, Priority::Normal, + PvfExecKind::Backing, result_tx, ) .await @@ -1407,6 +1431,7 @@ pub(crate) mod tests { pvd.clone(), pov.clone(), Priority::Critical, + PvfExecKind::Backing, result_tx, ) .await @@ -1455,6 +1480,7 @@ pub(crate) mod tests { pvd, pov, Priority::Critical, + PvfExecKind::Backing, result_tx, ) .await @@ -1565,6 +1591,7 @@ pub(crate) mod tests { pvd.clone(), pov.clone(), Priority::Critical, + PvfExecKind::Backing, result_tx, ) .await @@ -1596,6 +1623,7 @@ pub(crate) mod tests { pvd.clone(), pov.clone(), Priority::Critical, + PvfExecKind::Backing, result_tx_2, ) .await @@ -1619,6 +1647,7 @@ pub(crate) mod tests { pvd.clone(), pov.clone(), Priority::Critical, + PvfExecKind::Backing, result_tx_3, ) .await @@ -1677,6 +1706,7 @@ pub(crate) mod tests { pvd.clone(), pov.clone(), Priority::Critical, + PvfExecKind::Backing, result_tx, ) .await @@ -1708,6 +1738,7 @@ pub(crate) mod tests { pvd.clone(), pov.clone(), Priority::Critical, + PvfExecKind::Backing, result_tx_2, ) .await @@ -1731,6 +1762,7 @@ pub(crate) mod tests { pvd.clone(), pov.clone(), Priority::Critical, + PvfExecKind::Backing, result_tx_3, ) .await @@ -1805,6 +1837,7 @@ pub(crate) mod tests { pvd, pov, Priority::Normal, + PvfExecKind::Backing, result_tx, 
)
.await
diff --git a/polkadot/node/core/pvf/src/metrics.rs b/polkadot/node/core/pvf/src/metrics.rs
index c59cab464180..745f2de99e58 100644
--- a/polkadot/node/core/pvf/src/metrics.rs
+++ b/polkadot/node/core/pvf/src/metrics.rs
@@ -18,6 +18,7 @@
 use polkadot_node_core_pvf_common::prepare::MemoryStats;
 use polkadot_node_metrics::metrics::{self, prometheus};
+use polkadot_node_subsystem::messages::PvfExecKind;
 
 /// Validation host metrics.
 #[derive(Default, Clone)]
@@ -120,6 +121,13 @@ impl Metrics {
 				.observe(pov_size as f64);
 		}
 	}
+
+	/// When the execution queue selects the next job, record which execution kind it was
+	/// scheduled as.
+	pub(crate) fn on_execute_kind(&self, kind: PvfExecKind) {
+		if let Some(metrics) = &self.0 {
+			metrics.exec_kind_selected.with_label_values(&[kind.as_str()]).inc();
+		}
+	}
 }
 
 #[derive(Clone)]
@@ -146,6 +154,7 @@ struct MetricsInner {
 	preparation_peak_tracked_allocation: prometheus::Histogram,
 	pov_size: prometheus::HistogramVec,
 	code_size: prometheus::Histogram,
+	exec_kind_selected: prometheus::CounterVec,
 }
 
 impl metrics::Metrics for Metrics {
@@ -369,6 +378,16 @@ impl metrics::Metrics for Metrics {
 				)?,
 				registry,
 			)?,
+			exec_kind_selected: prometheus::register(
+				prometheus::CounterVec::new(
+					prometheus::Opts::new(
+						"polkadot_pvf_exec_kind_selected",
+						"The total number of jobs selected for execution, by execution kind",
+					),
+					&["priority"],
+				)?,
+				registry,
+			)?,
 		};
 		Ok(Metrics(Some(inner)))
 	}
diff --git a/polkadot/node/core/pvf/src/priority.rs b/polkadot/node/core/pvf/src/priority.rs
index 0d18d4b484ca..7aaeacf36220 100644
--- a/polkadot/node/core/pvf/src/priority.rs
+++ b/polkadot/node/core/pvf/src/priority.rs
@@ -14,6 +14,8 @@
 // You should have received a copy of the GNU General Public License
 // along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
 
+use polkadot_node_subsystem::messages::PvfExecKind;
+
 /// A priority assigned to preparation of a PVF.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] pub enum Priority { @@ -35,3 +37,14 @@ impl Priority { self == Priority::Critical } } + +impl From for Priority { + fn from(priority: PvfExecKind) -> Self { + match priority { + PvfExecKind::Dispute => Priority::Critical, + PvfExecKind::Approval => Priority::Critical, + PvfExecKind::BackingSystemParas => Priority::Normal, + PvfExecKind::Backing => Priority::Normal, + } + } +} diff --git a/polkadot/node/core/pvf/tests/it/main.rs b/polkadot/node/core/pvf/tests/it/main.rs index a4a085318957..4cbc6fb04a8e 100644 --- a/polkadot/node/core/pvf/tests/it/main.rs +++ b/polkadot/node/core/pvf/tests/it/main.rs @@ -25,9 +25,11 @@ use polkadot_node_core_pvf::{ ValidationHost, JOB_TIMEOUT_WALL_CLOCK_FACTOR, }; use polkadot_node_primitives::{PoV, POV_BOMB_LIMIT, VALIDATION_CODE_BOMB_LIMIT}; +use polkadot_node_subsystem::messages::PvfExecKind; use polkadot_parachain_primitives::primitives::{BlockData, ValidationResult}; use polkadot_primitives::{ - ExecutorParam, ExecutorParams, PersistedValidationData, PvfExecKind, PvfPrepKind, + ExecutorParam, ExecutorParams, PersistedValidationData, PvfExecKind as RuntimePvfExecKind, + PvfPrepKind, }; use sp_core::H256; @@ -123,6 +125,7 @@ impl TestHost { Arc::new(pvd), Arc::new(pov), polkadot_node_core_pvf::Priority::Normal, + PvfExecKind::Backing, result_tx, ) .await @@ -580,8 +583,9 @@ async fn artifact_does_not_reprepare_on_non_meaningful_exec_parameter_change() { let cache_dir = host.cache_dir.path(); let set1 = ExecutorParams::default(); - let set2 = - ExecutorParams::from(&[ExecutorParam::PvfExecTimeout(PvfExecKind::Backing, 2500)][..]); + let set2 = ExecutorParams::from( + &[ExecutorParam::PvfExecTimeout(RuntimePvfExecKind::Backing, 2500)][..], + ); let _stats = host .precheck_pvf(test_parachain_halt::wasm_binary_unwrap(), set1) diff --git a/polkadot/node/malus/src/variants/common.rs b/polkadot/node/malus/src/variants/common.rs index 50a5af63db44..66926f48c5e7 100644 --- a/polkadot/node/malus/src/variants/common.rs +++ b/polkadot/node/malus/src/variants/common.rs @@ -241,7 +241,7 @@ where }, } => { match self.fake_validation { - x if x.misbehaves_valid() && x.should_misbehave(exec_kind) => { + x if x.misbehaves_valid() && x.should_misbehave(exec_kind.into()) => { // Behave normally if the `PoV` is not known to be malicious. 
if pov.block_data.0.as_slice() != MALICIOUS_POV { return Some(FromOrchestra::Communication { @@ -296,7 +296,7 @@ where }, } }, - x if x.misbehaves_invalid() && x.should_misbehave(exec_kind) => { + x if x.misbehaves_invalid() && x.should_misbehave(exec_kind.into()) => { // Set the validation result to invalid with probability `p` and trigger a // dispute let behave_maliciously = self.distribution.sample(&mut rand::thread_rng()); diff --git a/polkadot/node/overseer/examples/minimal-example.rs b/polkadot/node/overseer/examples/minimal-example.rs index c2cb1817312c..807e7405ff1b 100644 --- a/polkadot/node/overseer/examples/minimal-example.rs +++ b/polkadot/node/overseer/examples/minimal-example.rs @@ -24,14 +24,14 @@ use orchestra::async_trait; use std::time::Duration; use polkadot_node_primitives::{BlockData, PoV}; -use polkadot_node_subsystem_types::messages::CandidateValidationMessage; +use polkadot_node_subsystem_types::messages::{CandidateValidationMessage, PvfExecKind}; use polkadot_overseer::{ self as overseer, dummy::dummy_overseer_builder, gen::{FromOrchestra, SpawnedSubsystem}, HeadSupportsParachains, SubsystemError, }; -use polkadot_primitives::{CandidateReceipt, Hash, PersistedValidationData, PvfExecKind}; +use polkadot_primitives::{CandidateReceipt, Hash, PersistedValidationData}; use polkadot_primitives_test_helpers::{ dummy_candidate_descriptor, dummy_hash, dummy_validation_code, }; diff --git a/polkadot/node/overseer/src/tests.rs b/polkadot/node/overseer/src/tests.rs index 47c26186b518..46864a482e2a 100644 --- a/polkadot/node/overseer/src/tests.rs +++ b/polkadot/node/overseer/src/tests.rs @@ -25,11 +25,11 @@ use polkadot_node_primitives::{ }; use polkadot_node_subsystem_test_helpers::mock::{dummy_unpin_handle, new_leaf}; use polkadot_node_subsystem_types::messages::{ - NetworkBridgeEvent, ReportPeerMessage, RuntimeApiRequest, + NetworkBridgeEvent, PvfExecKind, ReportPeerMessage, RuntimeApiRequest, }; use polkadot_primitives::{ CandidateHash, CandidateReceipt, CollatorPair, Id as ParaId, InvalidDisputeStatementKind, - PersistedValidationData, PvfExecKind, SessionIndex, ValidDisputeStatementKind, ValidatorIndex, + PersistedValidationData, SessionIndex, ValidDisputeStatementKind, ValidatorIndex, }; use polkadot_primitives_test_helpers::{ dummy_candidate_descriptor, dummy_candidate_receipt, dummy_hash, dummy_validation_code, diff --git a/polkadot/node/subsystem-types/Cargo.toml b/polkadot/node/subsystem-types/Cargo.toml index b5686ec96be1..b8bad8f8a295 100644 --- a/polkadot/node/subsystem-types/Cargo.toml +++ b/polkadot/node/subsystem-types/Cargo.toml @@ -32,3 +32,4 @@ prometheus-endpoint = { workspace = true, default-features = true } thiserror = { workspace = true } async-trait = { workspace = true } bitvec = { features = ["alloc"], workspace = true } +strum = { features = ["derive"], workspace = true, default-features = true } diff --git a/polkadot/node/subsystem-types/src/messages.rs b/polkadot/node/subsystem-types/src/messages.rs index a520e85f52a9..0017adb45568 100644 --- a/polkadot/node/subsystem-types/src/messages.rs +++ b/polkadot/node/subsystem-types/src/messages.rs @@ -24,6 +24,7 @@ use futures::channel::oneshot; use sc_network::{Multiaddr, ReputationChange}; +use strum::EnumIter; use thiserror::Error; pub use sc_network::IfDisconnected; @@ -47,9 +48,10 @@ use polkadot_primitives::{ CandidateReceipt, CollatorId, CommittedCandidateReceipt, CoreIndex, CoreState, DisputeState, ExecutorParams, GroupIndex, GroupRotationInfo, Hash, HeadData, Header as BlockHeader, Id as 
ParaId, InboundDownwardMessage, InboundHrmpMessage, MultiDisputeStatementSet,
-	NodeFeatures, OccupiedCoreAssumption, PersistedValidationData, PvfCheckStatement, PvfExecKind,
-	SessionIndex, SessionInfo, SignedAvailabilityBitfield, SignedAvailabilityBitfields,
-	ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature,
+	NodeFeatures, OccupiedCoreAssumption, PersistedValidationData, PvfCheckStatement,
+	PvfExecKind as RuntimePvfExecKind, SessionIndex, SessionInfo, SignedAvailabilityBitfield,
+	SignedAvailabilityBitfields, ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex,
+	ValidatorSignature,
 };
 use polkadot_statement_table::v2::Misbehavior;
 use std::{
@@ -182,6 +184,45 @@ pub enum CandidateValidationMessage {
 	},
 }
 
+/// Extends `primitives::PvfExecKind`, which is a runtime parameter we don't want to change,
+/// to separate and prioritize execution jobs by request type.
+/// The order is important, because we iterate through the values and assume it goes from highest
+/// to lowest priority.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, EnumIter)]
+pub enum PvfExecKind {
+	/// For dispute requests.
+	Dispute,
+	/// For approval requests.
+	Approval,
+	/// For backing requests from system parachains.
+	BackingSystemParas,
+	/// For backing requests.
+	Backing,
+}
+
+impl PvfExecKind {
+	/// Converts the priority level to a `&str` label, e.g. for metrics.
+	pub fn as_str(&self) -> &str {
+		match *self {
+			Self::Dispute => "dispute",
+			Self::Approval => "approval",
+			Self::BackingSystemParas => "backing_system_paras",
+			Self::Backing => "backing",
+		}
+	}
+}
+
+impl From<PvfExecKind> for RuntimePvfExecKind {
+	fn from(exec: PvfExecKind) -> Self {
+		match exec {
+			PvfExecKind::Dispute => RuntimePvfExecKind::Approval,
+			PvfExecKind::Approval => RuntimePvfExecKind::Approval,
+			PvfExecKind::BackingSystemParas => RuntimePvfExecKind::Backing,
+			PvfExecKind::Backing => RuntimePvfExecKind::Backing,
+		}
+	}
+}
+
 /// Messages received by the Collator Protocol subsystem.
 #[derive(Debug, derive_more::From)]
 pub enum CollatorProtocolMessage {
diff --git a/prdoc/pr_4837.prdoc b/prdoc/pr_4837.prdoc
new file mode 100644
index 000000000000..55c12cc92a1c
--- /dev/null
+++ b/prdoc/pr_4837.prdoc
@@ -0,0 +1,26 @@
+# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
+# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json
+
+title: Add PVF execution priority
+
+doc:
+  - audience: Node Dev
+    description: |
+      The new logic optimizes the distribution of execution jobs for disputes, approvals and backing.
+      The main goal is to create back pressure for backing in the presence of disputes or numerous approval jobs.
+
+crates:
+  - name: polkadot-node-core-pvf
+    bump: major
+  - name: polkadot-overseer
+    bump: patch
+  - name: polkadot-node-subsystem-types
+    bump: patch
+  - name: polkadot-node-core-approval-voting
+    bump: patch
+  - name: polkadot-node-core-backing
+    bump: patch
+  - name: polkadot-node-core-candidate-validation
+    bump: patch
+  - name: polkadot-node-core-dispute-coordinator
+    bump: patch
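Finally, the backing-side selection shown at the top of this diff reduces to a single predicate on the para ID. A standalone recap (the helper name is illustrative, not part of the patch):

use polkadot_node_subsystem::messages::PvfExecKind;
use polkadot_parachain_primitives::primitives::IsSystem;
use polkadot_primitives::Id as ParaId;

// System parachains are routed to the dedicated backing lane, which the
// execution queue serves ahead of ordinary backing work.
fn backing_exec_kind(para_id: ParaId) -> PvfExecKind {
	if para_id.is_system() {
		PvfExecKind::BackingSystemParas
	} else {
		PvfExecKind::Backing
	}
}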