diff --git a/node/core/pvf/src/artifacts.rs b/node/core/pvf/src/artifacts.rs index e69478479efc..af579863cd61 100644 --- a/node/core/pvf/src/artifacts.rs +++ b/node/core/pvf/src/artifacts.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -use crate::error::PrepareError; +use crate::{error::PrepareError, host::PrepareResultSender}; use always_assert::always; use async_std::path::{Path, PathBuf}; use parity_scale_codec::{Decode, Encode}; @@ -106,7 +106,7 @@ pub enum ArtifactState { last_time_needed: SystemTime, }, /// A task to prepare this artifact is scheduled. - Preparing, + Preparing { waiting_for_response: Vec }, /// The code couldn't be compiled due to an error. Such artifacts /// never reach the executor and stay in the host's memory. FailedToProcess(PrepareError), @@ -145,9 +145,16 @@ impl Artifacts { /// /// This function must be used only for brand-new artifacts and should never be used for /// replacing existing ones. - pub fn insert_preparing(&mut self, artifact_id: ArtifactId) { + pub fn insert_preparing( + &mut self, + artifact_id: ArtifactId, + waiting_for_response: Vec, + ) { // See the precondition. - always!(self.artifacts.insert(artifact_id, ArtifactState::Preparing).is_none()); + always!(self + .artifacts + .insert(artifact_id, ArtifactState::Preparing { waiting_for_response }) + .is_none()); } /// Insert an artifact with the given ID as "prepared". diff --git a/node/core/pvf/src/error.rs b/node/core/pvf/src/error.rs index 8afd0ddddb4b..197573d1d073 100644 --- a/node/core/pvf/src/error.rs +++ b/node/core/pvf/src/error.rs @@ -16,6 +16,9 @@ use parity_scale_codec::{Decode, Encode}; +/// Result of PVF preparation performed by the validation host. +pub type PrepareResult = Result<(), PrepareError>; + /// An error that occurred during the prepare part of the PVF pipeline. 
#[derive(Debug, Clone, Encode, Decode)] pub enum PrepareError { @@ -23,6 +26,8 @@ pub enum PrepareError { Prevalidation(String), /// Compilation failed for the given PVF. Preparation(String), + /// Failed to prepare the PVF due to the time limit. + TimedOut, /// This state indicates that the process assigned to prepare the artifact wasn't responsible /// or were killed. This state is reported by the validation host (not by the worker). DidNotMakeIt, @@ -74,7 +79,8 @@ impl From for ValidationError { let error_str = match error { PrepareError::Prevalidation(err) => err, PrepareError::Preparation(err) => err, - PrepareError::DidNotMakeIt => "preparation timeout".to_owned(), + PrepareError::TimedOut => "preparation timeout".to_owned(), + PrepareError::DidNotMakeIt => "communication error".to_owned(), }; ValidationError::InvalidCandidate(InvalidCandidate::WorkerReportedError(error_str)) } diff --git a/node/core/pvf/src/executor_intf.rs b/node/core/pvf/src/executor_intf.rs index abf9f42ee287..52c869c47737 100644 --- a/node/core/pvf/src/executor_intf.rs +++ b/node/core/pvf/src/executor_intf.rs @@ -68,7 +68,13 @@ const CONFIG: Config = Config { native_stack_max: 256 * 1024 * 1024, }), canonicalize_nans: true, - parallel_compilation: true, + // Rationale for turning the multi-threaded compilation off is to make the preparation time + // easily reproducible and as deterministic as possible. + // + // Currently the prepare queue doesn't distinguish between precheck and prepare requests. + // On the one hand, it simplifies the code, on the other, however, slows down compile times + // for execute requests. This behavior may change in future. 
+ parallel_compilation: false, }, }; diff --git a/node/core/pvf/src/host.rs b/node/core/pvf/src/host.rs index a50f7f27702e..1210c43c16fc 100644 --- a/node/core/pvf/src/host.rs +++ b/node/core/pvf/src/host.rs @@ -24,7 +24,7 @@ use crate::{ artifacts::{ArtifactId, ArtifactPathId, ArtifactState, Artifacts}, execute, metrics::Metrics, - prepare, Priority, Pvf, ValidationError, LOG_TARGET, + prepare, PrepareResult, Priority, Pvf, ValidationError, LOG_TARGET, }; use always_assert::never; use async_std::path::{Path, PathBuf}; @@ -41,6 +41,9 @@ use std::{ /// An alias to not spell the type for the oneshot sender for the PVF execution result. pub(crate) type ResultSender = oneshot::Sender>; +/// Transmission end used for sending the PVF preparation result. +pub(crate) type PrepareResultSender = oneshot::Sender; + /// A handle to the async process serving the validation host requests. #[derive(Clone)] pub struct ValidationHost { @@ -48,6 +51,24 @@ pub struct ValidationHost { } impl ValidationHost { + /// Precheck PVF with the given code, i.e. verify that it compiles within a reasonable time limit. + /// The result of preparation will be sent to the provided result sender. + /// + /// This is async to accommodate the possibility of back-pressure. In the vast majority of + /// situations this function should return immediately. + /// + /// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down. + pub async fn precheck_pvf( + &mut self, + pvf: Pvf, + result_tx: PrepareResultSender, + ) -> Result<(), String> { + self.to_host_tx + .send(ToHost::PrecheckPvf { pvf, result_tx }) + .await + .map_err(|_| "the inner loop hung up".to_string()) + } + /// Execute PVF with the given code, execution timeout, parameters and priority. /// The result of execution will be sent to the provided result sender.
/// @@ -84,6 +105,10 @@ impl ValidationHost { } enum ToHost { + PrecheckPvf { + pvf: Pvf, + result_tx: PrepareResultSender, + }, ExecutePvf { pvf: Pvf, execution_timeout: Duration, @@ -376,6 +401,9 @@ async fn handle_to_host( to_host: ToHost, ) -> Result<(), Fatal> { match to_host { + ToHost::PrecheckPvf { pvf, result_tx } => { + handle_precheck_pvf(artifacts, prepare_queue, pvf, result_tx).await?; + }, ToHost::ExecutePvf { pvf, execution_timeout, params, priority, result_tx } => { handle_execute_pvf( cache_path, @@ -399,6 +427,34 @@ async fn handle_to_host( Ok(()) } +async fn handle_precheck_pvf( + artifacts: &mut Artifacts, + prepare_queue: &mut mpsc::Sender, + pvf: Pvf, + result_sender: PrepareResultSender, +) -> Result<(), Fatal> { + let artifact_id = pvf.as_artifact_id(); + + if let Some(state) = artifacts.artifact_state_mut(&artifact_id) { + match state { + ArtifactState::Prepared { last_time_needed } => { + *last_time_needed = SystemTime::now(); + let _ = result_sender.send(Ok(())); + }, + ArtifactState::Preparing { waiting_for_response } => + waiting_for_response.push(result_sender), + ArtifactState::FailedToProcess(result) => { + let _ = result_sender.send(PrepareResult::Err(result.clone())); + }, + } + } else { + artifacts.insert_preparing(artifact_id, vec![result_sender]); + send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority: Priority::Normal, pvf }) + .await?; + } + Ok(()) +} + async fn handle_execute_pvf( cache_path: &Path, artifacts: &mut Artifacts, @@ -429,7 +485,7 @@ async fn handle_execute_pvf( ) .await?; }, - ArtifactState::Preparing => { + ArtifactState::Preparing { waiting_for_response: _ } => { send_prepare( prepare_queue, prepare::ToQueue::Amend { priority, artifact_id: artifact_id.clone() }, @@ -445,7 +501,7 @@ async fn handle_execute_pvf( } else { // Artifact is unknown: register it and enqueue a job with the corresponding priority and // - artifacts.insert_preparing(artifact_id.clone()); + 
artifacts.insert_preparing(artifact_id.clone(), Vec::new()); send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority, pvf }).await?; awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx); @@ -468,7 +524,7 @@ async fn handle_heads_up( ArtifactState::Prepared { last_time_needed, .. } => { *last_time_needed = now; }, - ArtifactState::Preparing => { + ArtifactState::Preparing { waiting_for_response: _ } => { // Already preparing. We don't need to send a priority amend either because // it can't get any lower than the background. }, @@ -476,7 +532,7 @@ async fn handle_heads_up( } } else { // The artifact is unknown: register it and put a background job into the prepare queue. - artifacts.insert_preparing(artifact_id.clone()); + artifacts.insert_preparing(artifact_id.clone(), Vec::new()); send_prepare( prepare_queue, @@ -524,9 +580,15 @@ async fn handle_prepare_done( never!("the artifact is already processed unsuccessfully: {:?}", artifact_id); return Ok(()) }, - Some(state @ ArtifactState::Preparing) => state, + Some(state @ ArtifactState::Preparing { waiting_for_response: _ }) => state, }; + if let ArtifactState::Preparing { waiting_for_response } = state { + for result_sender in waiting_for_response.drain(..) { + let _ = result_sender.send(result.clone()); + } + } + // It's finally time to dispatch all the execution requests that were waiting for this artifact // to be prepared. 
let pending_requests = awaiting_prepare.take(&artifact_id); @@ -634,6 +696,7 @@ fn pulse_every(interval: std::time::Duration) -> impl futures::Stream #[cfg(test)] mod tests { use super::*; + use crate::{InvalidCandidate, PrepareError}; use assert_matches::assert_matches; use futures::future::BoxFuture; @@ -904,8 +967,6 @@ mod tests { #[async_std::test] async fn execute_pvf_requests() { - use crate::error::InvalidCandidate; - let mut test = Builder::default().build(); let mut host = test.host_handle(); @@ -1002,6 +1063,140 @@ mod tests { ); } + #[async_std::test] + async fn precheck_pvf() { + let mut test = Builder::default().build(); + let mut host = test.host_handle(); + + // First, test a simple precheck request. + let (result_tx, result_rx) = oneshot::channel(); + host.precheck_pvf(Pvf::from_discriminator(1), result_tx).await.unwrap(); + + // The queue received the prepare request. + assert_matches!( + test.poll_and_recv_to_prepare_queue().await, + prepare::ToQueue::Enqueue { .. } + ); + // Send `Ok` right away and poll the host. + test.from_prepare_queue_tx + .send(prepare::FromQueue { artifact_id: artifact_id(1), result: Ok(()) }) + .await + .unwrap(); + // No pending execute requests. + test.poll_ensure_to_execute_queue_is_empty().await; + // Received the precheck result. + assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Ok(())); + + // Send multiple requests for the same pvf. + let mut precheck_receivers = Vec::new(); + for _ in 0..3 { + let (result_tx, result_rx) = oneshot::channel(); + host.precheck_pvf(Pvf::from_discriminator(2), result_tx).await.unwrap(); + precheck_receivers.push(result_rx); + } + // Received prepare request. + assert_matches!( + test.poll_and_recv_to_prepare_queue().await, + prepare::ToQueue::Enqueue { .. 
} + ); + test.from_prepare_queue_tx + .send(prepare::FromQueue { + artifact_id: artifact_id(2), + result: Err(PrepareError::TimedOut), + }) + .await + .unwrap(); + test.poll_ensure_to_execute_queue_is_empty().await; + for result_rx in precheck_receivers { + assert_matches!( + result_rx.now_or_never().unwrap().unwrap(), + Err(PrepareError::TimedOut) + ); + } + } + + #[async_std::test] + async fn test_prepare_done() { + let mut test = Builder::default().build(); + let mut host = test.host_handle(); + + // Test mixed cases of receiving execute and precheck requests + // for the same pvf. + + // Send PVF for the execution and request the prechecking for it. + let (result_tx, result_rx_execute) = oneshot::channel(); + host.execute_pvf( + Pvf::from_discriminator(1), + TEST_EXECUTION_TIMEOUT, + b"pvf2".to_vec(), + Priority::Critical, + result_tx, + ) + .await + .unwrap(); + + assert_matches!( + test.poll_and_recv_to_prepare_queue().await, + prepare::ToQueue::Enqueue { .. } + ); + + let (result_tx, result_rx) = oneshot::channel(); + host.precheck_pvf(Pvf::from_discriminator(1), result_tx).await.unwrap(); + + // Suppose the preparation failed, the execution queue is empty and both + // "clients" receive their results. + test.from_prepare_queue_tx + .send(prepare::FromQueue { + artifact_id: artifact_id(1), + result: Err(PrepareError::TimedOut), + }) + .await + .unwrap(); + test.poll_ensure_to_execute_queue_is_empty().await; + assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Err(PrepareError::TimedOut)); + assert_matches!( + result_rx_execute.now_or_never().unwrap().unwrap(), + Err(ValidationError::InvalidCandidate(InvalidCandidate::WorkerReportedError(_))) + ); + + // Reversed case: first send multiple precheck requests, then ask for an execution. 
+ let mut precheck_receivers = Vec::new(); + for _ in 0..3 { + let (result_tx, result_rx) = oneshot::channel(); + host.precheck_pvf(Pvf::from_discriminator(2), result_tx).await.unwrap(); + precheck_receivers.push(result_rx); + } + + let (result_tx, _result_rx_execute) = oneshot::channel(); + host.execute_pvf( + Pvf::from_discriminator(2), + TEST_EXECUTION_TIMEOUT, + b"pvf2".to_vec(), + Priority::Critical, + result_tx, + ) + .await + .unwrap(); + // Received prepare request. + assert_matches!( + test.poll_and_recv_to_prepare_queue().await, + prepare::ToQueue::Enqueue { .. } + ); + test.from_prepare_queue_tx + .send(prepare::FromQueue { artifact_id: artifact_id(2), result: Ok(()) }) + .await + .unwrap(); + // The execute queue receives a new request, prechecking is finished and we can + // fetch results. + assert_matches!( + test.poll_and_recv_to_execute_queue().await, + execute::ToQueue::Enqueue { .. } + ); + for result_rx in precheck_receivers { + assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Ok(())); + } + } + #[async_std::test] async fn cancellation() { let mut test = Builder::default().build(); diff --git a/node/core/pvf/src/lib.rs b/node/core/pvf/src/lib.rs index 387d10e96061..62d7d2c95ede 100644 --- a/node/core/pvf/src/lib.rs +++ b/node/core/pvf/src/lib.rs @@ -92,7 +92,7 @@ pub mod testing; #[doc(hidden)] pub use sp_tracing; -pub use error::{InvalidCandidate, ValidationError}; +pub use error::{InvalidCandidate, PrepareError, PrepareResult, ValidationError}; pub use priority::Priority; pub use pvf::Pvf; diff --git a/node/core/pvf/src/prepare/pool.rs b/node/core/pvf/src/prepare/pool.rs index 729f813432f9..4e31164677ef 100644 --- a/node/core/pvf/src/prepare/pool.rs +++ b/node/core/pvf/src/prepare/pool.rs @@ -16,7 +16,7 @@ use super::worker::{self, Outcome}; use crate::{ - error::PrepareError, + error::{PrepareError, PrepareResult}, metrics::Metrics, worker_common::{IdleWorker, WorkerHandle}, LOG_TARGET, @@ -87,7 +87,7 @@ pub enum FromPool { rip: bool,
/// [`Ok`] indicates that compiled artifact is successfully stored on disk. /// Otherwise, an [error](PrepareError) is supplied. - result: Result<(), PrepareError>, + result: PrepareResult, }, /// The given worker ceased to exist. @@ -341,6 +341,20 @@ fn handle_mux( )?; } + Ok(()) + }, + Outcome::TimedOut => { + if attempt_retire(metrics, spawned, worker) { + reply( + from_pool, + FromPool::Concluded { + worker, + rip: true, + result: Err(PrepareError::TimedOut), + }, + )?; + } + Ok(()) }, } diff --git a/node/core/pvf/src/prepare/queue.rs b/node/core/pvf/src/prepare/queue.rs index d85e6b8a1422..0dfcb4e1a099 100644 --- a/node/core/pvf/src/prepare/queue.rs +++ b/node/core/pvf/src/prepare/queue.rs @@ -17,9 +17,7 @@ //! A queue that handles requests for PVF preparation. use super::pool::{self, Worker}; -use crate::{ - artifacts::ArtifactId, error::PrepareError, metrics::Metrics, Priority, Pvf, LOG_TARGET, -}; +use crate::{artifacts::ArtifactId, metrics::Metrics, PrepareResult, Priority, Pvf, LOG_TARGET}; use always_assert::{always, never}; use async_std::path::PathBuf; use futures::{channel::mpsc, stream::StreamExt as _, Future, SinkExt}; @@ -44,8 +42,9 @@ pub struct FromQueue { /// Identifier of an artifact. pub(crate) artifact_id: ArtifactId, /// Outcome of the PVF processing. [`Ok`] indicates that compiled artifact - /// is successfully stored on disk. Otherwise, an [error](PrepareError) is supplied. - pub(crate) result: Result<(), PrepareError>, + /// is successfully stored on disk. Otherwise, an [error](crate::error::PrepareError) + /// is supplied. 
+ pub(crate) result: PrepareResult, } #[derive(Default)] @@ -327,7 +326,7 @@ async fn handle_worker_concluded( queue: &mut Queue, worker: Worker, rip: bool, - result: Result<(), PrepareError>, + result: PrepareResult, ) -> Result<(), Fatal> { queue.metrics.prepare_concluded(); @@ -529,6 +528,7 @@ pub fn start( #[cfg(test)] mod tests { use super::*; + use crate::error::PrepareError; use assert_matches::assert_matches; use futures::{future::BoxFuture, FutureExt}; use slotmap::SlotMap; diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker.rs index a8bb3516e296..8d157bd8f9b2 100644 --- a/node/core/pvf/src/prepare/worker.rs +++ b/node/core/pvf/src/prepare/worker.rs @@ -16,7 +16,7 @@ use crate::{ artifacts::CompiledArtifact, - error::PrepareError, + error::{PrepareError, PrepareResult}, worker_common::{ bytes_to_path, framed_recv, framed_send, path_to_bytes, spawn_with_program_path, tmpfile_in, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle, @@ -28,8 +28,6 @@ use async_std::{ os::unix::net::UnixStream, path::{Path, PathBuf}, }; -use futures::FutureExt as _; -use futures_timer::Delay; use parity_scale_codec::{Decode, Encode}; use sp_core::hexdisplay::HexDisplay; use std::{sync::Arc, time::Duration}; @@ -51,15 +49,15 @@ pub async fn spawn( pub enum Outcome { /// The worker has finished the work assigned to it. - Concluded { worker: IdleWorker, result: Result<(), PrepareError> }, + Concluded { worker: IdleWorker, result: PrepareResult }, /// The host tried to reach the worker but failed. This is most likely because the worked was /// killed by the system. Unreachable, - /// The execution was interrupted abruptly and the worker is not available anymore. For example, - /// this could've happen because the worker hadn't finished the work until the given deadline. + /// The worker failed to finish the job until the given deadline. 
/// - /// Note that in this case the artifact file is written (unless there was an error writing the - /// the artifact). + /// The worker is no longer usable and should be killed. + TimedOut, + /// The execution was interrupted abruptly and the worker is not available anymore. /// /// This doesn't return an idle worker instance, thus this worker is no longer usable. DidNotMakeIt, @@ -106,77 +104,78 @@ pub async fn start_work( #[derive(Debug)] enum Selected { - Done(Result<(), PrepareError>), + Done(PrepareResult), IoErr, Deadline, } - let selected = futures::select! { - res = framed_recv(&mut stream).fuse() => { - match res { - Ok(response_bytes) => { - // By convention we expect encoded `Result<(), PrepareError>`. - if let Ok(result) = - >::decode(&mut response_bytes.clone().as_slice()) - { - if result.is_ok() { - tracing::debug!( - target: LOG_TARGET, - worker_pid = %pid, - "promoting WIP artifact {} to {}", - tmp_file.display(), - artifact_path.display(), - ); - - async_std::fs::rename(&tmp_file, &artifact_path) - .await - .map(|_| Selected::Done(result)) - .unwrap_or_else(|err| { - tracing::warn!( - target: LOG_TARGET, - worker_pid = %pid, - "failed to rename the artifact from {} to {}: {:?}", - tmp_file.display(), - artifact_path.display(), - err, - ); - Selected::IoErr - }) - } else { - Selected::Done(result) - } - } else { - // We received invalid bytes from the worker. - let bound_bytes = &response_bytes[..response_bytes.len().min(4)]; - tracing::warn!( + let selected = + match async_std::future::timeout(COMPILATION_TIMEOUT, framed_recv(&mut stream)).await { + Ok(Ok(response_bytes)) => { + // Received bytes from worker within the time limit. + // By convention we expect encoded `PrepareResult`. 
+ if let Ok(result) = PrepareResult::decode(&mut response_bytes.as_slice()) { + if result.is_ok() { + tracing::debug!( target: LOG_TARGET, worker_pid = %pid, - "received unexpected response from the prepare worker: {}", - HexDisplay::from(&bound_bytes), + "promoting WIP artifact {} to {}", + tmp_file.display(), + artifact_path.display(), ); - Selected::IoErr + + async_std::fs::rename(&tmp_file, &artifact_path) + .await + .map(|_| Selected::Done(result)) + .unwrap_or_else(|err| { + tracing::warn!( + target: LOG_TARGET, + worker_pid = %pid, + "failed to rename the artifact from {} to {}: {:?}", + tmp_file.display(), + artifact_path.display(), + err, + ); + Selected::IoErr + }) + } else { + Selected::Done(result) } - }, - Err(err) => { + } else { + // We received invalid bytes from the worker. + let bound_bytes = &response_bytes[..response_bytes.len().min(4)]; tracing::warn!( target: LOG_TARGET, worker_pid = %pid, - "failed to recv a prepare response: {:?}", - err, + "received unexpected response from the prepare worker: {}", + HexDisplay::from(&bound_bytes), ); Selected::IoErr } - } - }, - _ = Delay::new(COMPILATION_TIMEOUT).fuse() => Selected::Deadline, - }; + }, + Ok(Err(err)) => { + // Communication error within the time limit. + tracing::warn!( + target: LOG_TARGET, + worker_pid = %pid, + "failed to recv a prepare response: {:?}", + err, + ); + Selected::IoErr + }, + Err(_) => { + // Timed out. 
+ Selected::Deadline + }, + }; match selected { Selected::Done(result) => { renice(pid, NICENESS_FOREGROUND); Outcome::Concluded { worker: IdleWorker { stream, pid }, result } }, - Selected::IoErr | Selected::Deadline => Outcome::DidNotMakeIt, + Selected::Deadline => Outcome::TimedOut, + Selected::IoErr => Outcome::DidNotMakeIt, } }) .await diff --git a/scripts/gitlab/lingua.dic b/scripts/gitlab/lingua.dic index 9c5a29d8ba67..a49f8079d3c2 100644 --- a/scripts/gitlab/lingua.dic +++ b/scripts/gitlab/lingua.dic @@ -194,6 +194,7 @@ PoS/MS PoV/MS PoW/MS PR +precheck preconfigured preimage/MS preopen