From 9a17c4e9bf44bc431e48cdb145ebedc7d91c0aa7 Mon Sep 17 00:00:00 2001 From: s0me0ne-unkn0wn <48632512+s0me0ne-unkn0wn@users.noreply.github.com> Date: Wed, 15 Feb 2023 12:26:09 +0100 Subject: [PATCH] Executor Environment parameterization (#6161) * Re-apply changes without Diener, rebase to the lastest master * Cache pruning * Bit-pack InstantiationStrategy * Move ExecutorParams version inside the structure itself * Rework runtime API and executor parameters storage * Pass executor parameters through backing subsystem * Update Cargo.lock * Introduce `ExecutorParams` to approval voting subsys * Introduce `ExecutorParams` to dispute coordinator * `cargo fmt` * Simplify requests from backing subsys * Fix tests * Replace manual config cloning with `.clone()` * Move constants to module * Parametrize executor performing PVF pre-check * Fix Malus * Fix test runtime * Introduce session executor params as a constant defined by session info pallet * Use Parity SCALE codec instead of hand-crafted binary encoding * Get rid of constants; Add docs * Get rid of constants * Minor typo * Fix Malus after rebase * `cargo fmt` * Use transparent SCALE encoding instead of explicit * Clean up * Get rid of relay parent to session index mapping * Join environment type and version in a single enum element * Use default execution parameters if running an old runtime * `unwrap()` -> `expect()` * Correct API version * Constants are back in town * Use constants for execution environment types * Artifact separation, first try * Get rid of explicit version * PVF execution queue worker separation * Worker handshake * Global renaming * Minor fixes resolving discussions * Two-stage requesting of executor params to make use of runtime API cache * Proper error handling in pvf-checker * Executor params storage bootstrapping * Propagate migration to v3 network runtimes * Fix storage versioning * Ensure `ExecutorParams` serialization determinism; Add comments * Rename constants to make things a bit more deterministic Get rid of stale code * Tidy up a structure of active PVFs * Minor formatting * Fix comment * Add try-runtime hooks * Add storage version write on upgrade Co-authored-by: Andronik * Add pre- and post-upgrade assertions * Require to specify environment type; Remove redundant `impl`s * Add `ExecutorParamHash` creation from `H256` * Fix candidate validation subsys tests * Return splittable error from executor params request fn * Revert "Return splittable error from executor params request fn" This reverts commit a85038da0f9395096fdfd57cb6e60f74fec0258a. 
* Decompose approval voting metrics * Use more relevant errors * Minor formatting fix * Assert a valid environment type instead of checking * Fix `try-runtime` hooks * After-merge fixes * Add migration logs * Remove dead code * Fix tests * Fix tests * Back to the strongly typed implementation * Promote strong types to executor interface * Remove stale comment * Move executor params to `SessionInfo`: primitives and runtime * Move executor params to `SessionInfo`: node * Try to bump primitives and API version * Get rid of `MallocSizeOf` * Bump target API version to v4 * Make use of session index already in place * Back to v3 * Fix all the tests * Add migrations to all the runtimes * Make use of existing `SessionInfo` in approval voting subsys * Rename `TARGET` -> `LOG_TARGET` * Bump all the primitives to v3 * Fix Rococo ParachainHost API version * Use `RollingSessionWindow` to acquire `ExecutorParams` in disputes * Fix nits from discussions; add comments * Re-evaluate queue logic * Rework job assignment in execution queue * Add documentation * Use `RuntimeInfo` to obtain `SessionInfo` (with blackjack and caching) * Couple `Pvf` with `ExecutorParams` wherever possible * Put members of `PvfWithExecutorParams` under `Arc` for cheap cloning * Fix comment * Fix CI tests * Fix clippy warnings * Address nits from discussions * Add a placeholder for raw data * Fix non exhaustive match * Remove redundant reexports and fix imports * Keep only necessary semantic features, as discussed * Rework `RuntimeInfo` to support mock implementation for tests * Remove unneeded bound * `cargo fmt` * Revert "Remove unneeded bound" This reverts commit 3bdfc68202a371bb4cf2978e5224c9d05d641c24. * Fix PVF host tests * Fix PVF checker tests * Fix overseer declarations * Simplify tests * `MAX_KEEP_WAITING` timeout based on `BACKGING_EXECUTION_TIMEOUT` * Add a unit test for varying executor parameters * Minor fixes from discussions * Add prechecking max. memory parameter (see paritytech/srlabs_findings#110) * Fix and improve a test * Remove `ExecutionEnvironment` and `RawData` * New primitives versioning in parachain host API * `disputes()` implementation for Kusama and Polkadot * Move `ExecutorParams` from `vstaging` to stable primitives * Move disputes from `vstaging` to stable implementation * Fix `try-runtime` * Fixes after merge * Move `ExecutorParams` to the bottom of `SessionInfo` * Revert "Move executor params to `SessionInfo`: primitives and runtime" This reverts commit 198835592041e021138f67e2fa4965652bc1af14. * Always use fresh activated live hash in pvf precheck (re-apply 029b82b7db0a0e5e4fd605e940aa748c59fdda77) * Fixing tests (broken commit) * Fix candidate validation tests * Fix PVF host test * Minor fixes * Address discussions * Restore migration * Fix `use` to only include what is needed instead of `*` * Add comment to never touch `DEFAULT_CONFIG` * Update migration to set default `ExecutorParams` for `dispute_period` sessions back * Use `earliest_stored_session` instead of calculations * Nit * Add logs * Treat any runtime error as `NotSupported` again * Always return default executor params if not available * Revert "Always return default executor params if not available" This reverts commit b58ac4482ef444c67a9852d5776550d08e312f30. * Add paritytech/substrate#9997 workaround * `cargo fmt` * Remove migration (again!) 
* Bump executor params to API v4 (backport from #6698) --------- Co-authored-by: Andronik --- Cargo.lock | 3 + .../benches/scaling_with_validators.rs | 2 +- node/core/candidate-validation/Cargo.toml | 2 +- node/core/candidate-validation/src/lib.rs | 105 ++++-- node/core/candidate-validation/src/tests.rs | 313 +++++++++++++----- node/core/dispute-coordinator/src/tests.rs | 2 +- node/core/pvf-checker/src/lib.rs | 2 +- node/core/pvf/Cargo.toml | 4 +- node/core/pvf/src/artifacts.rs | 34 +- node/core/pvf/src/execute/queue.rs | 187 ++++++++--- node/core/pvf/src/execute/worker.rs | 44 ++- node/core/pvf/src/executor_intf.rs | 48 ++- node/core/pvf/src/host.rs | 142 +++++--- node/core/pvf/src/lib.rs | 2 +- node/core/pvf/src/prepare/pool.rs | 19 +- node/core/pvf/src/prepare/queue.rs | 89 +++-- node/core/pvf/src/prepare/worker.rs | 33 +- node/core/pvf/src/pvf.rs | 43 ++- node/core/pvf/src/testing.rs | 6 +- node/core/pvf/src/worker_common.rs | 2 + node/core/pvf/tests/it/adder.rs | 4 + node/core/pvf/tests/it/main.rs | 61 +++- node/core/runtime-api/src/cache.rs | 29 +- node/core/runtime-api/src/lib.rs | 19 ++ .../src/variants/suggest_garbage_candidate.rs | 3 +- node/subsystem-types/src/messages.rs | 15 +- node/subsystem-types/src/runtime_client.rs | 32 +- node/subsystem-util/src/lib.rs | 53 +++ node/test/performance-test/Cargo.toml | 1 + node/test/performance-test/src/lib.rs | 4 +- primitives/src/runtime_api.rs | 47 +-- primitives/src/v2/mod.rs | 2 +- primitives/src/vstaging/executor_params.rs | 116 +++++++ primitives/src/vstaging/mod.rs | 3 + runtime/common/src/xcm_sender.rs | 2 +- runtime/parachains/src/disputes/migration.rs | 2 +- .../src/runtime_api_impl/vstaging.rs | 19 +- runtime/parachains/src/session_info.rs | 20 +- runtime/rococo/src/lib.rs | 29 +- runtime/westend/src/lib.rs | 30 +- 40 files changed, 1243 insertions(+), 330 deletions(-) create mode 100644 primitives/src/vstaging/executor_params.rs diff --git a/Cargo.lock b/Cargo.lock index 6c109b49896e..7d8c37e9728c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7019,7 +7019,9 @@ dependencies = [ "pin-project", "polkadot-core-primitives", "polkadot-node-metrics", + "polkadot-node-primitives", "polkadot-parachain", + "polkadot-primitives", "rand 0.8.5", "rayon", "sc-executor", @@ -7314,6 +7316,7 @@ dependencies = [ "polkadot-erasure-coding", "polkadot-node-core-pvf", "polkadot-node-primitives", + "polkadot-primitives", "quote", "thiserror", ] diff --git a/erasure-coding/benches/scaling_with_validators.rs b/erasure-coding/benches/scaling_with_validators.rs index c4834ecbab0f..35eff095d2f6 100644 --- a/erasure-coding/benches/scaling_with_validators.rs +++ b/erasure-coding/benches/scaling_with_validators.rs @@ -15,7 +15,7 @@ // along with Polkadot. If not, see . 
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; -use polkadot_primitives::v2::Hash; +use polkadot_primitives::Hash; use std::time::Duration; fn chunks(n_validators: usize, pov: &Vec) -> Vec> { diff --git a/node/core/candidate-validation/Cargo.toml b/node/core/candidate-validation/Cargo.toml index 267dced02b26..e46d44033bad 100644 --- a/node/core/candidate-validation/Cargo.toml +++ b/node/core/candidate-validation/Cargo.toml @@ -17,6 +17,7 @@ polkadot-primitives = { path = "../../../primitives" } polkadot-parachain = { path = "../../../parachain" } polkadot-node-primitives = { path = "../../primitives" } polkadot-node-subsystem = { path = "../../subsystem" } +polkadot-node-subsystem-util = { path = "../../subsystem-util" } polkadot-node-metrics = { path = "../../metrics" } [target.'cfg(not(any(target_os = "android", target_os = "unknown")))'.dependencies] @@ -27,6 +28,5 @@ sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master futures = { version = "0.3.21", features = ["thread-pool"] } assert_matches = "1.4.0" polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" } -polkadot-node-subsystem-util = { path = "../../subsystem-util" } sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } test-helpers = { package = "polkadot-primitives-test-helpers", path = "../../../primitives/test-helpers" } diff --git a/node/core/candidate-validation/src/lib.rs b/node/core/candidate-validation/src/lib.rs index b3c44064a922..efdbeefe13f9 100644 --- a/node/core/candidate-validation/src/lib.rs +++ b/node/core/candidate-validation/src/lib.rs @@ -24,8 +24,8 @@ #![warn(missing_docs)] use polkadot_node_core_pvf::{ - InvalidCandidate as WasmInvalidCandidate, PrepareError, PrepareStats, Pvf, ValidationError, - ValidationHost, + InvalidCandidate as WasmInvalidCandidate, PrepareError, PrepareStats, Pvf, + PvfWithExecutorParams, ValidationError, ValidationHost, }; use polkadot_node_primitives::{ BlockData, InvalidCandidate, PoV, ValidationResult, POV_BOMB_LIMIT, VALIDATION_CODE_BOMB_LIMIT, @@ -39,10 +39,11 @@ use polkadot_node_subsystem::{ overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, SubsystemResult, SubsystemSender, }; +use polkadot_node_subsystem_util::executor_params_at_relay_parent; use polkadot_parachain::primitives::{ValidationParams, ValidationResult as WasmValidationResult}; use polkadot_primitives::{ - CandidateCommitments, CandidateDescriptor, CandidateReceipt, Hash, OccupiedCoreAssumption, - PersistedValidationData, ValidationCode, ValidationCodeHash, + vstaging::ExecutorParams, CandidateCommitments, CandidateDescriptor, CandidateReceipt, Hash, + OccupiedCoreAssumption, PersistedValidationData, ValidationCode, ValidationCodeHash, }; use parity_scale_codec::Encode; @@ -175,12 +176,14 @@ async fn run( response_sender, ) => { let bg = { + let mut sender = ctx.sender().clone(); let metrics = metrics.clone(); let validation_host = validation_host.clone(); async move { let _timer = metrics.time_validate_from_exhaustive(); let res = validate_candidate_exhaustive( + &mut sender, validation_host, persisted_validation_data, validation_code, @@ -307,18 +310,38 @@ where }, }; - let validation_code = match sp_maybe_compressed_blob::decompress( + let executor_params = + if let Ok(executor_params) = executor_params_at_relay_parent(relay_parent, sender).await { + gum::debug!( + target: LOG_TARGET, + ?relay_parent, + ?validation_code_hash, + "precheck: acquired executor params 
for the session: {:?}", + executor_params, + ); + executor_params + } else { + gum::warn!( + target: LOG_TARGET, + ?relay_parent, + ?validation_code_hash, + "precheck: failed to acquire executor params for the session, thus voting against.", + ); + return PreCheckOutcome::Invalid + }; + + let pvf_with_params = match sp_maybe_compressed_blob::decompress( &validation_code.0, VALIDATION_CODE_BOMB_LIMIT, ) { - Ok(code) => Pvf::from_code(code.into_owned()), + Ok(code) => PvfWithExecutorParams::new(Pvf::from_code(code.into_owned()), executor_params), Err(e) => { gum::debug!(target: LOG_TARGET, err=?e, "precheck: cannot decompress validation code"); return PreCheckOutcome::Invalid }, }; - match validation_backend.precheck_pvf(validation_code).await { + match validation_backend.precheck_pvf(pvf_with_params).await { Ok(_) => PreCheckOutcome::Valid, Err(prepare_err) => if prepare_err.is_deterministic() { @@ -456,6 +479,7 @@ where }; let validation_result = validate_candidate_exhaustive( + sender, validation_host, validation_data, validation_code, @@ -490,7 +514,8 @@ where validation_result } -async fn validate_candidate_exhaustive( +async fn validate_candidate_exhaustive( + sender: &mut Sender, mut validation_backend: impl ValidationBackend + Send, persisted_validation_data: PersistedValidationData, validation_code: ValidationCode, @@ -498,7 +523,10 @@ async fn validate_candidate_exhaustive( pov: Arc, timeout: Duration, metrics: &Metrics, -) -> Result { +) -> Result +where + Sender: SubsystemSender, +{ let _timer = metrics.time_validate_candidate_exhaustive(); let validation_code_hash = validation_code.hash(); @@ -554,8 +582,34 @@ async fn validate_candidate_exhaustive( relay_parent_storage_root: persisted_validation_data.relay_parent_storage_root, }; + let executor_params = if let Ok(executor_params) = + executor_params_at_relay_parent(candidate_receipt.descriptor.relay_parent, sender).await + { + gum::debug!( + target: LOG_TARGET, + ?validation_code_hash, + ?para_id, + "Acquired executor params for the session: {:?}", + executor_params, + ); + executor_params + } else { + gum::warn!( + target: LOG_TARGET, + ?validation_code_hash, + ?para_id, + "Failed to acquire executor params for the session", + ); + return Ok(ValidationResult::Invalid(InvalidCandidate::BadParent)) + }; + let result = validation_backend - .validate_candidate_with_retry(raw_validation_code.to_vec(), timeout, params) + .validate_candidate_with_retry( + raw_validation_code.to_vec(), + timeout, + params, + executor_params, + ) .await; if let Err(ref error) = result { @@ -613,7 +667,7 @@ trait ValidationBackend { /// Tries executing a PVF a single time (no retries). async fn validate_candidate( &mut self, - pvf: Pvf, + pvf_with_params: PvfWithExecutorParams, timeout: Duration, encoded_params: Vec, ) -> Result; @@ -625,12 +679,14 @@ trait ValidationBackend { raw_validation_code: Vec, timeout: Duration, params: ValidationParams, + executor_params: ExecutorParams, ) -> Result { // Construct the PVF a single time, since it is an expensive operation. Cloning it is cheap. 
- let pvf = Pvf::from_code(raw_validation_code); + let pvf_with_params = + PvfWithExecutorParams::new(Pvf::from_code(raw_validation_code), executor_params); let mut validation_result = - self.validate_candidate(pvf.clone(), timeout, params.encode()).await; + self.validate_candidate(pvf_with_params.clone(), timeout, params.encode()).await; // If we get an AmbiguousWorkerDeath error, retry once after a brief delay, on the // assumption that the conditions that caused this error may have been transient. Note that @@ -643,19 +699,23 @@ trait ValidationBackend { gum::warn!( target: LOG_TARGET, - ?pvf, + ?pvf_with_params, "Re-trying failed candidate validation due to AmbiguousWorkerDeath." ); // Encode the params again when re-trying. We expect the retry case to be relatively // rare, and we want to avoid unconditionally cloning data. - validation_result = self.validate_candidate(pvf, timeout, params.encode()).await; + validation_result = + self.validate_candidate(pvf_with_params, timeout, params.encode()).await; } validation_result } - async fn precheck_pvf(&mut self, pvf: Pvf) -> Result; + async fn precheck_pvf( + &mut self, + pvf_with_params: PvfWithExecutorParams, + ) -> Result; } #[async_trait] @@ -663,14 +723,16 @@ impl ValidationBackend for ValidationHost { /// Tries executing a PVF a single time (no retries). async fn validate_candidate( &mut self, - pvf: Pvf, + pvf_with_params: PvfWithExecutorParams, timeout: Duration, encoded_params: Vec, ) -> Result { let priority = polkadot_node_core_pvf::Priority::Normal; let (tx, rx) = oneshot::channel(); - if let Err(err) = self.execute_pvf(pvf, timeout, encoded_params, priority, tx).await { + if let Err(err) = + self.execute_pvf(pvf_with_params, timeout, encoded_params, priority, tx).await + { return Err(ValidationError::InternalError(format!( "cannot send pvf to the validation host: {:?}", err @@ -681,9 +743,12 @@ impl ValidationBackend for ValidationHost { .map_err(|_| ValidationError::InternalError("validation was cancelled".into()))? } - async fn precheck_pvf(&mut self, pvf: Pvf) -> Result { + async fn precheck_pvf( + &mut self, + pvf_with_params: PvfWithExecutorParams, + ) -> Result { let (tx, rx) = oneshot::channel(); - if let Err(err) = self.precheck_pvf(pvf, tx).await { + if let Err(err) = self.precheck_pvf(pvf_with_params, tx).await { // Return an IO error if there was an error communicating with the host. 
return Err(PrepareError::IoErr(err)) } diff --git a/node/core/candidate-validation/src/tests.rs b/node/core/candidate-validation/src/tests.rs index 779bf0fcca33..6bab57097dae 100644 --- a/node/core/candidate-validation/src/tests.rs +++ b/node/core/candidate-validation/src/tests.rs @@ -26,6 +26,37 @@ use polkadot_primitives::{HeadData, Id as ParaId, UpwardMessage}; use sp_core::testing::TaskExecutor; use sp_keyring::Sr25519Keyring; +fn test_with_executor_params, R, M>( + mut ctx_handle: test_helpers::TestSubsystemContextHandle, + test: impl FnOnce() -> T, +) -> R { + let test_fut = test(); + + let overseer = async move { + assert_matches!( + ctx_handle.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionIndexForChild(tx)) + ) => { + tx.send(Ok(1u32.into())).unwrap(); + } + ); + assert_matches!( + ctx_handle.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionExecutorParams(_, tx)) + ) => { + tx.send(Ok(Some(ExecutorParams::default()))).unwrap(); + } + ); + }; + + futures::pin_mut!(test_fut); + futures::pin_mut!(overseer); + let v = executor::block_on(future::join(test_fut, overseer)); + v.0 +} + #[test] fn correctly_checks_included_assumption() { let validation_data: PersistedValidationData = Default::default(); @@ -365,7 +396,7 @@ impl MockValidateCandidateBackend { impl ValidationBackend for MockValidateCandidateBackend { async fn validate_candidate( &mut self, - _pvf: Pvf, + _pvf_with_params: PvfWithExecutorParams, _timeout: Duration, _encoded_params: Vec, ) -> Result { @@ -377,7 +408,10 @@ impl ValidationBackend for MockValidateCandidateBackend { result } - async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result { + async fn precheck_pvf( + &mut self, + _pvf_with_params: PvfWithExecutorParams, + ) -> Result { unreachable!() } } @@ -429,15 +463,23 @@ fn candidate_validation_ok_is_ok() { let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: commitments.hash() }; - let v = executor::block_on(validate_candidate_exhaustive( - MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)), - validation_data.clone(), - validation_code, - candidate_receipt, - Arc::new(pov), - Duration::from_secs(0), - &Default::default(), - )) + let pool = TaskExecutor::new(); + let (mut ctx, ctx_handle) = + test_helpers::make_subsystem_context::(pool.clone()); + let metrics = Metrics::default(); + + let v = test_with_executor_params(ctx_handle, || { + validate_candidate_exhaustive( + ctx.sender(), + MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)), + validation_data.clone(), + validation_code, + candidate_receipt, + Arc::new(pov), + Duration::from_secs(0), + &metrics, + ) + }) .unwrap(); assert_matches!(v, ValidationResult::Valid(outputs, used_validation_data) => { @@ -478,20 +520,27 @@ fn candidate_validation_bad_return_is_invalid() { let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() }; - let v = executor::block_on(validate_candidate_exhaustive( - MockValidateCandidateBackend::with_hardcoded_result(Err( - ValidationError::InvalidCandidate(WasmInvalidCandidate::HardTimeout), - )), - validation_data, - validation_code, - candidate_receipt, - Arc::new(pov), - Duration::from_secs(0), - &Default::default(), - )) - .unwrap(); + let pool = TaskExecutor::new(); + let (mut ctx, ctx_handle) = + test_helpers::make_subsystem_context::(pool.clone()); + let metrics = Metrics::default(); + + let v = test_with_executor_params(ctx_handle, || { 
+ validate_candidate_exhaustive( + ctx.sender(), + MockValidateCandidateBackend::with_hardcoded_result(Err( + ValidationError::InvalidCandidate(WasmInvalidCandidate::HardTimeout), + )), + validation_data, + validation_code, + candidate_receipt, + Arc::new(pov), + Duration::from_secs(0), + &metrics, + ) + }); - assert_matches!(v, ValidationResult::Invalid(InvalidCandidate::Timeout)); + assert_matches!(v, Ok(ValidationResult::Invalid(InvalidCandidate::Timeout))); } #[test] @@ -541,18 +590,26 @@ fn candidate_validation_one_ambiguous_error_is_valid() { let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: commitments.hash() }; - let v = executor::block_on(validate_candidate_exhaustive( - MockValidateCandidateBackend::with_hardcoded_result_list(vec![ - Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)), - Ok(validation_result), - ]), - validation_data.clone(), - validation_code, - candidate_receipt, - Arc::new(pov), - Duration::from_secs(0), - &Default::default(), - )) + let pool = TaskExecutor::new(); + let (mut ctx, ctx_handle) = + test_helpers::make_subsystem_context::(pool.clone()); + let metrics = Metrics::default(); + + let v = test_with_executor_params(ctx_handle, || { + validate_candidate_exhaustive( + ctx.sender(), + MockValidateCandidateBackend::with_hardcoded_result_list(vec![ + Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)), + Ok(validation_result), + ]), + validation_data.clone(), + validation_code, + candidate_receipt, + Arc::new(pov), + Duration::from_secs(0), + &metrics, + ) + }) .unwrap(); assert_matches!(v, ValidationResult::Valid(outputs, used_validation_data) => { @@ -593,18 +650,26 @@ fn candidate_validation_multiple_ambiguous_errors_is_invalid() { let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() }; - let v = executor::block_on(validate_candidate_exhaustive( - MockValidateCandidateBackend::with_hardcoded_result_list(vec![ - Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)), - Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)), - ]), - validation_data, - validation_code, - candidate_receipt, - Arc::new(pov), - Duration::from_secs(0), - &Default::default(), - )) + let pool = TaskExecutor::new(); + let (mut ctx, ctx_handle) = + test_helpers::make_subsystem_context::(pool.clone()); + let metrics = Metrics::default(); + + let v = test_with_executor_params(ctx_handle, || { + validate_candidate_exhaustive( + ctx.sender(), + MockValidateCandidateBackend::with_hardcoded_result_list(vec![ + Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)), + Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)), + ]), + validation_data, + validation_code, + candidate_receipt, + Arc::new(pov), + Duration::from_secs(0), + &metrics, + ) + }) .unwrap(); assert_matches!(v, ValidationResult::Invalid(InvalidCandidate::ExecutionError(_))); @@ -638,17 +703,25 @@ fn candidate_validation_timeout_is_internal_error() { let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() }; - let v = executor::block_on(validate_candidate_exhaustive( - MockValidateCandidateBackend::with_hardcoded_result(Err( - ValidationError::InvalidCandidate(WasmInvalidCandidate::HardTimeout), - )), - validation_data, - validation_code, - candidate_receipt, - Arc::new(pov), - Duration::from_secs(0), - &Default::default(), - )); + let pool = TaskExecutor::new(); + 
let (mut ctx, ctx_handle) = + test_helpers::make_subsystem_context::(pool.clone()); + let metrics = Metrics::default(); + + let v = test_with_executor_params(ctx_handle, || { + validate_candidate_exhaustive( + ctx.sender(), + MockValidateCandidateBackend::with_hardcoded_result(Err( + ValidationError::InvalidCandidate(WasmInvalidCandidate::HardTimeout), + )), + validation_data, + validation_code, + candidate_receipt, + Arc::new(pov), + Duration::from_secs(0), + &metrics, + ) + }); assert_matches!(v, Ok(ValidationResult::Invalid(InvalidCandidate::Timeout))); } @@ -684,15 +757,23 @@ fn candidate_validation_commitment_hash_mismatch_is_invalid() { hrmp_watermark: 12345, }; - let result = executor::block_on(validate_candidate_exhaustive( - MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)), - validation_data, - validation_code, - candidate_receipt, - Arc::new(pov), - Duration::from_secs(0), - &Default::default(), - )) + let pool = TaskExecutor::new(); + let (mut ctx, ctx_handle) = + test_helpers::make_subsystem_context::(pool.clone()); + let metrics = Metrics::default(); + + let result = test_with_executor_params(ctx_handle, || { + validate_candidate_exhaustive( + ctx.sender(), + MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)), + validation_data, + validation_code, + candidate_receipt, + Arc::new(pov), + Duration::from_secs(0), + &metrics, + ) + }) .unwrap(); // Ensure `post validation` check on the commitments hash works as expected. @@ -727,7 +808,12 @@ fn candidate_validation_code_mismatch_is_invalid() { let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() }; + let pool = TaskExecutor::new(); + let (mut ctx, _ctx_handle) = + test_helpers::make_subsystem_context::(pool.clone()); + let v = executor::block_on(validate_candidate_exhaustive( + ctx.sender(), MockValidateCandidateBackend::with_hardcoded_result(Err( ValidationError::InvalidCandidate(WasmInvalidCandidate::HardTimeout), )), @@ -785,15 +871,23 @@ fn compressed_code_works() { let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: commitments.hash() }; - let v = executor::block_on(validate_candidate_exhaustive( - MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)), - validation_data, - validation_code, - candidate_receipt, - Arc::new(pov), - Duration::from_secs(0), - &Default::default(), - )); + let pool = TaskExecutor::new(); + let (mut ctx, ctx_handle) = + test_helpers::make_subsystem_context::(pool.clone()); + let metrics = Metrics::default(); + + let v = test_with_executor_params(ctx_handle, || { + validate_candidate_exhaustive( + ctx.sender(), + MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)), + validation_data, + validation_code, + candidate_receipt, + Arc::new(pov), + Duration::from_secs(0), + &metrics, + ) + }); assert_matches!(v, Ok(ValidationResult::Valid(_, _))); } @@ -832,7 +926,12 @@ fn code_decompression_failure_is_error() { let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() }; + let pool = TaskExecutor::new(); + let (mut ctx, _ctx_handle) = + test_helpers::make_subsystem_context::(pool.clone()); + let v = executor::block_on(validate_candidate_exhaustive( + ctx.sender(), MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)), validation_data, validation_code, @@ -880,7 +979,12 @@ fn pov_decompression_failure_is_invalid() { let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() }; + let pool 
= TaskExecutor::new(); + let (mut ctx, _ctx_handle) = + test_helpers::make_subsystem_context::(pool.clone()); + let v = executor::block_on(validate_candidate_exhaustive( + ctx.sender(), MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)), validation_data, validation_code, @@ -907,14 +1011,17 @@ impl MockPreCheckBackend { impl ValidationBackend for MockPreCheckBackend { async fn validate_candidate( &mut self, - _pvf: Pvf, + _pvf_with_params: PvfWithExecutorParams, _timeout: Duration, _encoded_params: Vec, ) -> Result { unreachable!() } - async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result { + async fn precheck_pvf( + &mut self, + _pvf_with_params: PvfWithExecutorParams, + ) -> Result { self.result.clone() } } @@ -953,6 +1060,22 @@ fn precheck_works() { let _ = tx.send(Ok(Some(validation_code.clone()))); } ); + assert_matches!( + ctx_handle.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionIndexForChild(tx)) + ) => { + tx.send(Ok(1u32.into())).unwrap(); + } + ); + assert_matches!( + ctx_handle.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionExecutorParams(_, tx)) + ) => { + tx.send(Ok(Some(ExecutorParams::default()))).unwrap(); + } + ); assert_matches!(check_result.await, PreCheckOutcome::Valid); }; @@ -999,6 +1122,22 @@ fn precheck_invalid_pvf_blob_compression() { let _ = tx.send(Ok(Some(validation_code.clone()))); } ); + assert_matches!( + ctx_handle.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionIndexForChild(tx)) + ) => { + tx.send(Ok(1u32.into())).unwrap(); + } + ); + assert_matches!( + ctx_handle.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionExecutorParams(_, tx)) + ) => { + tx.send(Ok(Some(ExecutorParams::default()))).unwrap(); + } + ); assert_matches!(check_result.await, PreCheckOutcome::Invalid); }; @@ -1041,6 +1180,22 @@ fn precheck_properly_classifies_outcomes() { let _ = tx.send(Ok(Some(validation_code.clone()))); } ); + assert_matches!( + ctx_handle.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionIndexForChild(tx)) + ) => { + tx.send(Ok(1u32.into())).unwrap(); + } + ); + assert_matches!( + ctx_handle.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionExecutorParams(_, tx)) + ) => { + tx.send(Ok(Some(ExecutorParams::default()))).unwrap(); + } + ); assert_eq!(check_result.await, precheck_outcome); }; diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs index 6725d0061b23..7d7243243ddf 100644 --- a/node/core/dispute-coordinator/src/tests.rs +++ b/node/core/dispute-coordinator/src/tests.rs @@ -3339,7 +3339,7 @@ fn informs_chain_selection_when_dispute_concluded_against() { .await; let supermajority_threshold = - polkadot_primitives::v2::supermajority_threshold(test_state.validators.len()); + polkadot_primitives::supermajority_threshold(test_state.validators.len()); let (valid_vote, invalid_vote) = generate_opposing_votes_pair( &test_state, diff --git a/node/core/pvf-checker/src/lib.rs b/node/core/pvf-checker/src/lib.rs index 7e961dcf4b88..4278b74e0b15 100644 --- a/node/core/pvf-checker/src/lib.rs +++ b/node/core/pvf-checker/src/lib.rs @@ -297,7 +297,7 @@ async fn handle_leaves_update( metrics.on_pvf_observed(outcome.newcomers.len()); metrics.on_pvf_left(outcome.left_num); for newcomer in outcome.newcomers { - 
initiate_precheck(state, sender, recent_block_hash, newcomer, metrics).await; + initiate_precheck(state, sender, activated.hash, newcomer, metrics).await; } if let Some((new_session_index, credentials)) = new_session_index { diff --git a/node/core/pvf/Cargo.toml b/node/core/pvf/Cargo.toml index f19663bab98e..a810c3887c96 100644 --- a/node/core/pvf/Cargo.toml +++ b/node/core/pvf/Cargo.toml @@ -27,8 +27,10 @@ parity-scale-codec = { version = "3.3.0", default-features = false, features = [ polkadot-parachain = { path = "../../../parachain" } polkadot-core-primitives = { path = "../../../core-primitives" } -polkadot-node-metrics = { path = "../../metrics"} +polkadot-node-metrics = { path = "../../metrics" } +polkadot-node-primitives = { path = "../../primitives" } +polkadot-primitives = { path = "../../../primitives" } sc-executor = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-executor-wasmtime = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-executor-common = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/node/core/pvf/src/artifacts.rs b/node/core/pvf/src/artifacts.rs index d2e1e1e90878..7ddbaef38c5a 100644 --- a/node/core/pvf/src/artifacts.rs +++ b/node/core/pvf/src/artifacts.rs @@ -17,6 +17,7 @@ use crate::{error::PrepareError, host::PrepareResultSender, prepare::PrepareStats}; use always_assert::always; use polkadot_parachain::primitives::ValidationCodeHash; +use polkadot_primitives::vstaging::ExecutorParamsHash; use std::{ collections::HashMap, path::{Path, PathBuf}, @@ -37,19 +38,19 @@ impl AsRef<[u8]> for CompiledArtifact { } } -/// Identifier of an artifact. Right now it only encodes a code hash of the PVF. But if we get to -/// multiple engine implementations the artifact ID should include the engine type as well. +/// Identifier of an artifact. Encodes a code hash of the PVF and a hash of executor parameter set. #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct ArtifactId { pub(crate) code_hash: ValidationCodeHash, + pub(crate) executor_params_hash: ExecutorParamsHash, } impl ArtifactId { const PREFIX: &'static str = "wasmtime_"; /// Creates a new artifact ID with the given hash. - pub fn new(code_hash: ValidationCodeHash) -> Self { - Self { code_hash } + pub fn new(code_hash: ValidationCodeHash, executor_params_hash: ExecutorParamsHash) -> Self { + Self { code_hash, executor_params_hash } } /// Tries to recover the artifact id from the given file name. @@ -59,14 +60,18 @@ impl ArtifactId { use std::str::FromStr as _; let file_name = file_name.strip_prefix(Self::PREFIX)?; - let code_hash = Hash::from_str(file_name).ok()?.into(); + let (code_hash_str, executor_params_hash_str) = file_name.split_once('_')?; + let code_hash = Hash::from_str(code_hash_str).ok()?.into(); + let executor_params_hash = + ExecutorParamsHash::from_hash(Hash::from_str(executor_params_hash_str).ok()?); - Some(Self { code_hash }) + Some(Self { code_hash, executor_params_hash }) } /// Returns the expected path to this artifact given the root of the cache. 
pub fn path(&self, cache_path: &Path) -> PathBuf { - let file_name = format!("{}{:#x}", Self::PREFIX, self.code_hash); + let file_name = + format!("{}{:#x}_{:#x}", Self::PREFIX, self.code_hash, self.executor_params_hash); cache_path.join(file_name) } } @@ -214,6 +219,7 @@ impl Artifacts { #[cfg(test)] mod tests { use super::{ArtifactId, Artifacts}; + use polkadot_primitives::vstaging::ExecutorParamsHash; use sp_core::H256; use std::{path::Path, str::FromStr}; @@ -224,13 +230,16 @@ mod tests { assert_eq!( ArtifactId::from_file_name( - "wasmtime_0x0022800000000000000000000000000000000000000000000000000000000000" + "wasmtime_0x0022800000000000000000000000000000000000000000000000000000000000_0x0033900000000000000000000000000000000000000000000000000000000000" ), Some(ArtifactId::new( hex_literal::hex![ "0022800000000000000000000000000000000000000000000000000000000000" ] - .into() + .into(), + ExecutorParamsHash::from_hash(sp_core::H256(hex_literal::hex![ + "0033900000000000000000000000000000000000000000000000000000000000" + ])), )), ); } @@ -240,13 +249,12 @@ mod tests { let path = Path::new("/test"); let hash = H256::from_str("1234567890123456789012345678901234567890123456789012345678901234") - .unwrap() - .into(); + .unwrap(); assert_eq!( - ArtifactId::new(hash).path(path).to_str(), + ArtifactId::new(hash.into(), ExecutorParamsHash::from_hash(hash)).path(path).to_str(), Some( - "/test/wasmtime_0x1234567890123456789012345678901234567890123456789012345678901234" + "/test/wasmtime_0x1234567890123456789012345678901234567890123456789012345678901234_0x1234567890123456789012345678901234567890123456789012345678901234" ), ); } diff --git a/node/core/pvf/src/execute/queue.rs b/node/core/pvf/src/execute/queue.rs index c20099b0e798..9f5b0451dc6c 100644 --- a/node/core/pvf/src/execute/queue.rs +++ b/node/core/pvf/src/execute/queue.rs @@ -30,8 +30,23 @@ use futures::{ stream::{FuturesUnordered, StreamExt as _}, Future, FutureExt, }; +use polkadot_node_primitives::BACKING_EXECUTION_TIMEOUT; +use polkadot_primitives::vstaging::{ExecutorParams, ExecutorParamsHash}; use slotmap::HopSlotMap; -use std::{collections::VecDeque, fmt, path::PathBuf, time::Duration}; +use std::{ + collections::VecDeque, + fmt, + path::PathBuf, + time::{Duration, Instant}, +}; + +/// The amount of time a job for which the queue does not have a compatible worker may wait in the +/// queue. After that time passes, the queue will kill the first worker which becomes idle to +/// re-spawn a new worker to execute the job immediately. +/// To make any sense and not to break things, the value should be greater than minimal execution +/// timeout in use, and less than the block time. +const MAX_KEEP_WAITING: Duration = + Duration::from_millis(BACKING_EXECUTION_TIMEOUT.as_millis() as u64 * 2); slotmap::new_key_type! 
{ struct Worker; } @@ -41,6 +56,7 @@ pub enum ToQueue { artifact: ArtifactPathId, execution_timeout: Duration, params: Vec, + executor_params: ExecutorParams, result_tx: ResultSender, }, } @@ -49,12 +65,15 @@ struct ExecuteJob { artifact: ArtifactPathId, execution_timeout: Duration, params: Vec, + executor_params: ExecutorParams, result_tx: ResultSender, + waiting_since: Instant, } struct WorkerData { idle: Option, handle: WorkerHandle, + executor_params_hash: ExecutorParamsHash, } impl fmt::Debug for WorkerData { @@ -79,7 +98,17 @@ impl Workers { self.spawn_inflight + self.running.len() < self.capacity } - fn find_available(&self) -> Option { + fn find_available(&self, executor_params_hash: ExecutorParamsHash) -> Option { + self.running.iter().find_map(|d| { + if d.1.idle.is_some() && d.1.executor_params_hash == executor_params_hash { + Some(d.0) + } else { + None + } + }) + } + + fn find_idle(&self) -> Option { self.running .iter() .find_map(|d| if d.1.idle.is_some() { Some(d.0) } else { None }) @@ -94,7 +123,7 @@ impl Workers { } enum QueueEvent { - Spawn(IdleWorker, WorkerHandle), + Spawn(IdleWorker, WorkerHandle, ExecuteJob), StartWork(Worker, Outcome, ArtifactId, ResultSender), } @@ -154,6 +183,66 @@ impl Queue { purge_dead(&self.metrics, &mut self.workers).await; } } + + /// Tries to assign a job in the queue to a worker. If an idle worker is provided, it does its + /// best to find a job with a compatible execution environment unless there are jobs in the + /// queue waiting too long. In that case, it kills an existing idle worker and spawns a new + /// one. It may spawn an additional worker if that is affordable. + /// If all the workers are busy or the queue is empty, it does nothing. + /// Should be called every time a new job arrives to the queue or a job finishes. + fn try_assign_next_job(&mut self, finished_worker: Option) { + // New jobs are always pushed to the tail of the queue; the one at its head is always + // the eldest one. 
+ let eldest = if let Some(eldest) = self.queue.get(0) { eldest } else { return }; + + // By default, we're going to execute the eldest job on any worker slot available, even if + // we have to kill and re-spawn a worker + let mut worker = None; + let mut job_index = 0; + + // But if we're not pressed for time, we can try to find a better job-worker pair not + // requiring the expensive kill-spawn operation + if eldest.waiting_since.elapsed() < MAX_KEEP_WAITING { + if let Some(finished_worker) = finished_worker { + if let Some(worker_data) = self.workers.running.get(finished_worker) { + for (i, job) in self.queue.iter().enumerate() { + if worker_data.executor_params_hash == job.executor_params.hash() { + (worker, job_index) = (Some(finished_worker), i); + break + } + } + } + } + } + + if worker.is_none() { + // Try to obtain a worker for the job + worker = self.workers.find_available(self.queue[job_index].executor_params.hash()); + } + + if worker.is_none() { + if let Some(idle) = self.workers.find_idle() { + // No available workers of required type but there are some idle ones of other + // types, have to kill one and re-spawn with the correct type + if self.workers.running.remove(idle).is_some() { + self.metrics.execute_worker().on_retired(); + } + } + } + + if worker.is_none() && !self.workers.can_afford_one_more() { + // Bad luck, no worker slot can be used to execute the job + return + } + + let job = self.queue.remove(job_index).expect("Job is just checked to be in queue; qed"); + + if let Some(worker) = worker { + assign(self, worker, job); + } else { + spawn_extra_worker(self, job); + } + } } async fn purge_dead(metrics: &Metrics, workers: &mut Workers) { @@ -172,29 +261,30 @@ async fn purge_dead(metrics: &Metrics, workers: &mut Workers) { } fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) { - let ToQueue::Enqueue { artifact, execution_timeout, params, result_tx } = to_queue; + let ToQueue::Enqueue { artifact, execution_timeout, params, executor_params, result_tx } = + to_queue; gum::debug!( target: LOG_TARGET, validation_code_hash = ?artifact.id.code_hash, "enqueueing an artifact for execution", ); queue.metrics.execute_enqueued(); - let job = ExecuteJob { artifact, execution_timeout, params, result_tx }; - - if let Some(available) = queue.workers.find_available() { - assign(queue, available, job); - } else { - if queue.workers.can_afford_one_more() { - spawn_extra_worker(queue); - } - queue.queue.push_back(job); - } + let job = ExecuteJob { + artifact, + execution_timeout, + params, + executor_params, + result_tx, + waiting_since: Instant::now(), + }; + queue.queue.push_back(job); + queue.try_assign_next_job(None); } async fn handle_mux(queue: &mut Queue, event: QueueEvent) { match event { - QueueEvent::Spawn(idle, handle) => { - handle_worker_spawned(queue, idle, handle); + QueueEvent::Spawn(idle, handle, job) => { + handle_worker_spawned(queue, idle, handle, job); }, QueueEvent::StartWork(worker, outcome, artifact_id, result_tx) => { handle_job_finish(queue, worker, outcome, artifact_id, result_tx); @@ -202,16 +292,23 @@ async fn handle_mux(queue: &mut Queue, event: QueueEvent) { } } -fn handle_worker_spawned(queue: &mut Queue, idle: IdleWorker, handle: WorkerHandle) { +fn handle_worker_spawned( + queue: &mut Queue, + idle: IdleWorker, + handle: WorkerHandle, + job: ExecuteJob, +) { queue.metrics.execute_worker().on_spawned(); queue.workers.spawn_inflight -= 1; - let worker = queue.workers.running.insert(WorkerData { idle: Some(idle), handle }); + let worker = 
queue.workers.running.insert(WorkerData { + idle: Some(idle), + handle, + executor_params_hash: job.executor_params.hash(), + }); gum::debug!(target: LOG_TARGET, ?worker, "execute worker spawned"); - if let Some(job) = queue.queue.pop_front() { - assign(queue, worker, job); - } + assign(queue, worker, job); } /// If there are pending jobs in the queue, schedules the next of them onto the just freed up @@ -280,42 +377,45 @@ fn handle_job_finish( if let Some(idle_worker) = idle_worker { if let Some(data) = queue.workers.running.get_mut(worker) { data.idle = Some(idle_worker); - - if let Some(job) = queue.queue.pop_front() { - assign(queue, worker, job); - } + return queue.try_assign_next_job(Some(worker)) } } else { // Note it's possible that the worker was purged already by `purge_dead` if queue.workers.running.remove(worker).is_some() { queue.metrics.execute_worker().on_retired(); } - - if !queue.queue.is_empty() { - // The worker has died and we still have work we have to do. Request an extra worker. - // - // That can potentially overshoot, but that should be OK. - spawn_extra_worker(queue); - } } + + queue.try_assign_next_job(None); } -fn spawn_extra_worker(queue: &mut Queue) { +fn spawn_extra_worker(queue: &mut Queue, job: ExecuteJob) { queue.metrics.execute_worker().on_begin_spawn(); gum::debug!(target: LOG_TARGET, "spawning an extra worker"); queue .mux - .push(spawn_worker_task(queue.program_path.clone(), queue.spawn_timeout).boxed()); + .push(spawn_worker_task(queue.program_path.clone(), job, queue.spawn_timeout).boxed()); queue.workers.spawn_inflight += 1; } -async fn spawn_worker_task(program_path: PathBuf, spawn_timeout: Duration) -> QueueEvent { +/// Spawns a new worker to execute a pre-assigned job. +/// A worker is never spawned as idle; a job to be executed by the worker has to be determined +/// beforehand. In such a way, a race condition is avoided: during the worker being spawned, +/// another job in the queue, with an incompatible execution environment, may become stale, and +/// the queue would have to kill a newly started worker and spawn another one. +/// Nevertheless, if the worker finishes executing the job, it becomes idle and may be used to execute other jobs with a compatible execution environment. +async fn spawn_worker_task( + program_path: PathBuf, + job: ExecuteJob, + spawn_timeout: Duration, +) -> QueueEvent { use futures_timer::Delay; loop { - match super::worker::spawn(&program_path, spawn_timeout).await { - Ok((idle, handle)) => break QueueEvent::Spawn(idle, handle), + match super::worker::spawn(&program_path, job.executor_params.clone(), spawn_timeout).await + { + Ok((idle, handle)) => break QueueEvent::Spawn(idle, handle, job), Err(err) => { gum::warn!(target: LOG_TARGET, "failed to spawn an execute worker: {:?}", err); @@ -328,7 +428,8 @@ async fn spawn_worker_task(program_path: PathBuf, spawn_timeout: Duration) -> Qu /// Ask the given worker to perform the given job. /// -/// The worker must be running and idle. +/// The worker must be running and idle. The job and the worker must share the same execution +/// environment parameter set. 
fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) { gum::debug!( target: LOG_TARGET, @@ -337,6 +438,16 @@ fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) { "assigning the execute worker", ); + debug_assert_eq!( + queue + .workers + .running + .get(worker) + .expect("caller must provide existing worker; qed") + .executor_params_hash, + job.executor_params.hash() + ); + let idle = queue.workers.claim_idle(worker).expect( "this caller must supply a worker which is idle and running; thus claim_idle cannot return None; diff --git a/node/core/pvf/src/execute/worker.rs b/node/core/pvf/src/execute/worker.rs index 4b19d4029be5..5db6a6261cc9 100644 --- a/node/core/pvf/src/execute/worker.rs +++ b/node/core/pvf/src/execute/worker.rs @@ -28,7 +28,9 @@ use cpu_time::ProcessTime; use futures::{pin_mut, select_biased, FutureExt}; use futures_timer::Delay; use parity_scale_codec::{Decode, Encode}; + use polkadot_parachain::primitives::ValidationResult; +use polkadot_primitives::vstaging::ExecutorParams; use std::{ path::{Path, PathBuf}, sync::{mpsc::channel, Arc}, @@ -37,13 +39,29 @@ use std::{ use tokio::{io, net::UnixStream}; /// Spawns a new worker with the given program path that acts as the worker and the spawn timeout. +/// Sends a handshake message to the worker as soon as it is spawned. /// /// The program should be able to handle ` execute-worker ` invocation. pub async fn spawn( program_path: &Path, + executor_params: ExecutorParams, spawn_timeout: Duration, ) -> Result<(IdleWorker, WorkerHandle), SpawnErr> { - spawn_with_program_path("execute", program_path, &["execute-worker"], spawn_timeout).await + let (mut idle_worker, worker_handle) = + spawn_with_program_path("execute", program_path, &["execute-worker"], spawn_timeout) + .await?; + send_handshake(&mut idle_worker.stream, Handshake { executor_params }) + .await + .map_err(|error| { + gum::warn!( + target: LOG_TARGET, + worker_pid = %idle_worker.pid, + ?error, + "failed to send a handshake to the spawned worker", + ); + SpawnErr::Handshake + })?; + Ok((idle_worker, worker_handle)) } /// Outcome of PVF execution. @@ -159,6 +177,21 @@ pub async fn start_work( } } +async fn send_handshake(stream: &mut UnixStream, handshake: Handshake) -> io::Result<()> { + framed_send(stream, &handshake.encode()).await +} + +async fn recv_handshake(stream: &mut UnixStream) -> io::Result { + let handshake_enc = framed_recv(stream).await?; + let handshake = Handshake::decode(&mut &handshake_enc[..]).map_err(|_| { + io::Error::new( + io::ErrorKind::Other, + "execute pvf recv_handshake: failed to decode Handshake".to_owned(), + ) + })?; + Ok(handshake) +} + async fn send_request( stream: &mut UnixStream, artifact_path: &Path, @@ -203,6 +236,11 @@ async fn recv_response(stream: &mut UnixStream) -> io::Result { }) } +#[derive(Encode, Decode)] +struct Handshake { + executor_params: ExecutorParams, +} + #[derive(Encode, Decode)] pub enum Response { Ok { result_descriptor: ValidationResult, duration: Duration }, @@ -225,7 +263,9 @@ impl Response { /// the path to the socket used to communicate with the host. 
pub fn worker_entrypoint(socket_path: &str) { worker_event_loop("execute", socket_path, |rt_handle, mut stream| async move { - let executor = Arc::new(Executor::new().map_err(|e| { + let handshake = recv_handshake(&mut stream).await?; + + let executor = Arc::new(Executor::new(handshake.executor_params).map_err(|e| { io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e)) })?); diff --git a/node/core/pvf/src/executor_intf.rs b/node/core/pvf/src/executor_intf.rs index c5578f5f81ad..0cc533f4a85e 100644 --- a/node/core/pvf/src/executor_intf.rs +++ b/node/core/pvf/src/executor_intf.rs @@ -16,6 +16,7 @@ //! Interface to the Substrate Executor +use polkadot_primitives::vstaging::executor_params::{ExecutorParam, ExecutorParams}; use sc_executor_common::{ runtime_blob::RuntimeBlob, wasm_runtime::{InvokeMethod, WasmModule as _}, @@ -46,7 +47,11 @@ const EXTRA_HEAP_PAGES: u64 = 2048; /// The number of bytes devoted for the stack during wasm execution of a PVF. const NATIVE_STACK_MAX: u32 = 256 * 1024 * 1024; -const CONFIG: Config = Config { +// VALUES OF THE DEFAULT CONFIGURATION SHOULD NEVER BE CHANGED +// They are used as base values for the execution environment parametrization. +// To overwrite them, add new ones to `EXECUTOR_PARAMS` in the `session_info` pallet and perform +// a runtime upgrade to make them active. +const DEFAULT_CONFIG: Config = Config { allow_missing_func_imports: true, cache_path: None, semantics: Semantics { @@ -97,17 +102,42 @@ pub fn prevalidate(code: &[u8]) -> Result Result, sc_executor_common::error::WasmError> { - sc_executor_wasmtime::prepare_runtime_artifact(blob, &CONFIG.semantics) +pub fn prepare( + blob: RuntimeBlob, + executor_params: ExecutorParams, +) -> Result, sc_executor_common::error::WasmError> { + let semantics = params_to_wasmtime_semantics(executor_params) + .map_err(|e| sc_executor_common::error::WasmError::Other(e))?; + sc_executor_wasmtime::prepare_runtime_artifact(blob, &semantics) +} + +fn params_to_wasmtime_semantics(par: ExecutorParams) -> Result { + let mut sem = DEFAULT_CONFIG.semantics.clone(); + let mut stack_limit = if let Some(stack_limit) = sem.deterministic_stack_limit.clone() { + stack_limit + } else { + return Err("No default stack limit set".to_owned()) + }; + for p in par.iter() { + match p { + ExecutorParam::MaxMemorySize(mms) => sem.max_memory_size = Some(*mms as usize), + ExecutorParam::StackLogicalMax(slm) => stack_limit.logical_max = *slm, + ExecutorParam::StackNativeMax(snm) => stack_limit.native_stack_max = *snm, + ExecutorParam::PrecheckingMaxMemory(_) => (), // TODO: Not implemented yet + } + } + sem.deterministic_stack_limit = Some(stack_limit); + Ok(sem) } pub struct Executor { thread_pool: rayon::ThreadPool, spawner: TaskSpawner, + config: Config, } impl Executor { - pub fn new() -> Result { + pub fn new(params: ExecutorParams) -> Result { // Wasmtime powers the Substrate Executor. It compiles the wasm bytecode into native code. // That native code does not create any stacks and just reuses the stack of the thread that // wasmtime was invoked from. 
@@ -154,7 +184,10 @@ impl Executor { let spawner = TaskSpawner::new().map_err(|e| format!("cannot create task spawner: {}", e))?; - Ok(Self { thread_pool, spawner }) + let mut config = DEFAULT_CONFIG.clone(); + config.semantics = params_to_wasmtime_semantics(params)?; + + Ok(Self { thread_pool, spawner, config }) } /// Executes the given PVF in the form of a compiled artifact and returns the result of execution @@ -183,7 +216,7 @@ impl Executor { s.spawn(move |_| { // spawn does not return a value, so we need to use a variable to pass the result. *result = Some( - do_execute(compiled_artifact_path, params, spawner) + do_execute(compiled_artifact_path, self.config.clone(), params, spawner) .map_err(|err| format!("execute error: {:?}", err)), ); }); @@ -195,6 +228,7 @@ impl Executor { unsafe fn do_execute( compiled_artifact_path: &Path, + config: Config, params: &[u8], spawner: impl sp_core::traits::SpawnNamed + 'static, ) -> Result, sc_executor_common::error::Error> { @@ -208,7 +242,7 @@ unsafe fn do_execute( sc_executor::with_externalities_safe(&mut ext, || { let runtime = sc_executor_wasmtime::create_runtime_from_artifact::( compiled_artifact_path, - CONFIG, + config, )?; runtime.new_instance()?.call(InvokeMethod::Export("validate_block"), params) })? diff --git a/node/core/pvf/src/host.rs b/node/core/pvf/src/host.rs index 0ee0b1442fda..b6f515b09d8d 100644 --- a/node/core/pvf/src/host.rs +++ b/node/core/pvf/src/host.rs @@ -25,7 +25,7 @@ use crate::{ error::PrepareError, execute, metrics::Metrics, - prepare, PrepareResult, Priority, Pvf, ValidationError, LOG_TARGET, + prepare, PrepareResult, Priority, PvfWithExecutorParams, ValidationError, LOG_TARGET, }; use always_assert::never; use futures::{ @@ -33,6 +33,7 @@ use futures::{ Future, FutureExt, SinkExt, StreamExt, }; use polkadot_parachain::primitives::ValidationResult; +use polkadot_primitives::vstaging::ExecutorParams; use std::{ collections::HashMap, path::{Path, PathBuf}, @@ -83,11 +84,11 @@ impl ValidationHost { /// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down. pub async fn precheck_pvf( &mut self, - pvf: Pvf, + pvf_with_params: PvfWithExecutorParams, result_tx: PrepareResultSender, ) -> Result<(), String> { self.to_host_tx - .send(ToHost::PrecheckPvf { pvf, result_tx }) + .send(ToHost::PrecheckPvf { pvf_with_params, result_tx }) .await .map_err(|_| "the inner loop hung up".to_string()) } @@ -101,7 +102,7 @@ impl ValidationHost { /// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down. pub async fn execute_pvf( &mut self, - pvf: Pvf, + pvf_with_params: PvfWithExecutorParams, execution_timeout: Duration, params: Vec, priority: Priority, @@ -109,7 +110,7 @@ impl ValidationHost { ) -> Result<(), String> { self.to_host_tx .send(ToHost::ExecutePvf(ExecutePvfInputs { - pvf, + pvf_with_params, execution_timeout, params, priority, @@ -125,7 +126,10 @@ impl ValidationHost { /// situations this function should return immediately. /// /// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down. 
- pub async fn heads_up(&mut self, active_pvfs: Vec) -> Result<(), String> { + pub async fn heads_up( + &mut self, + active_pvfs: Vec, + ) -> Result<(), String> { self.to_host_tx .send(ToHost::HeadsUp { active_pvfs }) .await @@ -134,13 +138,13 @@ impl ValidationHost { } enum ToHost { - PrecheckPvf { pvf: Pvf, result_tx: PrepareResultSender }, + PrecheckPvf { pvf_with_params: PvfWithExecutorParams, result_tx: PrepareResultSender }, ExecutePvf(ExecutePvfInputs), - HeadsUp { active_pvfs: Vec }, + HeadsUp { active_pvfs: Vec }, } struct ExecutePvfInputs { - pvf: Pvf, + pvf_with_params: PvfWithExecutorParams, execution_timeout: Duration, params: Vec, priority: Priority, @@ -265,6 +269,7 @@ pub fn start(config: Config, metrics: Metrics) -> (ValidationHost, impl Future, + executor_params: ExecutorParams, result_tx: ResultSender, } @@ -279,11 +284,13 @@ impl AwaitingPrepare { artifact_id: ArtifactId, execution_timeout: Duration, params: Vec, + executor_params: ExecutorParams, result_tx: ResultSender, ) { self.0.entry(artifact_id).or_default().push(PendingExecutionRequest { execution_timeout, params, + executor_params, result_tx, }); } @@ -420,8 +427,8 @@ async fn handle_to_host( to_host: ToHost, ) -> Result<(), Fatal> { match to_host { - ToHost::PrecheckPvf { pvf, result_tx } => { - handle_precheck_pvf(artifacts, prepare_queue, pvf, result_tx).await?; + ToHost::PrecheckPvf { pvf_with_params, result_tx } => { + handle_precheck_pvf(artifacts, prepare_queue, pvf_with_params, result_tx).await?; }, ToHost::ExecutePvf(inputs) => { handle_execute_pvf( @@ -449,10 +456,10 @@ async fn handle_to_host( async fn handle_precheck_pvf( artifacts: &mut Artifacts, prepare_queue: &mut mpsc::Sender, - pvf: Pvf, + pvf_with_params: PvfWithExecutorParams, result_sender: PrepareResultSender, ) -> Result<(), Fatal> { - let artifact_id = pvf.as_artifact_id(); + let artifact_id = pvf_with_params.as_artifact_id(); if let Some(state) = artifacts.artifact_state_mut(&artifact_id) { match state { @@ -474,7 +481,7 @@ async fn handle_precheck_pvf( prepare_queue, prepare::ToQueue::Enqueue { priority: Priority::Normal, - pvf, + pvf_with_params, preparation_timeout: PRECHECK_PREPARATION_TIMEOUT, }, ) @@ -500,8 +507,9 @@ async fn handle_execute_pvf( awaiting_prepare: &mut AwaitingPrepare, inputs: ExecutePvfInputs, ) -> Result<(), Fatal> { - let ExecutePvfInputs { pvf, execution_timeout, params, priority, result_tx } = inputs; - let artifact_id = pvf.as_artifact_id(); + let ExecutePvfInputs { pvf_with_params, execution_timeout, params, priority, result_tx } = + inputs; + let artifact_id = pvf_with_params.as_artifact_id(); if let Some(state) = artifacts.artifact_state_mut(&artifact_id) { match state { @@ -515,19 +523,26 @@ async fn handle_execute_pvf( artifact: ArtifactPathId::new(artifact_id, cache_path), execution_timeout, params, + executor_params: pvf_with_params.executor_params(), result_tx, }, ) .await?; }, ArtifactState::Preparing { .. 
} => { - awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx); + awaiting_prepare.add( + artifact_id, + execution_timeout, + params, + pvf_with_params.executor_params(), + result_tx, + ); }, ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => { if can_retry_prepare_after_failure(*last_time_failed, *num_failures, error) { gum::warn!( target: LOG_TARGET, - ?pvf, + ?pvf_with_params, ?artifact_id, ?last_time_failed, %num_failures, @@ -541,11 +556,12 @@ async fn handle_execute_pvf( waiting_for_response: Vec::new(), num_failures: *num_failures, }; + let executor_params = pvf_with_params.executor_params().clone(); send_prepare( prepare_queue, prepare::ToQueue::Enqueue { priority, - pvf, + pvf_with_params, preparation_timeout: LENIENT_PREPARATION_TIMEOUT, }, ) @@ -553,7 +569,13 @@ async fn handle_execute_pvf( // Add an execution request that will wait to run after this prepare job has // finished. - awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx); + awaiting_prepare.add( + artifact_id, + execution_timeout, + params, + executor_params, + result_tx, + ); } else { let _ = result_tx.send(Err(ValidationError::from(error.clone()))); } @@ -562,19 +584,20 @@ async fn handle_execute_pvf( } else { // Artifact is unknown: register it and enqueue a job with the corresponding priority and // PVF. + let executor_params = pvf_with_params.executor_params(); artifacts.insert_preparing(artifact_id.clone(), Vec::new()); send_prepare( prepare_queue, prepare::ToQueue::Enqueue { priority, - pvf, + pvf_with_params, preparation_timeout: LENIENT_PREPARATION_TIMEOUT, }, ) .await?; // Add an execution request that will wait to run after this prepare job has finished. - awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx); + awaiting_prepare.add(artifact_id, execution_timeout, params, executor_params, result_tx); } Ok(()) @@ -583,7 +606,7 @@ async fn handle_execute_pvf( async fn handle_heads_up( artifacts: &mut Artifacts, prepare_queue: &mut mpsc::Sender, - active_pvfs: Vec, + active_pvfs: Vec, ) -> Result<(), Fatal> { let now = SystemTime::now(); @@ -619,7 +642,7 @@ async fn handle_heads_up( prepare_queue, prepare::ToQueue::Enqueue { priority: Priority::Normal, - pvf: active_pvf, + pvf_with_params: active_pvf, preparation_timeout: LENIENT_PREPARATION_TIMEOUT, }, ) @@ -635,7 +658,7 @@ async fn handle_heads_up( prepare_queue, prepare::ToQueue::Enqueue { priority: Priority::Normal, - pvf: active_pvf, + pvf_with_params: active_pvf, preparation_timeout: LENIENT_PREPARATION_TIMEOUT, }, ) @@ -699,7 +722,9 @@ async fn handle_prepare_done( // It's finally time to dispatch all the execution requests that were waiting for this artifact // to be prepared. let pending_requests = awaiting_prepare.take(&artifact_id); - for PendingExecutionRequest { execution_timeout, params, result_tx } in pending_requests { + for PendingExecutionRequest { execution_timeout, params, executor_params, result_tx } in + pending_requests + { if result_tx.is_canceled() { // Preparation could've taken quite a bit of time and the requester may be not interested // in execution anymore, in which case we just skip the request. @@ -718,6 +743,7 @@ async fn handle_prepare_done( artifact: ArtifactPathId::new(artifact_id.clone(), cache_path), execution_timeout, params, + executor_params, result_tx, }, ) @@ -856,7 +882,7 @@ mod tests { /// Creates a new PVF which artifact id can be uniquely identified by the given number. 
fn artifact_id(descriminator: u32) -> ArtifactId { - Pvf::from_discriminator(descriminator).as_artifact_id() + PvfWithExecutorParams::from_discriminator(descriminator).as_artifact_id() } fn artifact_path(descriminator: u32) -> PathBuf { @@ -1065,7 +1091,7 @@ mod tests { let mut test = builder.build(); let mut host = test.host_handle(); - host.heads_up(vec![Pvf::from_discriminator(1)]).await.unwrap(); + host.heads_up(vec![PvfWithExecutorParams::from_discriminator(1)]).await.unwrap(); let to_sweeper_rx = &mut test.to_sweeper_rx; run_until( @@ -1079,7 +1105,7 @@ mod tests { // Extend TTL for the first artifact and make sure we don't receive another file removal // request. - host.heads_up(vec![Pvf::from_discriminator(1)]).await.unwrap(); + host.heads_up(vec![PvfWithExecutorParams::from_discriminator(1)]).await.unwrap(); test.poll_ensure_to_sweeper_is_empty().await; } @@ -1090,7 +1116,7 @@ mod tests { let (result_tx, result_rx_pvf_1_1) = oneshot::channel(); host.execute_pvf( - Pvf::from_discriminator(1), + PvfWithExecutorParams::from_discriminator(1), TEST_EXECUTION_TIMEOUT, b"pvf1".to_vec(), Priority::Normal, @@ -1101,7 +1127,7 @@ mod tests { let (result_tx, result_rx_pvf_1_2) = oneshot::channel(); host.execute_pvf( - Pvf::from_discriminator(1), + PvfWithExecutorParams::from_discriminator(1), TEST_EXECUTION_TIMEOUT, b"pvf1".to_vec(), Priority::Critical, @@ -1112,7 +1138,7 @@ mod tests { let (result_tx, result_rx_pvf_2) = oneshot::channel(); host.execute_pvf( - Pvf::from_discriminator(2), + PvfWithExecutorParams::from_discriminator(2), TEST_EXECUTION_TIMEOUT, b"pvf2".to_vec(), Priority::Normal, @@ -1190,7 +1216,9 @@ mod tests { // First, test a simple precheck request. let (result_tx, result_rx) = oneshot::channel(); - host.precheck_pvf(Pvf::from_discriminator(1), result_tx).await.unwrap(); + host.precheck_pvf(PvfWithExecutorParams::from_discriminator(1), result_tx) + .await + .unwrap(); // The queue received the prepare request. assert_matches!( @@ -1214,7 +1242,9 @@ mod tests { let mut precheck_receivers = Vec::new(); for _ in 0..3 { let (result_tx, result_rx) = oneshot::channel(); - host.precheck_pvf(Pvf::from_discriminator(2), result_tx).await.unwrap(); + host.precheck_pvf(PvfWithExecutorParams::from_discriminator(2), result_tx) + .await + .unwrap(); precheck_receivers.push(result_rx); } // Received prepare request. @@ -1249,7 +1279,7 @@ mod tests { // Send PVF for the execution and request the prechecking for it. let (result_tx, result_rx_execute) = oneshot::channel(); host.execute_pvf( - Pvf::from_discriminator(1), + PvfWithExecutorParams::from_discriminator(1), TEST_EXECUTION_TIMEOUT, b"pvf2".to_vec(), Priority::Critical, @@ -1264,7 +1294,9 @@ mod tests { ); let (result_tx, result_rx) = oneshot::channel(); - host.precheck_pvf(Pvf::from_discriminator(1), result_tx).await.unwrap(); + host.precheck_pvf(PvfWithExecutorParams::from_discriminator(1), result_tx) + .await + .unwrap(); // Suppose the preparation failed, the execution queue is empty and both // "clients" receive their results. 
@@ -1286,13 +1318,15 @@ mod tests { let mut precheck_receivers = Vec::new(); for _ in 0..3 { let (result_tx, result_rx) = oneshot::channel(); - host.precheck_pvf(Pvf::from_discriminator(2), result_tx).await.unwrap(); + host.precheck_pvf(PvfWithExecutorParams::from_discriminator(2), result_tx) + .await + .unwrap(); precheck_receivers.push(result_rx); } let (result_tx, _result_rx_execute) = oneshot::channel(); host.execute_pvf( - Pvf::from_discriminator(2), + PvfWithExecutorParams::from_discriminator(2), TEST_EXECUTION_TIMEOUT, b"pvf2".to_vec(), Priority::Critical, @@ -1332,7 +1366,9 @@ mod tests { // Submit a precheck request that fails. let (result_tx, result_rx) = oneshot::channel(); - host.precheck_pvf(Pvf::from_discriminator(1), result_tx).await.unwrap(); + host.precheck_pvf(PvfWithExecutorParams::from_discriminator(1), result_tx) + .await + .unwrap(); // The queue received the prepare request. assert_matches!( @@ -1354,7 +1390,9 @@ mod tests { // Submit another precheck request. let (result_tx_2, result_rx_2) = oneshot::channel(); - host.precheck_pvf(Pvf::from_discriminator(1), result_tx_2).await.unwrap(); + host.precheck_pvf(PvfWithExecutorParams::from_discriminator(1), result_tx_2) + .await + .unwrap(); // Assert the prepare queue is empty. test.poll_ensure_to_prepare_queue_is_empty().await; @@ -1368,7 +1406,9 @@ mod tests { // Submit another precheck request. let (result_tx_3, result_rx_3) = oneshot::channel(); - host.precheck_pvf(Pvf::from_discriminator(1), result_tx_3).await.unwrap(); + host.precheck_pvf(PvfWithExecutorParams::from_discriminator(1), result_tx_3) + .await + .unwrap(); // Assert the prepare queue is empty - we do not retry for precheck requests. test.poll_ensure_to_prepare_queue_is_empty().await; @@ -1388,7 +1428,7 @@ mod tests { // Submit a execute request that fails. let (result_tx, result_rx) = oneshot::channel(); host.execute_pvf( - Pvf::from_discriminator(1), + PvfWithExecutorParams::from_discriminator(1), TEST_EXECUTION_TIMEOUT, b"pvf".to_vec(), Priority::Critical, @@ -1418,7 +1458,7 @@ mod tests { // Submit another execute request. We shouldn't try to prepare again, yet. let (result_tx_2, result_rx_2) = oneshot::channel(); host.execute_pvf( - Pvf::from_discriminator(1), + PvfWithExecutorParams::from_discriminator(1), TEST_EXECUTION_TIMEOUT, b"pvf".to_vec(), Priority::Critical, @@ -1440,7 +1480,7 @@ mod tests { // Submit another execute request. let (result_tx_3, result_rx_3) = oneshot::channel(); host.execute_pvf( - Pvf::from_discriminator(1), + PvfWithExecutorParams::from_discriminator(1), TEST_EXECUTION_TIMEOUT, b"pvf".to_vec(), Priority::Critical, @@ -1490,7 +1530,7 @@ mod tests { // Submit an execute request that fails. let (result_tx, result_rx) = oneshot::channel(); host.execute_pvf( - Pvf::from_discriminator(1), + PvfWithExecutorParams::from_discriminator(1), TEST_EXECUTION_TIMEOUT, b"pvf".to_vec(), Priority::Critical, @@ -1523,7 +1563,7 @@ mod tests { // Submit another execute request. let (result_tx_2, result_rx_2) = oneshot::channel(); host.execute_pvf( - Pvf::from_discriminator(1), + PvfWithExecutorParams::from_discriminator(1), TEST_EXECUTION_TIMEOUT, b"pvf".to_vec(), Priority::Critical, @@ -1548,7 +1588,7 @@ mod tests { // Submit another execute request. 
let (result_tx_3, result_rx_3) = oneshot::channel(); host.execute_pvf( - Pvf::from_discriminator(1), + PvfWithExecutorParams::from_discriminator(1), TEST_EXECUTION_TIMEOUT, b"pvf".to_vec(), Priority::Critical, @@ -1575,7 +1615,7 @@ mod tests { let mut host = test.host_handle(); // Submit a heads-up request that fails. - host.heads_up(vec![Pvf::from_discriminator(1)]).await.unwrap(); + host.heads_up(vec![PvfWithExecutorParams::from_discriminator(1)]).await.unwrap(); // The queue received the prepare request. assert_matches!( @@ -1592,7 +1632,7 @@ mod tests { .unwrap(); // Submit another heads-up request. - host.heads_up(vec![Pvf::from_discriminator(1)]).await.unwrap(); + host.heads_up(vec![PvfWithExecutorParams::from_discriminator(1)]).await.unwrap(); // Assert the prepare queue is empty. test.poll_ensure_to_prepare_queue_is_empty().await; @@ -1601,7 +1641,7 @@ mod tests { futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await; // Submit another heads-up request. - host.heads_up(vec![Pvf::from_discriminator(1)]).await.unwrap(); + host.heads_up(vec![PvfWithExecutorParams::from_discriminator(1)]).await.unwrap(); // Assert the prepare queue contains the request. assert_matches!( @@ -1617,7 +1657,7 @@ mod tests { let (result_tx, result_rx) = oneshot::channel(); host.execute_pvf( - Pvf::from_discriminator(1), + PvfWithExecutorParams::from_discriminator(1), TEST_EXECUTION_TIMEOUT, b"pvf1".to_vec(), Priority::Normal, diff --git a/node/core/pvf/src/lib.rs b/node/core/pvf/src/lib.rs index 04c7d5323b30..3de5495a2eec 100644 --- a/node/core/pvf/src/lib.rs +++ b/node/core/pvf/src/lib.rs @@ -110,7 +110,7 @@ pub use sp_tracing; pub use error::{InvalidCandidate, PrepareError, PrepareResult, ValidationError}; pub use prepare::PrepareStats; pub use priority::Priority; -pub use pvf::Pvf; +pub use pvf::{Pvf, PvfWithExecutorParams}; pub use host::{start, Config, ValidationHost}; pub use metrics::Metrics; diff --git a/node/core/pvf/src/prepare/pool.rs b/node/core/pvf/src/prepare/pool.rs index 49670e4c1ac2..1c4f399f6ebf 100644 --- a/node/core/pvf/src/prepare/pool.rs +++ b/node/core/pvf/src/prepare/pool.rs @@ -25,6 +25,7 @@ use always_assert::never; use futures::{ channel::mpsc, future::BoxFuture, stream::FuturesUnordered, Future, FutureExt, StreamExt, }; +use polkadot_primitives::vstaging::ExecutorParams; use slotmap::HopSlotMap; use std::{ fmt, @@ -69,6 +70,7 @@ pub enum ToPool { worker: Worker, code: Arc>, artifact_path: PathBuf, + executor_params: ExecutorParams, preparation_timeout: Duration, }, } @@ -214,7 +216,7 @@ fn handle_to_pool( metrics.prepare_worker().on_begin_spawn(); mux.push(spawn_worker_task(program_path.to_owned(), spawn_timeout).boxed()); }, - ToPool::StartWork { worker, code, artifact_path, preparation_timeout } => { + ToPool::StartWork { worker, code, artifact_path, executor_params, preparation_timeout } => { if let Some(data) = spawned.get_mut(worker) { if let Some(idle) = data.idle.take() { let preparation_timer = metrics.time_preparation(); @@ -226,6 +228,7 @@ fn handle_to_pool( code, cache_path.to_owned(), artifact_path, + executor_params, preparation_timeout, preparation_timer, ) @@ -275,12 +278,20 @@ async fn start_work_task( code: Arc>, cache_path: PathBuf, artifact_path: PathBuf, + executor_params: ExecutorParams, preparation_timeout: Duration, _preparation_timer: Option, ) -> PoolEvent { - let outcome = - worker::start_work(&metrics, idle, code, &cache_path, artifact_path, preparation_timeout) - .await; + let outcome = worker::start_work( + &metrics, + idle, + code, + &cache_path, 
+ artifact_path, + executor_params, + preparation_timeout, + ) + .await; PoolEvent::StartWork(worker, outcome) } diff --git a/node/core/pvf/src/prepare/queue.rs b/node/core/pvf/src/prepare/queue.rs index d8dd90688c4f..939f42ea62bf 100644 --- a/node/core/pvf/src/prepare/queue.rs +++ b/node/core/pvf/src/prepare/queue.rs @@ -17,7 +17,10 @@ //! A queue that handles requests for PVF preparation. use super::pool::{self, Worker}; -use crate::{artifacts::ArtifactId, metrics::Metrics, PrepareResult, Priority, Pvf, LOG_TARGET}; +use crate::{ + artifacts::ArtifactId, metrics::Metrics, PrepareResult, Priority, PvfWithExecutorParams, + LOG_TARGET, +}; use always_assert::{always, never}; use futures::{channel::mpsc, stream::StreamExt as _, Future, SinkExt}; use std::{ @@ -33,7 +36,11 @@ pub enum ToQueue { /// /// Note that it is incorrect to enqueue the same PVF again without first receiving the /// [`FromQueue`] response. - Enqueue { priority: Priority, pvf: Pvf, preparation_timeout: Duration }, + Enqueue { + priority: Priority, + pvf_with_params: PvfWithExecutorParams, + preparation_timeout: Duration, + }, } /// A response from queue. @@ -78,7 +85,7 @@ slotmap::new_key_type! { pub struct Job; } struct JobData { /// The priority of this job. Can be bumped. priority: Priority, - pvf: Pvf, + pvf_with_params: PvfWithExecutorParams, /// The timeout for the preparation job. preparation_timeout: Duration, worker: Option, @@ -208,8 +215,8 @@ impl Queue { async fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) -> Result<(), Fatal> { match to_queue { - ToQueue::Enqueue { priority, pvf, preparation_timeout } => { - handle_enqueue(queue, priority, pvf, preparation_timeout).await?; + ToQueue::Enqueue { priority, pvf_with_params, preparation_timeout } => { + handle_enqueue(queue, priority, pvf_with_params, preparation_timeout).await?; }, } Ok(()) @@ -218,19 +225,19 @@ async fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) -> Result<(), Fat async fn handle_enqueue( queue: &mut Queue, priority: Priority, - pvf: Pvf, + pvf_with_params: PvfWithExecutorParams, preparation_timeout: Duration, ) -> Result<(), Fatal> { gum::debug!( target: LOG_TARGET, - validation_code_hash = ?pvf.code_hash, + validation_code_hash = ?pvf_with_params.code_hash(), ?priority, ?preparation_timeout, "PVF is enqueued for preparation.", ); queue.metrics.prepare_enqueued(); - let artifact_id = pvf.as_artifact_id(); + let artifact_id = pvf_with_params.as_artifact_id(); if never!( queue.artifact_id_to_job.contains_key(&artifact_id), "second Enqueue sent for a known artifact" @@ -247,7 +254,10 @@ async fn handle_enqueue( return Ok(()) } - let job = queue.jobs.insert(JobData { priority, pvf, preparation_timeout, worker: None }); + let job = + queue + .jobs + .insert(JobData { priority, pvf_with_params, preparation_timeout, worker: None }); queue.artifact_id_to_job.insert(artifact_id, job); if let Some(available) = find_idle_worker(queue) { @@ -338,7 +348,7 @@ async fn handle_worker_concluded( // this can't be None; // qed. 
let job_data = never_none!(queue.jobs.remove(job)); - let artifact_id = job_data.pvf.as_artifact_id(); + let artifact_id = job_data.pvf_with_params.as_artifact_id(); queue.artifact_id_to_job.remove(&artifact_id); @@ -424,7 +434,7 @@ async fn spawn_extra_worker(queue: &mut Queue, critical: bool) -> Result<(), Fat async fn assign(queue: &mut Queue, worker: Worker, job: Job) -> Result<(), Fatal> { let job_data = &mut queue.jobs[job]; - let artifact_id = job_data.pvf.as_artifact_id(); + let artifact_id = job_data.pvf_with_params.as_artifact_id(); let artifact_path = artifact_id.path(&queue.cache_path); job_data.worker = Some(worker); @@ -435,8 +445,9 @@ async fn assign(queue: &mut Queue, worker: Worker, job: Job) -> Result<(), Fatal &mut queue.to_pool_tx, pool::ToPool::StartWork { worker, - code: job_data.pvf.code.clone(), + code: job_data.pvf_with_params.code(), artifact_path, + executor_params: job_data.pvf_with_params.executor_params(), preparation_timeout: job_data.preparation_timeout, }, ) @@ -503,8 +514,8 @@ mod tests { use std::task::Poll; /// Creates a new PVF which artifact id can be uniquely identified by the given number. - fn pvf(descriminator: u32) -> Pvf { - Pvf::from_discriminator(descriminator) + fn pvf_with_params(descriminator: u32) -> PvfWithExecutorParams { + PvfWithExecutorParams::from_discriminator(descriminator) } async fn run_until( @@ -613,7 +624,7 @@ mod tests { test.send_queue(ToQueue::Enqueue { priority: Priority::Normal, - pvf: pvf(1), + pvf_with_params: pvf_with_params(1), preparation_timeout: PRECHECK_PREPARATION_TIMEOUT, }); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); @@ -626,7 +637,10 @@ mod tests { result: Ok(PrepareStats::default()), }); - assert_eq!(test.poll_and_recv_from_queue().await.artifact_id, pvf(1).as_artifact_id()); + assert_eq!( + test.poll_and_recv_from_queue().await.artifact_id, + pvf_with_params(1).as_artifact_id() + ); } #[tokio::test] @@ -635,12 +649,20 @@ mod tests { let priority = Priority::Normal; let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT; - test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(1), preparation_timeout }); - test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(2), preparation_timeout }); + test.send_queue(ToQueue::Enqueue { + priority, + pvf_with_params: PvfWithExecutorParams::from_discriminator(1), + preparation_timeout, + }); + test.send_queue(ToQueue::Enqueue { + priority, + pvf_with_params: PvfWithExecutorParams::from_discriminator(2), + preparation_timeout, + }); // Start a non-precheck preparation for this one. test.send_queue(ToQueue::Enqueue { priority, - pvf: pvf(3), + pvf_with_params: PvfWithExecutorParams::from_discriminator(3), preparation_timeout: LENIENT_PREPARATION_TIMEOUT, }); @@ -669,7 +691,7 @@ mod tests { // Enqueue a critical job. test.send_queue(ToQueue::Enqueue { priority: Priority::Critical, - pvf: pvf(4), + pvf_with_params: PvfWithExecutorParams::from_discriminator(4), preparation_timeout, }); @@ -685,7 +707,7 @@ mod tests { test.send_queue(ToQueue::Enqueue { priority: Priority::Normal, - pvf: pvf(1), + pvf_with_params: PvfWithExecutorParams::from_discriminator(1), preparation_timeout, }); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); @@ -696,7 +718,7 @@ mod tests { // Enqueue a critical job, which warrants spawning over the soft limit. 
test.send_queue(ToQueue::Enqueue { priority: Priority::Critical, - pvf: pvf(2), + pvf_with_params: PvfWithExecutorParams::from_discriminator(2), preparation_timeout, }); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); @@ -722,12 +744,20 @@ mod tests { let priority = Priority::Normal; let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT; - test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(1), preparation_timeout }); - test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(2), preparation_timeout }); + test.send_queue(ToQueue::Enqueue { + priority, + pvf_with_params: PvfWithExecutorParams::from_discriminator(1), + preparation_timeout, + }); + test.send_queue(ToQueue::Enqueue { + priority, + pvf_with_params: PvfWithExecutorParams::from_discriminator(2), + preparation_timeout, + }); // Start a non-precheck preparation for this one. test.send_queue(ToQueue::Enqueue { priority, - pvf: pvf(3), + pvf_with_params: PvfWithExecutorParams::from_discriminator(3), preparation_timeout: LENIENT_PREPARATION_TIMEOUT, }); @@ -753,7 +783,10 @@ mod tests { // Since there is still work, the queue requested one extra worker to spawn to handle the // remaining enqueued work items. assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); - assert_eq!(test.poll_and_recv_from_queue().await.artifact_id, pvf(1).as_artifact_id()); + assert_eq!( + test.poll_and_recv_from_queue().await.artifact_id, + pvf_with_params(1).as_artifact_id() + ); } #[tokio::test] @@ -762,7 +795,7 @@ mod tests { test.send_queue(ToQueue::Enqueue { priority: Priority::Normal, - pvf: pvf(1), + pvf_with_params: PvfWithExecutorParams::from_discriminator(1), preparation_timeout: PRECHECK_PREPARATION_TIMEOUT, }); @@ -787,7 +820,7 @@ mod tests { test.send_queue(ToQueue::Enqueue { priority: Priority::Normal, - pvf: pvf(1), + pvf_with_params: PvfWithExecutorParams::from_discriminator(1), preparation_timeout: PRECHECK_PREPARATION_TIMEOUT, }); diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker.rs index 3e64777a9c17..8fba877a9377 100644 --- a/node/core/pvf/src/prepare/worker.rs +++ b/node/core/pvf/src/prepare/worker.rs @@ -34,6 +34,7 @@ use crate::{ use cpu_time::ProcessTime; use futures::{pin_mut, select_biased, FutureExt}; use parity_scale_codec::{Decode, Encode}; +use polkadot_primitives::vstaging::ExecutorParams; use sp_core::hexdisplay::HexDisplay; use std::{ panic, @@ -85,6 +86,7 @@ pub async fn start_work( code: Arc>, cache_path: &Path, artifact_path: PathBuf, + executor_params: ExecutorParams, preparation_timeout: Duration, ) -> Outcome { let IdleWorker { stream, pid } = worker; @@ -97,7 +99,9 @@ pub async fn start_work( ); with_tmp_file(stream, pid, cache_path, |tmp_file, mut stream| async move { - if let Err(err) = send_request(&mut stream, code, &tmp_file, preparation_timeout).await { + if let Err(err) = + send_request(&mut stream, code, &tmp_file, &executor_params, preparation_timeout).await + { gum::warn!( target: LOG_TARGET, worker_pid = %pid, @@ -271,15 +275,19 @@ async fn send_request( stream: &mut UnixStream, code: Arc>, tmp_file: &Path, + executor_params: &ExecutorParams, preparation_timeout: Duration, ) -> io::Result<()> { framed_send(stream, &code).await?; framed_send(stream, path_to_bytes(tmp_file)).await?; + framed_send(stream, &executor_params.encode()).await?; framed_send(stream, &preparation_timeout.encode()).await?; Ok(()) } -async fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec, PathBuf, Duration)> { +async fn recv_request( + stream: &mut 
UnixStream, +) -> io::Result<(Vec, PathBuf, ExecutorParams, Duration)> { let code = framed_recv(stream).await?; let tmp_file = framed_recv(stream).await?; let tmp_file = bytes_to_path(&tmp_file).ok_or_else(|| { @@ -288,6 +296,13 @@ async fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec, PathBuf, "prepare pvf recv_request: non utf-8 artifact path".to_string(), ) })?; + let executor_params_enc = framed_recv(stream).await?; + let executor_params = ExecutorParams::decode(&mut &executor_params_enc[..]).map_err(|_| { + io::Error::new( + io::ErrorKind::Other, + "prepare pvf recv_request: failed to decode ExecutorParams".to_string(), + ) + })?; let preparation_timeout = framed_recv(stream).await?; let preparation_timeout = Duration::decode(&mut &preparation_timeout[..]).map_err(|e| { io::Error::new( @@ -295,7 +310,7 @@ async fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec, PathBuf, format!("prepare pvf recv_request: failed to decode duration: {:?}", e), ) })?; - Ok((code, tmp_file, preparation_timeout)) + Ok((code, tmp_file, executor_params, preparation_timeout)) } async fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Result<()> { @@ -347,7 +362,8 @@ pub fn worker_entrypoint(socket_path: &str) { worker_event_loop("prepare", socket_path, |rt_handle, mut stream| async move { loop { let worker_pid = std::process::id(); - let (code, dest, preparation_timeout) = recv_request(&mut stream).await?; + let (code, dest, executor_params, preparation_timeout) = + recv_request(&mut stream).await?; gum::debug!( target: LOG_TARGET, %worker_pid, @@ -372,7 +388,7 @@ pub fn worker_entrypoint(socket_path: &str) { // Spawn another thread for preparation. let prepare_fut = rt_handle .spawn_blocking(move || { - let result = prepare_artifact(&code); + let result = prepare_artifact(&code, executor_params); // Get the `ru_maxrss` stat. If supported, call getrusage for the thread. #[cfg(target_os = "linux")] @@ -454,14 +470,17 @@ pub fn worker_entrypoint(socket_path: &str) { }); } -fn prepare_artifact(code: &[u8]) -> Result { +fn prepare_artifact( + code: &[u8], + executor_params: ExecutorParams, +) -> Result { panic::catch_unwind(|| { let blob = match crate::executor_intf::prevalidate(code) { Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))), Ok(b) => b, }; - match crate::executor_intf::prepare(blob) { + match crate::executor_intf::prepare(blob, executor_params) { Ok(compiled_artifact) => Ok(CompiledArtifact::new(compiled_artifact)), Err(err) => Err(PrepareError::Preparation(format!("{:?}", err))), } diff --git a/node/core/pvf/src/pvf.rs b/node/core/pvf/src/pvf.rs index d06968a13d43..e0284a26085f 100644 --- a/node/core/pvf/src/pvf.rs +++ b/node/core/pvf/src/pvf.rs @@ -16,6 +16,7 @@ use crate::artifacts::ArtifactId; use polkadot_parachain::primitives::ValidationCodeHash; +use polkadot_primitives::vstaging::ExecutorParams; use sp_core::blake2_256; use std::{fmt, sync::Arc}; @@ -48,9 +49,47 @@ impl Pvf { let descriminator_buf = num.to_le_bytes().to_vec(); Pvf::from_code(descriminator_buf) } +} + +/// Coupling PVF code with executor params +#[derive(Debug, Clone)] +pub struct PvfWithExecutorParams { + pvf: Pvf, + executor_params: Arc, +} + +impl PvfWithExecutorParams { + /// Creates a new PVF-ExecutorParams pair structure + pub fn new(pvf: Pvf, executor_params: ExecutorParams) -> Self { + Self { pvf, executor_params: Arc::new(executor_params) } + } - /// Returns the artifact ID that corresponds to this PVF. 
+ /// Returns artifact ID that corresponds to the PVF with given executor params pub(crate) fn as_artifact_id(&self) -> ArtifactId { - ArtifactId::new(self.code_hash) + ArtifactId::new(self.pvf.code_hash, self.executor_params.hash()) + } + + /// Returns validation code hash for the PVF + pub(crate) fn code_hash(&self) -> ValidationCodeHash { + self.pvf.code_hash + } + + /// Returns PVF code + pub(crate) fn code(&self) -> Arc> { + self.pvf.code.clone() + } + + /// Returns executor params + pub(crate) fn executor_params(&self) -> ExecutorParams { + (*self.executor_params).clone() + } + + /// Creates a structure for tests + #[cfg(test)] + pub(crate) fn from_discriminator(num: u32) -> Self { + Self { + pvf: Pvf::from_discriminator(num), + executor_params: Arc::new(ExecutorParams::default()), + } } } diff --git a/node/core/pvf/src/testing.rs b/node/core/pvf/src/testing.rs index cbd37b06d403..2abc1d07a836 100644 --- a/node/core/pvf/src/testing.rs +++ b/node/core/pvf/src/testing.rs @@ -19,6 +19,8 @@ //! N.B. This is not guarded with some feature flag. Overexposing items here may affect the final //! artifact even for production builds. +use polkadot_primitives::vstaging::ExecutorParams; + pub mod worker_common { pub use crate::worker_common::{spawn_with_program_path, SpawnErr}; } @@ -35,12 +37,12 @@ pub fn validate_candidate( .expect("Decompressing code failed"); let blob = prevalidate(&code)?; - let artifact = prepare(blob)?; + let artifact = prepare(blob, ExecutorParams::default())?; let tmpdir = tempfile::tempdir()?; let artifact_path = tmpdir.path().join("blob"); std::fs::write(&artifact_path, &artifact)?; - let executor = Executor::new()?; + let executor = Executor::new(ExecutorParams::default())?; let result = unsafe { // SAFETY: This is trivially safe since the artifact is obtained by calling `prepare` // and is written into a temporary directory in an unmodified state. diff --git a/node/core/pvf/src/worker_common.rs b/node/core/pvf/src/worker_common.rs index 9cda5f8cd0b7..430a6950fb4f 100644 --- a/node/core/pvf/src/worker_common.rs +++ b/node/core/pvf/src/worker_common.rs @@ -251,6 +251,8 @@ pub enum SpawnErr { ProcessSpawn, /// The deadline allotted for the worker spawning and connecting to the socket has elapsed. AcceptTimeout, + /// Failed to send handshake after successful spawning was signaled + Handshake, } /// This is a representation of a potentially running worker. Drop it and the process will be killed. 
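The key property introduced by `PvfWithExecutorParams` above is that the artifact ID now commits to both the validation code hash and the executor params hash. Below is a minimal sketch of what that means in practice; it is not part of the patch, it assumes it lives inside `node/core/pvf` (the `code_hash`/`as_artifact_id` accessors are `pub(crate)`), and it assumes `ArtifactId` keeps its derived equality.

```rust
use crate::{Pvf, PvfWithExecutorParams};
use polkadot_primitives::vstaging::{ExecutorParam, ExecutorParams};

fn artifacts_are_split_by_executor_params(code: Vec<u8>) {
    let default_env = ExecutorParams::default();
    let tuned_env = ExecutorParams::from(&[ExecutorParam::StackLogicalMax(1024)][..]);

    let pvf_a = PvfWithExecutorParams::new(Pvf::from_code(code.clone()), default_env);
    let pvf_b = PvfWithExecutorParams::new(Pvf::from_code(code), tuned_env);

    // Identical validation code...
    assert_eq!(pvf_a.code_hash(), pvf_b.code_hash());
    // ...but distinct artifact IDs, since the ID also hashes the executor params,
    // so an artifact prepared under one environment is never reused under another.
    assert_ne!(pvf_a.as_artifact_id(), pvf_b.as_artifact_id());
}
```

The same separation is why the execution queue now treats workers as environment-specific: a worker only fits jobs whose executor params match the environment it was spawned with, as exercised by the integration test further down.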
diff --git a/node/core/pvf/tests/it/adder.rs b/node/core/pvf/tests/it/adder.rs index 8eb57e4d9026..3c373f7ea517 100644 --- a/node/core/pvf/tests/it/adder.rs +++ b/node/core/pvf/tests/it/adder.rs @@ -39,6 +39,7 @@ async fn execute_good_block_on_parent() { relay_parent_number: 1, relay_parent_storage_root: Default::default(), }, + Default::default(), ) .await .unwrap(); @@ -72,6 +73,7 @@ async fn execute_good_chain_on_parent() { relay_parent_number: number as RelayChainBlockNumber + 1, relay_parent_storage_root: Default::default(), }, + Default::default(), ) .await .unwrap(); @@ -108,6 +110,7 @@ async fn execute_bad_block_on_parent() { relay_parent_number: 1, relay_parent_storage_root: Default::default(), }, + Default::default(), ) .await .unwrap_err(); @@ -129,6 +132,7 @@ async fn stress_spawn() { relay_parent_number: 1, relay_parent_storage_root: Default::default(), }, + Default::default(), ) .await .unwrap(); diff --git a/node/core/pvf/tests/it/main.rs b/node/core/pvf/tests/it/main.rs index 07754ef8693d..b540230c4746 100644 --- a/node/core/pvf/tests/it/main.rs +++ b/node/core/pvf/tests/it/main.rs @@ -17,10 +17,11 @@ use assert_matches::assert_matches; use parity_scale_codec::Encode as _; use polkadot_node_core_pvf::{ - start, Config, InvalidCandidate, Metrics, Pvf, ValidationError, ValidationHost, - JOB_TIMEOUT_WALL_CLOCK_FACTOR, + start, Config, InvalidCandidate, Metrics, Pvf, PvfWithExecutorParams, ValidationError, + ValidationHost, JOB_TIMEOUT_WALL_CLOCK_FACTOR, }; use polkadot_parachain::primitives::{BlockData, ValidationParams, ValidationResult}; +use polkadot_primitives::vstaging::{ExecutorParam, ExecutorParams}; use std::time::Duration; use tokio::sync::Mutex; @@ -57,6 +58,7 @@ impl TestHost { &self, code: &[u8], params: ValidationParams, + executor_params: ExecutorParams, ) -> Result { let (result_tx, result_rx) = futures::channel::oneshot::channel(); @@ -67,7 +69,7 @@ impl TestHost { .lock() .await .execute_pvf( - Pvf::from_code(code.into()), + PvfWithExecutorParams::new(Pvf::from_code(code.into()), executor_params), TEST_EXECUTION_TIMEOUT, params.encode(), polkadot_node_core_pvf::Priority::Normal, @@ -93,6 +95,7 @@ async fn terminates_on_timeout() { relay_parent_number: 1, relay_parent_storage_root: Default::default(), }, + Default::default(), ) .await; @@ -118,6 +121,7 @@ async fn ensure_parallel_execution() { relay_parent_number: 1, relay_parent_storage_root: Default::default(), }, + Default::default(), ); let execute_pvf_future_2 = host.validate_candidate( halt::wasm_binary_unwrap(), @@ -127,6 +131,7 @@ async fn ensure_parallel_execution() { relay_parent_number: 1, relay_parent_storage_root: Default::default(), }, + Default::default(), ); let start = std::time::Instant::now(); @@ -169,6 +174,7 @@ async fn execute_queue_doesnt_stall_if_workers_died() { relay_parent_number: 1, relay_parent_storage_root: Default::default(), }, + Default::default(), ) })) .await; @@ -184,3 +190,52 @@ async fn execute_queue_doesnt_stall_if_workers_died() { max_duration.as_millis() ); } + +#[tokio::test] +async fn execute_queue_doesnt_stall_with_varying_executor_params() { + let host = TestHost::new_with_config(|cfg| { + cfg.execute_workers_max_num = 2; + }); + + let executor_params_1 = ExecutorParams::default(); + let executor_params_2 = ExecutorParams::from(&[ExecutorParam::StackLogicalMax(1024)][..]); + + // Here we spawn 6 validation jobs for the `halt` PVF and share those between 2 workers. Every + // 3rd job will have different set of executor parameters. 
All the workers should be killed + // and in this case the queue should respawn new workers with needed executor environment + // without waiting. The jobs will be executed in 3 batches, each running two jobs in parallel, + // and execution time would be roughly 3 * TEST_EXECUTION_TIMEOUT + let start = std::time::Instant::now(); + futures::future::join_all((0u8..6).map(|i| { + host.validate_candidate( + halt::wasm_binary_unwrap(), + ValidationParams { + block_data: BlockData(Vec::new()), + parent_head: Default::default(), + relay_parent_number: 1, + relay_parent_storage_root: Default::default(), + }, + match i % 3 { + 0 => executor_params_1.clone(), + _ => executor_params_2.clone(), + }, + ) + })) + .await; + + let duration = std::time::Instant::now().duration_since(start); + let min_duration = 3 * TEST_EXECUTION_TIMEOUT; + let max_duration = 4 * TEST_EXECUTION_TIMEOUT; + assert!( + duration >= min_duration, + "Expected duration {}ms to be greater than or equal to {}ms", + duration.as_millis(), + min_duration.as_millis() + ); + assert!( + duration <= max_duration, + "Expected duration {}ms to be less than or equal to {}ms", + duration.as_millis(), + max_duration.as_millis() + ); +} diff --git a/node/core/runtime-api/src/cache.rs b/node/core/runtime-api/src/cache.rs index 9efc31328692..82d2e0dbc8b3 100644 --- a/node/core/runtime-api/src/cache.rs +++ b/node/core/runtime-api/src/cache.rs @@ -20,11 +20,12 @@ use lru::LruCache; use sp_consensus_babe::Epoch; use polkadot_primitives::{ - AuthorityDiscoveryId, BlockNumber, CandidateCommitments, CandidateEvent, CandidateHash, - CommittedCandidateReceipt, CoreState, DisputeState, GroupRotationInfo, Hash, Id as ParaId, - InboundDownwardMessage, InboundHrmpMessage, OccupiedCoreAssumption, PersistedValidationData, - PvfCheckStatement, ScrapedOnChainVotes, SessionIndex, SessionInfo, ValidationCode, - ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature, + vstaging::ExecutorParams, AuthorityDiscoveryId, BlockNumber, CandidateCommitments, + CandidateEvent, CandidateHash, CommittedCandidateReceipt, CoreState, DisputeState, + GroupRotationInfo, Hash, Id as ParaId, InboundDownwardMessage, InboundHrmpMessage, + OccupiedCoreAssumption, PersistedValidationData, PvfCheckStatement, ScrapedOnChainVotes, + SessionIndex, SessionInfo, ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex, + ValidatorSignature, }; /// For consistency we have the same capacity for all caches. 
We use 128 as we'll only need that @@ -51,6 +52,7 @@ pub(crate) struct RequestResultCache { validation_code_by_hash: LruCache>, candidate_pending_availability: LruCache<(Hash, ParaId), Option>, candidate_events: LruCache>, + session_executor_params: LruCache>, session_info: LruCache, dmq_contents: LruCache<(Hash, ParaId), Vec>>, inbound_hrmp_channels_contents: @@ -79,6 +81,7 @@ impl Default for RequestResultCache { validation_code_by_hash: LruCache::new(DEFAULT_CACHE_CAP), candidate_pending_availability: LruCache::new(DEFAULT_CACHE_CAP), candidate_events: LruCache::new(DEFAULT_CACHE_CAP), + session_executor_params: LruCache::new(DEFAULT_CACHE_CAP), session_info: LruCache::new(DEFAULT_CACHE_CAP), dmq_contents: LruCache::new(DEFAULT_CACHE_CAP), inbound_hrmp_channels_contents: LruCache::new(DEFAULT_CACHE_CAP), @@ -263,6 +266,21 @@ impl RequestResultCache { self.session_info.put(key, value); } + pub(crate) fn session_executor_params( + &mut self, + session_index: SessionIndex, + ) -> Option<&Option> { + self.session_executor_params.get(&session_index) + } + + pub(crate) fn cache_session_executor_params( + &mut self, + session_index: SessionIndex, + value: Option, + ) { + self.session_executor_params.put(session_index, value); + } + pub(crate) fn dmq_contents( &mut self, key: (Hash, ParaId), @@ -389,6 +407,7 @@ pub(crate) enum RequestResult { ValidationCodeByHash(Hash, ValidationCodeHash, Option), CandidatePendingAvailability(Hash, ParaId, Option), CandidateEvents(Hash, Vec), + SessionExecutorParams(Hash, SessionIndex, Option), SessionInfo(Hash, SessionIndex, Option), DmqContents(Hash, ParaId, Vec>), InboundHrmpChannelsContents( diff --git a/node/core/runtime-api/src/lib.rs b/node/core/runtime-api/src/lib.rs index 3d016305bc64..0c5641d1201e 100644 --- a/node/core/runtime-api/src/lib.rs +++ b/node/core/runtime-api/src/lib.rs @@ -132,6 +132,8 @@ where .cache_candidate_pending_availability((relay_parent, para_id), candidate), CandidateEvents(relay_parent, events) => self.requests_cache.cache_candidate_events(relay_parent, events), + SessionExecutorParams(_relay_parent, session_index, index) => + self.requests_cache.cache_session_executor_params(session_index, index), SessionInfo(_relay_parent, session_index, info) => if let Some(info) = info { self.requests_cache.cache_session_info(session_index, info); @@ -229,6 +231,17 @@ where .map(|sender| Request::CandidatePendingAvailability(para, sender)), Request::CandidateEvents(sender) => query!(candidate_events(), sender).map(|sender| Request::CandidateEvents(sender)), + Request::SessionExecutorParams(session_index, sender) => { + if let Some(executor_params) = + self.requests_cache.session_executor_params(session_index) + { + self.metrics.on_cached_request(); + let _ = sender.send(Ok(executor_params.clone())); + None + } else { + Some(Request::SessionExecutorParams(session_index, sender)) + } + }, Request::SessionInfo(index, sender) => { if let Some(info) = self.requests_cache.session_info(index) { self.metrics.on_cached_request(); @@ -480,6 +493,12 @@ where res.ok().map(|res| RequestResult::SessionInfo(relay_parent, index, res)) }, + Request::SessionExecutorParams(session_index, sender) => query!( + SessionExecutorParams, + session_executor_params(session_index), + ver = Request::EXECUTOR_PARAMS_RUNTIME_REQUIREMENT, + sender + ), Request::DmqContents(id, sender) => query!(DmqContents, dmq_contents(id), ver = 1, sender), Request::InboundHrmpChannelsContents(id, sender) => query!(InboundHrmpChannelsContents, inbound_hrmp_channels_contents(id), ver = 1, 
sender), diff --git a/node/malus/src/variants/suggest_garbage_candidate.rs b/node/malus/src/variants/suggest_garbage_candidate.rs index 146348f00123..7e1a9246bc4f 100644 --- a/node/malus/src/variants/suggest_garbage_candidate.rs +++ b/node/malus/src/variants/suggest_garbage_candidate.rs @@ -33,7 +33,7 @@ use polkadot_cli::{ }; use polkadot_node_core_candidate_validation::find_validation_data; use polkadot_node_primitives::{AvailableData, BlockData, PoV}; -use polkadot_primitives::CandidateDescriptor; +use polkadot_primitives::{CandidateDescriptor, CandidateReceipt}; use polkadot_node_subsystem_util::request_validators; use sp_core::traits::SpawnNamed; @@ -53,7 +53,6 @@ use crate::{ // Import extra types relevant to the particular // subsystem. use polkadot_node_subsystem::{messages::CandidateBackingMessage, SpawnGlue}; -use polkadot_primitives::CandidateReceipt; use std::sync::Arc; diff --git a/node/subsystem-types/src/messages.rs b/node/subsystem-types/src/messages.rs index 506e37d2cc92..1acafbd1cfaa 100644 --- a/node/subsystem-types/src/messages.rs +++ b/node/subsystem-types/src/messages.rs @@ -39,11 +39,11 @@ use polkadot_node_primitives::{ SignedDisputeStatement, SignedFullStatement, ValidationResult, }; use polkadot_primitives::{ - AuthorityDiscoveryId, BackedCandidate, BlockNumber, CandidateEvent, CandidateHash, - CandidateIndex, CandidateReceipt, CollatorId, CommittedCandidateReceipt, CoreState, - DisputeState, GroupIndex, GroupRotationInfo, Hash, Header as BlockHeader, Id as ParaId, - InboundDownwardMessage, InboundHrmpMessage, MultiDisputeStatementSet, OccupiedCoreAssumption, - PersistedValidationData, PvfCheckStatement, SessionIndex, SessionInfo, + vstaging::ExecutorParams, AuthorityDiscoveryId, BackedCandidate, BlockNumber, CandidateEvent, + CandidateHash, CandidateIndex, CandidateReceipt, CollatorId, CommittedCandidateReceipt, + CoreState, DisputeState, GroupIndex, GroupRotationInfo, Hash, Header as BlockHeader, + Id as ParaId, InboundDownwardMessage, InboundHrmpMessage, MultiDisputeStatementSet, + OccupiedCoreAssumption, PersistedValidationData, PvfCheckStatement, SessionIndex, SessionInfo, SignedAvailabilityBitfield, SignedAvailabilityBitfields, ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature, }; @@ -574,6 +574,8 @@ pub enum RuntimeApiRequest { /// Get all events concerning candidates (backing, inclusion, time-out) in the parent of /// the block in whose state this request is executed. CandidateEvents(RuntimeApiSender>), + /// Get the execution environment parameter set by session index + SessionExecutorParams(SessionIndex, RuntimeApiSender>), /// Get the session info for the given session, if stored. SessionInfo(SessionIndex, RuntimeApiSender>), /// Get all the pending inbound messages in the downward message queue for a para. @@ -608,6 +610,9 @@ impl RuntimeApiRequest { /// `Disputes` pub const DISPUTES_RUNTIME_REQUIREMENT: u32 = 3; + + /// `ExecutorParams` + pub const EXECUTOR_PARAMS_RUNTIME_REQUIREMENT: u32 = 4; } /// A message to the Runtime API subsystem. 
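To show how the new request variant is meant to be driven, here is a deliberately simplified, hypothetical helper (not in the patch) that sends `RuntimeApiRequest::SessionExecutorParams` and collapses every failure into `None`. Real callers should respect `EXECUTOR_PARAMS_RUNTIME_REQUIREMENT` (v4) and distinguish `NotSupported` from other errors, which is what the `executor_params_at_relay_parent` helper added to `subsystem-util` further down does.

```rust
use futures::channel::oneshot;
use polkadot_node_subsystem::{
    messages::{RuntimeApiMessage, RuntimeApiRequest},
    overseer,
};
use polkadot_primitives::{vstaging::ExecutorParams, Hash, SessionIndex};

// Hypothetical helper; error handling is intentionally reduced to `None`.
async fn raw_session_executor_params(
    sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
    relay_parent: Hash,
    session_index: SessionIndex,
) -> Option<ExecutorParams> {
    let (tx, rx) = oneshot::channel();
    sender
        .send_message(RuntimeApiMessage::Request(
            relay_parent,
            RuntimeApiRequest::SessionExecutorParams(session_index, tx),
        ))
        .await;
    // First `?`: the responder was dropped; second `?`: a `RuntimeApiError`
    // (e.g. `NotSupported` on runtimes older than v4); the remaining `Option`
    // is `None` when nothing is stored for that session.
    rx.await.ok()?.ok()?
}
```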
diff --git a/node/subsystem-types/src/runtime_client.rs b/node/subsystem-types/src/runtime_client.rs index 9a55462b8852..03283942b74d 100644 --- a/node/subsystem-types/src/runtime_client.rs +++ b/node/subsystem-types/src/runtime_client.rs @@ -16,11 +16,12 @@ use async_trait::async_trait; use polkadot_primitives::{ - runtime_api::ParachainHost, Block, BlockId, BlockNumber, CandidateCommitments, CandidateEvent, - CandidateHash, CommittedCandidateReceipt, CoreState, DisputeState, GroupRotationInfo, Hash, Id, - InboundDownwardMessage, InboundHrmpMessage, OccupiedCoreAssumption, PersistedValidationData, - PvfCheckStatement, ScrapedOnChainVotes, SessionIndex, SessionInfo, ValidationCode, - ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature, + runtime_api::ParachainHost, vstaging::ExecutorParams, Block, BlockId, BlockNumber, + CandidateCommitments, CandidateEvent, CandidateHash, CommittedCandidateReceipt, CoreState, + DisputeState, GroupRotationInfo, Hash, Id, InboundDownwardMessage, InboundHrmpMessage, + OccupiedCoreAssumption, OldV1SessionInfo, PersistedValidationData, PvfCheckStatement, + ScrapedOnChainVotes, SessionIndex, SessionInfo, ValidationCode, ValidationCodeHash, + ValidatorId, ValidatorIndex, ValidatorSignature, }; use sp_api::{ApiError, ApiExt, ProvideRuntimeApi}; use sp_authority_discovery::AuthorityDiscoveryApi; @@ -154,7 +155,7 @@ pub trait RuntimeApiSubsystemClient { &self, at: Hash, index: SessionIndex, - ) -> Result, ApiError>; + ) -> Result, ApiError>; /// Submits a PVF pre-checking statement into the transaction pool. /// @@ -181,6 +182,8 @@ pub trait RuntimeApiSubsystemClient { assumption: OccupiedCoreAssumption, ) -> Result, ApiError>; + /***** Added in v3 *****/ + /// Returns all onchain disputes. /// This is a staging method! Do not use on production runtimes! async fn disputes( &self, at: Hash, ) -> Result)>, ApiError>; + /// Get the execution environment parameter set by session index, if stored + async fn session_executor_params( + &self, + at: Hash, + session_index: SessionIndex, + ) -> Result, ApiError>; + // === BABE API === /// Returns information regarding the current epoch. @@ -316,6 +326,14 @@ where self.runtime_api().on_chain_votes(&BlockId::Hash(at)) } + async fn session_executor_params( + &self, + at: Hash, + session_index: SessionIndex, + ) -> Result, ApiError> { + self.runtime_api().session_executor_params(&BlockId::Hash(at), session_index) + } + async fn session_info( &self, at: Hash, @@ -367,7 +385,7 @@ where &self, at: Hash, index: SessionIndex, - ) -> Result, ApiError> { + ) -> Result, ApiError> { #[allow(deprecated)] self.runtime_api().session_info_before_version_2(&BlockId::Hash(at), index) } diff --git a/node/subsystem-util/src/lib.rs b/node/subsystem-util/src/lib.rs index ca41ae232461..3aafc14a7878 100644 --- a/node/subsystem-util/src/lib.rs +++ b/node/subsystem-util/src/lib.rs @@ -29,6 +29,7 @@ use polkadot_node_subsystem::{ messages::{RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender}, overseer, SubsystemSender, }; +use polkadot_primitives::vstaging::ExecutorParams; pub use overseer::{ gen::{OrchestraError as OverseerError, Timeout}, @@ -115,6 +116,9 @@ pub enum Error { /// Already forwarding errors to another sender #[error("AlreadyForwarding")] AlreadyForwarding, + /// Data that is supposed to be there is not there + #[error("Data are not available")] + DataNotAvailable, } impl From for Error { @@ -209,6 +213,55 @@ specialize_requests!
{ fn request_validation_code_hash(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option; ValidationCodeHash; fn request_on_chain_votes() -> Option; FetchOnChainVotes; + fn request_session_executor_params(session_index: SessionIndex) -> Option; SessionExecutorParams; +} + +/// Requests executor parameters from the runtime effective at given relay-parent. First obtains +/// session index at the relay-parent, relying on the fact that it should be cached by the runtime +/// API caching layer even if the block itself has already been pruned. Then requests executor +/// parameters by session index. +/// Returns an error if failed to communicate to the runtime, or the parameters are not in the +/// storage, which should never happen. +/// Returns default execution parameters if the runtime doesn't yet support `SessionExecutorParams` +/// API call. +/// Otherwise, returns execution parameters returned by the runtime. +pub async fn executor_params_at_relay_parent( + relay_parent: Hash, + sender: &mut impl overseer::SubsystemSender, +) -> Result { + match request_session_index_for_child(relay_parent, sender).await.await { + Err(err) => { + // Failed to communicate with the runtime + Err(Error::Oneshot(err)) + }, + Ok(Err(err)) => { + // Runtime has failed to obtain a session index at the relay-parent. + Err(Error::RuntimeApi(err)) + }, + Ok(Ok(session_index)) => { + match request_session_executor_params(relay_parent, session_index, sender).await.await { + Err(err) => { + // Failed to communicate with the runtime + Err(Error::Oneshot(err)) + }, + Ok(Err(RuntimeApiError::NotSupported { .. })) => { + // Runtime doesn't yet support the api requested, should execute anyway + // with default set of parameters + Ok(ExecutorParams::default()) + }, + Ok(Err(err)) => { + // Runtime failed to execute the request + Err(Error::RuntimeApi(err)) + }, + Ok(Ok(None)) => { + // Storage doesn't contain a parameter set for the given session; should + // never happen + Err(Error::DataNotAvailable) + }, + Ok(Ok(Some(executor_params))) => Ok(executor_params), + } + }, + } } /// From the given set of validators, find the first key we can sign with, if any. diff --git a/node/test/performance-test/Cargo.toml b/node/test/performance-test/Cargo.toml index 95da89662579..b2b2ef8148f2 100644 --- a/node/test/performance-test/Cargo.toml +++ b/node/test/performance-test/Cargo.toml @@ -13,6 +13,7 @@ log = "0.4" polkadot-node-core-pvf = { path = "../../core/pvf" } polkadot-erasure-coding = { path = "../../../erasure-coding" } polkadot-node-primitives = { path = "../../primitives" } +polkadot-primitives = { path = "../../../primitives" } kusama-runtime = { path = "../../../runtime/kusama" } diff --git a/node/test/performance-test/src/lib.rs b/node/test/performance-test/src/lib.rs index e80b5e7589f2..9d01121b953b 100644 --- a/node/test/performance-test/src/lib.rs +++ b/node/test/performance-test/src/lib.rs @@ -18,6 +18,7 @@ use polkadot_erasure_coding::{obtain_chunks, reconstruct}; use polkadot_node_core_pvf::{sc_executor_common, sp_maybe_compressed_blob}; +use polkadot_primitives::vstaging::ExecutorParams; use std::time::{Duration, Instant}; mod constants; @@ -66,7 +67,8 @@ pub fn measure_pvf_prepare(wasm_code: &[u8]) -> Result // Recreate the pipeline from the pvf prepare worker. 
let blob = polkadot_node_core_pvf::prevalidate(code.as_ref()).map_err(PerfCheckError::from)?; - polkadot_node_core_pvf::prepare(blob).map_err(PerfCheckError::from)?; + polkadot_node_core_pvf::prepare(blob, ExecutorParams::default()) + .map_err(PerfCheckError::from)?; Ok(start.elapsed()) } diff --git a/primitives/src/runtime_api.rs b/primitives/src/runtime_api.rs index 2f6793d22032..a618d18e22d7 100644 --- a/primitives/src/runtime_api.rs +++ b/primitives/src/runtime_api.rs @@ -110,11 +110,15 @@ //! All staging API functions should use primitives from `vstaging`. They should be clearly separated //! from the stable primitives. -use crate::v2; +use crate::{ + v2, vstaging, BlockNumber, CandidateCommitments, CandidateEvent, CandidateHash, + CommittedCandidateReceipt, CoreState, DisputeState, GroupRotationInfo, OccupiedCoreAssumption, + PersistedValidationData, PvfCheckStatement, ScrapedOnChainVotes, SessionIndex, SessionInfo, + ValidatorId, ValidatorIndex, ValidatorSignature, +}; use parity_scale_codec::{Decode, Encode}; use polkadot_core_primitives as pcp; use polkadot_parachain::primitives as ppp; -use sp_staking; use sp_std::{collections::btree_map::BTreeMap, prelude::*}; sp_api::decl_runtime_apis! { @@ -122,24 +126,24 @@ sp_api::decl_runtime_apis! { #[api_version(2)] pub trait ParachainHost { /// Get the current validators. - fn validators() -> Vec; + fn validators() -> Vec; /// Returns the validator groups and rotation info localized based on the hypothetical child /// of a block whose state this is invoked on. Note that `now` in the `GroupRotationInfo` /// should be the successor of the number of the block. - fn validator_groups() -> (Vec>, v2::GroupRotationInfo); + fn validator_groups() -> (Vec>, GroupRotationInfo); /// Yields information on all availability cores as relevant to the child block. /// Cores are either free or occupied. Free cores can have paras assigned to them. - fn availability_cores() -> Vec>; + fn availability_cores() -> Vec>; /// Yields the persisted validation data for the given `ParaId` along with an assumption that /// should be used if the para currently occupies a core. /// /// Returns `None` if either the para is not registered or the assumption is `Freed` /// and the para already occupies a core. - fn persisted_validation_data(para_id: ppp::Id, assumption: v2::OccupiedCoreAssumption) - -> Option>; + fn persisted_validation_data(para_id: ppp::Id, assumption: OccupiedCoreAssumption) + -> Option>; /// Returns the persisted validation data for the given `ParaId` along with the corresponding /// validation code hash. Instead of accepting assumption about the para, matches the validation @@ -147,29 +151,29 @@ sp_api::decl_runtime_apis! { fn assumed_validation_data( para_id: ppp::Id, expected_persisted_validation_data_hash: pcp::v2::Hash, - ) -> Option<(v2::PersistedValidationData, ppp::ValidationCodeHash)>; + ) -> Option<(PersistedValidationData, ppp::ValidationCodeHash)>; /// Checks if the given validation outputs pass the acceptance criteria. - fn check_validation_outputs(para_id: ppp::Id, outputs: v2::CandidateCommitments) -> bool; + fn check_validation_outputs(para_id: ppp::Id, outputs: CandidateCommitments) -> bool; /// Returns the session index expected at a child of the block. /// /// This can be used to instantiate a `SigningContext`. - fn session_index_for_child() -> sp_staking::SessionIndex; + fn session_index_for_child() -> SessionIndex; /// Fetch the validation code used by a para, making the given `OccupiedCoreAssumption`. 
/// /// Returns `None` if either the para is not registered or the assumption is `Freed` /// and the para already occupies a core. - fn validation_code(para_id: ppp::Id, assumption: v2::OccupiedCoreAssumption) + fn validation_code(para_id: ppp::Id, assumption: OccupiedCoreAssumption) -> Option; /// Get the receipt of a candidate pending availability. This returns `Some` for any paras /// assigned to occupied cores in `availability_cores` and `None` otherwise. - fn candidate_pending_availability(para_id: ppp::Id) -> Option>; + fn candidate_pending_availability(para_id: ppp::Id) -> Option>; /// Get a vector of events concerning candidates that occurred within a block. - fn candidate_events() -> Vec>; + fn candidate_events() -> Vec>; /// Get all the pending inbound messages in the downward message queue for a para. fn dmq_contents( @@ -184,19 +188,19 @@ sp_api::decl_runtime_apis! { fn validation_code_by_hash(hash: ppp::ValidationCodeHash) -> Option; /// Scrape dispute relevant from on-chain, backing votes and resolved disputes. - fn on_chain_votes() -> Option>; + fn on_chain_votes() -> Option>; /***** Added in v2 *****/ /// Get the session info for the given session, if stored. /// /// NOTE: This function is only available since parachain host version 2. - fn session_info(index: sp_staking::SessionIndex) -> Option; + fn session_info(index: SessionIndex) -> Option; /// Submits a PVF pre-checking statement into the transaction pool. /// /// NOTE: This function is only available since parachain host version 2. - fn submit_pvf_check_statement(stmt: v2::PvfCheckStatement, signature: v2::ValidatorSignature); + fn submit_pvf_check_statement(stmt: PvfCheckStatement, signature: ValidatorSignature); /// Returns code hashes of PVFs that require pre-checking by validators in the active set. /// @@ -206,20 +210,23 @@ sp_api::decl_runtime_apis! { /// Fetch the hash of the validation code used by a para, making the given `OccupiedCoreAssumption`. /// /// NOTE: This function is only available since parachain host version 2. - fn validation_code_hash(para_id: ppp::Id, assumption: v2::OccupiedCoreAssumption) + fn validation_code_hash(para_id: ppp::Id, assumption: OccupiedCoreAssumption) -> Option; - /***** Replaced in v2 *****/ /// Old method to fetch v1 session info. #[changed_in(2)] - fn session_info(index: sp_staking::SessionIndex) -> Option; + fn session_info(index: SessionIndex) -> Option; /***** STAGING *****/ /// Returns all onchain disputes. #[api_version(3)] - fn disputes() -> Vec<(v2::SessionIndex, v2::CandidateHash, v2::DisputeState)>; + fn disputes() -> Vec<(SessionIndex, CandidateHash, DisputeState)>; + + /// Returns execution parameters for the session. + #[api_version(4)] + fn session_executor_params(session_index: SessionIndex) -> Option; } } diff --git a/primitives/src/v2/mod.rs b/primitives/src/v2/mod.rs index 0a600f6b318a..606ffd59920c 100644 --- a/primitives/src/v2/mod.rs +++ b/primitives/src/v2/mod.rs @@ -35,7 +35,7 @@ use sp_arithmetic::traits::{BaseArithmetic, Saturating}; pub use runtime_primitives::traits::{BlakeTwo256, Hash as HashT}; // Export some core primitives. 
-pub use polkadot_core_primitives::{ +pub use polkadot_core_primitives::v2::{ AccountId, AccountIndex, AccountPublic, Balance, Block, BlockId, BlockNumber, CandidateHash, ChainId, DownwardMessage, Hash, Header, InboundDownwardMessage, InboundHrmpMessage, Moment, Nonce, OutboundHrmpMessage, Remark, Signature, UncheckedExtrinsic, diff --git a/primitives/src/vstaging/executor_params.rs b/primitives/src/vstaging/executor_params.rs new file mode 100644 index 000000000000..8d924122f365 --- /dev/null +++ b/primitives/src/vstaging/executor_params.rs @@ -0,0 +1,116 @@ +// Copyright 2021 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. + +//! Abstract execution environment parameter set. +//! +//! Parameter set is encoded as an opaque vector whose structure depends on the execution +//! environment itself (except for environment type/version which is always represented +//! by the first element of the vector). Decoding to a usable semantics structure is +//! done in `polkadot-node-core-pvf`. + +use crate::{BlakeTwo256, HashT as _}; +use parity_scale_codec::{Decode, Encode}; +use polkadot_core_primitives::Hash; +use scale_info::TypeInfo; +use sp_std::{ops::Deref, vec, vec::Vec}; + +/// A single executor parameter +#[derive(Clone, Debug, Encode, Decode, PartialEq, Eq, TypeInfo)] +pub enum ExecutorParam { + /// ## Parameters setting the execution environment semantics: + /// Max. memory size + MaxMemorySize(u32), + /// Wasm logical stack size limit (max. number of Wasm values on stack) + StackLogicalMax(u32), + /// Executor machine stack size limit, in bytes + StackNativeMax(u32), + /// Max. amount of memory the preparation worker is allowed to use during + /// pre-checking, in bytes + PrecheckingMaxMemory(u64), +} + +/// Unit type wrapper around [`type@Hash`] that represents an execution parameter set hash. +/// +/// This type is produced by [`ExecutorParams::hash`]. +#[derive(Clone, Copy, Encode, Decode, Hash, Eq, PartialEq, PartialOrd, Ord, TypeInfo)] +pub struct ExecutorParamsHash(Hash); + +impl ExecutorParamsHash { + /// Create a new executor parameter hash from an `H256` hash + pub fn from_hash(hash: Hash) -> Self { + Self(hash) + } +} + +impl sp_std::fmt::Display for ExecutorParamsHash { + fn fmt(&self, f: &mut sp_std::fmt::Formatter<'_>) -> sp_std::fmt::Result { + self.0.fmt(f) + } +} + +impl sp_std::fmt::Debug for ExecutorParamsHash { + fn fmt(&self, f: &mut sp_std::fmt::Formatter<'_>) -> sp_std::fmt::Result { + write!(f, "{:?}", self.0) + } +} + +impl sp_std::fmt::LowerHex for ExecutorParamsHash { + fn fmt(&self, f: &mut sp_std::fmt::Formatter<'_>) -> sp_std::fmt::Result { + sp_std::fmt::LowerHex::fmt(&self.0, f) + } +} + +/// # Deterministically serialized execution environment semantics /// Represents an arbitrary semantics of an arbitrary execution environment, so should be kept as /// abstract as possible.
+// ADR: For mandatory entries, mandatoriness should be enforced in code rather than separating them +// into individual fields of the structure. Thus, complex migrations shall be avoided when adding +// new entries and removing old ones. At the moment, there's no mandatory parameters defined. If +// they show up, they must be clearly documented as mandatory ones. +#[derive(Clone, Debug, Encode, Decode, PartialEq, Eq, TypeInfo)] +pub struct ExecutorParams(Vec); + +impl ExecutorParams { + /// Creates a new, empty executor parameter set + pub fn new() -> Self { + ExecutorParams(vec![]) + } + + /// Returns hash of the set of execution environment parameters + pub fn hash(&self) -> ExecutorParamsHash { + ExecutorParamsHash(BlakeTwo256::hash(&self.encode())) + } +} + +impl Deref for ExecutorParams { + type Target = Vec; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl From<&[ExecutorParam]> for ExecutorParams { + fn from(arr: &[ExecutorParam]) -> Self { + ExecutorParams(arr.to_vec()) + } +} + +impl Default for ExecutorParams { + fn default() -> Self { + ExecutorParams(vec![]) + } +} diff --git a/primitives/src/vstaging/mod.rs b/primitives/src/vstaging/mod.rs index 64671bd48a60..d6428d252149 100644 --- a/primitives/src/vstaging/mod.rs +++ b/primitives/src/vstaging/mod.rs @@ -17,3 +17,6 @@ //! Staging Primitives. // Put any primitives used by staging APIs functions here + +pub mod executor_params; +pub use executor_params::{ExecutorParam, ExecutorParams, ExecutorParamsHash}; diff --git a/runtime/common/src/xcm_sender.rs b/runtime/common/src/xcm_sender.rs index 392f0a40a087..e4cc02ae756b 100644 --- a/runtime/common/src/xcm_sender.rs +++ b/runtime/common/src/xcm_sender.rs @@ -18,7 +18,7 @@ use frame_support::traits::Get; use parity_scale_codec::Encode; -use primitives::v2::Id as ParaId; +use primitives::Id as ParaId; use runtime_parachains::{ configuration::{self, HostConfiguration}, dmp, diff --git a/runtime/parachains/src/disputes/migration.rs b/runtime/parachains/src/disputes/migration.rs index 584d4b33872e..1aa340622432 100644 --- a/runtime/parachains/src/disputes/migration.rs +++ b/runtime/parachains/src/disputes/migration.rs @@ -27,7 +27,7 @@ pub mod v1 { use frame_support::{ pallet_prelude::*, storage_alias, traits::OnRuntimeUpgrade, weights::Weight, }; - use primitives::v2::SessionIndex; + use primitives::SessionIndex; use sp_std::prelude::*; #[storage_alias] diff --git a/runtime/parachains/src/runtime_api_impl/vstaging.rs b/runtime/parachains/src/runtime_api_impl/vstaging.rs index 2191fb304e1c..f96323ce5043 100644 --- a/runtime/parachains/src/runtime_api_impl/vstaging.rs +++ b/runtime/parachains/src/runtime_api_impl/vstaging.rs @@ -16,8 +16,8 @@ //! Put implementations of functions from staging APIs here. 
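With `ExecutorParams` now fully defined above, and before the staging runtime API implementation continues below, here is a minimal sketch (not part of the patch) of how the type is meant to be used: building a set from a slice of `ExecutorParam` entries and hashing it. It assumes the crate is referenced as `polkadot_primitives`, and every numeric limit is an arbitrary placeholder rather than a recommended value.

use polkadot_primitives::vstaging::{ExecutorParam, ExecutorParams};

fn executor_params_sketch() {
	// Build a parameter set from a slice, via the `From<&[ExecutorParam]>` impl above.
	let a = ExecutorParams::from(
		&[ExecutorParam::MaxMemorySize(128 * 1024 * 1024), ExecutorParam::StackLogicalMax(65_536)][..],
	);
	// Same entries in a different order: the SCALE encoding differs, so the hash differs too.
	let b = ExecutorParams::from(
		&[ExecutorParam::StackLogicalMax(65_536), ExecutorParam::MaxMemorySize(128 * 1024 * 1024)][..],
	);
	assert_ne!(a.hash(), b.hash());

	// An empty set is the default, and the hash depends only on the encoded contents.
	assert_eq!(ExecutorParams::new().hash(), ExecutorParams::default().hash());

	// `Deref<Target = Vec<ExecutorParam>>` gives read-only access to the entries.
	assert_eq!(a.len(), 2);
}

The order sensitivity is why the `session_info` comment further down warns against reordering entries: a reordered but otherwise identical set hashes differently and is treated as a distinct parameter set.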
-use crate::disputes; -use primitives::{CandidateHash, DisputeState, SessionIndex}; +use crate::{disputes, session_info}; +use primitives::{vstaging::ExecutorParams, CandidateHash, DisputeState, SessionIndex}; use sp_std::prelude::*; /// Implementation for `get_session_disputes` function from the runtime API @@ -25,3 +25,18 @@ pub fn get_session_disputes( ) -> Vec<(SessionIndex, CandidateHash, DisputeState)> { >::disputes() } + +/// Get session executor parameter set +pub fn session_executor_params( + session_index: SessionIndex, +) -> Option { + // This is to bootstrap the storage working around the runtime migration issue: + // https://github.com/paritytech/substrate/issues/9997 + // After the bootstrap is complete (no fewer than 7 sessions have passed with the runtime) + // this code should be replaced with a pure + // >::session_executor_params(session_index) call. + match >::session_executor_params(session_index) { + Some(ep) => Some(ep), + None => Some(ExecutorParams::default()), + } +} diff --git a/runtime/parachains/src/session_info.rs b/runtime/parachains/src/session_info.rs index 1ba865cf4a26..1105bc2e5023 100644 --- a/runtime/parachains/src/session_info.rs +++ b/runtime/parachains/src/session_info.rs @@ -27,7 +27,10 @@ use frame_support::{ pallet_prelude::*, traits::{OneSessionHandler, ValidatorSet, ValidatorSetWithIdentification}, }; -use primitives::{AssignmentId, AuthorityDiscoveryId, SessionIndex, SessionInfo}; +use primitives::{ + vstaging::{ExecutorParam, ExecutorParams}, + AssignmentId, AuthorityDiscoveryId, SessionIndex, SessionInfo, +}; use sp_std::vec::Vec; pub use pallet::*; @@ -37,6 +40,10 @@ pub mod migration; #[cfg(test)] mod tests; +// The order of parameters should be deterministic, that is, one should not reorder them when +// changing the array contents, to avoid creating excessive pressure on the PVF execution subsystem. +const EXECUTOR_PARAMS: [ExecutorParam; 0] = []; + /// A type for representing the validator account id in a session. pub type AccountId = <::ValidatorSet as ValidatorSet< ::AccountId, @@ -102,6 +109,12 @@ pub mod pallet { #[pallet::getter(fn account_keys)] pub(crate) type AccountKeys = StorageMap<_, Identity, SessionIndex, Vec>>; + + /// Executor parameter set for a given session index + #[pallet::storage] + #[pallet::getter(fn session_executor_params)] + pub(crate) type SessionExecutorParams = + StorageMap<_, Identity, SessionIndex, ExecutorParams>; } /// An abstraction for the authority discovery pallet @@ -153,6 +166,7 @@ impl Pallet { // Idx will be missing for a few sessions after the runtime upgrade. // But it shouldn't be a problem. AccountKeys::::remove(&idx); + SessionExecutorParams::::remove(&idx); } // update `EarliestStoredSession` based on `config.dispute_period` EarliestStoredSession::::set(new_earliest_stored_session); @@ -184,6 +198,10 @@ impl Pallet { dispute_period, }; Sessions::::insert(&new_session_index, &new_session_info); + SessionExecutorParams::::insert( + &new_session_index, + ExecutorParams::from(&EXECUTOR_PARAMS[..]), + ); } /// Called by the initializer to initialize the session info pallet.
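For now the `EXECUTOR_PARAMS` constant is empty, so every new session simply gets the default (empty) set written into `SessionExecutorParams`. If concrete limits are introduced later, the intent stated in the comment is to append entries rather than reorder them; a purely hypothetical example of what a populated constant could look like (values invented for illustration):

// Hypothetical future contents of EXECUTOR_PARAMS; the figures below are made up.
// New entries should be appended rather than reordered: a reordering changes the
// encoding and hash even when the semantics are unchanged, creating needless churn
// for the PVF execution subsystem.
const EXECUTOR_PARAMS: [ExecutorParam; 3] = [
	ExecutorParam::MaxMemorySize(128 * 1024 * 1024),  // 128 MiB
	ExecutorParam::StackLogicalMax(65_536),           // max. Wasm values on the stack
	ExecutorParam::StackNativeMax(256 * 1024 * 1024), // native stack limit, in bytes
];

Each new session would then be recorded with `ExecutorParams::from(&EXECUTOR_PARAMS[..])`, exactly as the insertion in the hunk above already does for the empty set.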
diff --git a/runtime/rococo/src/lib.rs b/runtime/rococo/src/lib.rs index 0ff291e31630..73b5475c6736 100644 --- a/runtime/rococo/src/lib.rs +++ b/runtime/rococo/src/lib.rs @@ -23,11 +23,11 @@ use pallet_nis::WithMaximumOf; use parity_scale_codec::{Decode, Encode, MaxEncodedLen}; use primitives::{ - AccountId, AccountIndex, Balance, BlockNumber, CandidateEvent, CandidateHash, - CommittedCandidateReceipt, CoreState, DisputeState, GroupRotationInfo, Hash, Id as ParaId, - InboundDownwardMessage, InboundHrmpMessage, Moment, Nonce, OccupiedCoreAssumption, - PersistedValidationData, ScrapedOnChainVotes, SessionInfo, Signature, ValidationCode, - ValidationCodeHash, ValidatorId, ValidatorIndex, + vstaging::ExecutorParams, AccountId, AccountIndex, Balance, BlockNumber, CandidateEvent, + CandidateHash, CommittedCandidateReceipt, CoreState, DisputeState, GroupRotationInfo, Hash, + Id as ParaId, InboundDownwardMessage, InboundHrmpMessage, Moment, Nonce, + OccupiedCoreAssumption, PersistedValidationData, ScrapedOnChainVotes, SessionInfo, Signature, + ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex, }; use runtime_common::{ assigned_slots, auctions, claims, crowdloan, impl_runtime_weights, impls::ToAuthor, @@ -38,12 +38,15 @@ use sp_std::{cmp::Ordering, collections::btree_map::BTreeMap, prelude::*}; use runtime_parachains::{ configuration as parachains_configuration, disputes as parachains_disputes, - disputes::slashing as parachains_slashing, dmp as parachains_dmp, hrmp as parachains_hrmp, - inclusion as parachains_inclusion, initializer as parachains_initializer, - origin as parachains_origin, paras as parachains_paras, + disputes::slashing as parachains_slashing, + dmp as parachains_dmp, hrmp as parachains_hrmp, inclusion as parachains_inclusion, + initializer as parachains_initializer, origin as parachains_origin, paras as parachains_paras, paras_inherent as parachains_paras_inherent, - runtime_api_impl::v2 as parachains_runtime_api_impl, scheduler as parachains_scheduler, - session_info as parachains_session_info, shared as parachains_shared, ump as parachains_ump, + runtime_api_impl::{ + v2 as parachains_runtime_api_impl, vstaging as parachains_runtime_api_impl_staging, + }, + scheduler as parachains_scheduler, session_info as parachains_session_info, + shared as parachains_shared, ump as parachains_ump, }; use authority_discovery_primitives::AuthorityId as AuthorityDiscoveryId; @@ -1649,7 +1652,7 @@ sp_api::impl_runtime_apis! { } } - #[api_version(3)] + #[api_version(4)] impl primitives::runtime_api::ParachainHost for Runtime { fn validators() -> Vec { parachains_runtime_api_impl::validators::() @@ -1713,6 +1716,10 @@ sp_api::impl_runtime_apis! 
{ parachains_runtime_api_impl::session_info::(index) } + fn session_executor_params(session_index: SessionIndex) -> Option { + parachains_runtime_api_impl_staging::session_executor_params::(session_index) + } + fn dmq_contents(recipient: ParaId) -> Vec> { parachains_runtime_api_impl::dmq_contents::(recipient) } diff --git a/runtime/westend/src/lib.rs b/runtime/westend/src/lib.rs index 90f646afbae3..cb2f3b6e04d8 100644 --- a/runtime/westend/src/lib.rs +++ b/runtime/westend/src/lib.rs @@ -36,11 +36,12 @@ use pallet_session::historical as session_historical; use pallet_transaction_payment::{CurrencyAdapter, FeeDetails, RuntimeDispatchInfo}; use parity_scale_codec::{Decode, Encode, MaxEncodedLen}; use primitives::{ - AccountId, AccountIndex, Balance, BlockNumber, CandidateEvent, CandidateHash, - CommittedCandidateReceipt, CoreState, DisputeState, GroupRotationInfo, Hash, Id as ParaId, - InboundDownwardMessage, InboundHrmpMessage, Moment, Nonce, OccupiedCoreAssumption, - PersistedValidationData, PvfCheckStatement, ScrapedOnChainVotes, SessionInfo, Signature, - ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature, + vstaging::ExecutorParams, AccountId, AccountIndex, Balance, BlockNumber, CandidateEvent, + CandidateHash, CommittedCandidateReceipt, CoreState, DisputeState, GroupRotationInfo, Hash, + Id as ParaId, InboundDownwardMessage, InboundHrmpMessage, Moment, Nonce, + OccupiedCoreAssumption, PersistedValidationData, PvfCheckStatement, ScrapedOnChainVotes, + SessionInfo, Signature, ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex, + ValidatorSignature, }; use runtime_common::{ assigned_slots, auctions, crowdloan, elections::OnChainAccuracy, impl_runtime_weights, @@ -49,12 +50,15 @@ use runtime_common::{ }; use runtime_parachains::{ configuration as parachains_configuration, disputes as parachains_disputes, - disputes::slashing as parachains_slashing, dmp as parachains_dmp, hrmp as parachains_hrmp, - inclusion as parachains_inclusion, initializer as parachains_initializer, - origin as parachains_origin, paras as parachains_paras, + disputes::slashing as parachains_slashing, + dmp as parachains_dmp, hrmp as parachains_hrmp, inclusion as parachains_inclusion, + initializer as parachains_initializer, origin as parachains_origin, paras as parachains_paras, paras_inherent as parachains_paras_inherent, reward_points as parachains_reward_points, - runtime_api_impl::v2 as parachains_runtime_api_impl, scheduler as parachains_scheduler, - session_info as parachains_session_info, shared as parachains_shared, ump as parachains_ump, + runtime_api_impl::{ + v2 as parachains_runtime_api_impl, vstaging as parachains_runtime_api_impl_staging, + }, + scheduler as parachains_scheduler, session_info as parachains_session_info, + shared as parachains_shared, ump as parachains_ump, }; use scale_info::TypeInfo; use sp_core::{OpaqueMetadata, RuntimeDebug}; @@ -1375,7 +1379,7 @@ sp_api::impl_runtime_apis! { } } - #[api_version(3)] + #[api_version(4)] impl primitives::runtime_api::ParachainHost for Runtime { fn validators() -> Vec { parachains_runtime_api_impl::validators::() @@ -1439,6 +1443,10 @@ sp_api::impl_runtime_apis! { parachains_runtime_api_impl::session_info::(index) } + fn session_executor_params(session_index: SessionIndex) -> Option { + parachains_runtime_api_impl_staging::session_executor_params::(session_index) + } + fn dmq_contents(recipient: ParaId) -> Vec> { parachains_runtime_api_impl::dmq_contents::(recipient) }
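Rococo and Westend now advertise ParachainHost API v4, but nodes will keep encountering runtimes that only implement v3 or older. The snippet below is a hypothetical, node-side sketch (not taken from this patch) of the fallback policy such a caller might apply once it has queried the advertised API version and, where supported, the new method; the helper name and signature are invented for illustration.

use polkadot_primitives::vstaging::ExecutorParams;

/// Hypothetical helper: choose the executor parameters for a session, given the
/// ParachainHost API version advertised by the runtime and, if it was callable,
/// the result of `session_executor_params`.
fn executor_params_or_default(
	api_version: Option<u32>,
	fetched: Option<ExecutorParams>,
) -> ExecutorParams {
	match api_version {
		// v4 runtimes expose the method; an empty storage entry still maps to the
		// defaults, mirroring the bootstrap fallback in runtime_api_impl/vstaging.rs.
		Some(v) if v >= 4 => fetched.unwrap_or_default(),
		// Older runtimes: behave as if the session ran with the default (empty) set.
		_ => ExecutorParams::default(),
	}
}

Either way, the absence of stored parameters is never treated as an error; it simply means the default execution environment.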