Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

PVF host prechecking support v2 #4123

Merged
merged 24 commits into from
Nov 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions node/core/pvf/src/artifacts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use crate::error::PrepareError;
use crate::{error::PrepareError, host::PrepareResultSender};
use always_assert::always;
use async_std::path::{Path, PathBuf};
use parity_scale_codec::{Decode, Encode};
Expand Down Expand Up @@ -106,7 +106,7 @@ pub enum ArtifactState {
last_time_needed: SystemTime,
},
/// A task to prepare this artifact is scheduled.
Preparing,
Preparing { waiting_for_response: Vec<PrepareResultSender> },
/// The code couldn't be compiled due to an error. Such artifacts
/// never reach the executor and stay in the host's memory.
FailedToProcess(PrepareError),
Expand Down Expand Up @@ -145,9 +145,16 @@ impl Artifacts {
///
/// This function must be used only for brand-new artifacts and should never be used for
/// replacing existing ones.
pub fn insert_preparing(&mut self, artifact_id: ArtifactId) {
pub fn insert_preparing(
&mut self,
artifact_id: ArtifactId,
waiting_for_response: Vec<PrepareResultSender>,
) {
// See the precondition.
always!(self.artifacts.insert(artifact_id, ArtifactState::Preparing).is_none());
always!(self
.artifacts
.insert(artifact_id, ArtifactState::Preparing { waiting_for_response })
.is_none());
}

/// Insert an artifact with the given ID as "prepared".
Expand Down
8 changes: 7 additions & 1 deletion node/core/pvf/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,18 @@

use parity_scale_codec::{Decode, Encode};

/// Result of PVF preparation performed by the validation host.
///
/// `Ok(())` means the preparation succeeded; `Err` carries the [`PrepareError`] describing why
/// it did not.
pub type PrepareResult = Result<(), PrepareError>;

/// An error that occurred during the prepare part of the PVF pipeline.
#[derive(Debug, Clone, Encode, Decode)]
pub enum PrepareError {
/// During the prevalidation stage of preparation an issue was found with the PVF.
Prevalidation(String),
/// Compilation failed for the given PVF.
Preparation(String),
/// Failed to prepare the PVF due to the time limit.
TimedOut,
/// This state indicates that the process assigned to prepare the artifact wasn't responsive
/// or was killed. This state is reported by the validation host (not by the worker).
DidNotMakeIt,
Expand Down Expand Up @@ -74,7 +79,8 @@ impl From<PrepareError> for ValidationError {
let error_str = match error {
PrepareError::Prevalidation(err) => err,
PrepareError::Preparation(err) => err,
PrepareError::DidNotMakeIt => "preparation timeout".to_owned(),
PrepareError::TimedOut => "preparation timeout".to_owned(),
PrepareError::DidNotMakeIt => "communication error".to_owned(),
};
ValidationError::InvalidCandidate(InvalidCandidate::WorkerReportedError(error_str))
}
Expand Down
8 changes: 7 additions & 1 deletion node/core/pvf/src/executor_intf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,13 @@ const CONFIG: Config = Config {
native_stack_max: 256 * 1024 * 1024,
}),
canonicalize_nans: true,
parallel_compilation: true,
// Rationale for turning the multi-threaded compilation off is to make the preparation time
// easily reproducible and as deterministic as possible.
//
// Currently the prepare queue doesn't distinguish between precheck and prepare requests.
// On the one hand, this simplifies the code; on the other hand, it slows down compile times
// for execute requests. This behavior may change in the future.
parallel_compilation: false,
slumber marked this conversation as resolved.
Show resolved Hide resolved
},
};

Expand Down
211 changes: 203 additions & 8 deletions node/core/pvf/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::{
artifacts::{ArtifactId, ArtifactPathId, ArtifactState, Artifacts},
execute,
metrics::Metrics,
prepare, Priority, Pvf, ValidationError, LOG_TARGET,
prepare, PrepareResult, Priority, Pvf, ValidationError, LOG_TARGET,
};
use always_assert::never;
use async_std::path::{Path, PathBuf};
Expand All @@ -41,13 +41,34 @@ use std::{
/// An alias to not spell the type for the oneshot sender for the PVF execution result.
pub(crate) type ResultSender = oneshot::Sender<Result<ValidationResult, ValidationError>>;

/// Transmission end used for sending the PVF preparation result.
///
/// This is the sending half of a oneshot channel that carries a [`PrepareResult`].
pub(crate) type PrepareResultSender = oneshot::Sender<PrepareResult>;

/// A handle to the async process serving the validation host requests.
#[derive(Clone)]
pub struct ValidationHost {
to_host_tx: mpsc::Sender<ToHost>,
}

impl ValidationHost {
/// Request a precheck of the given PVF, i.e. verify that it compiles within a reasonable time
/// limit.
///
/// The result of the preparation will be sent to the provided result sender.
///
/// This is async to accommodate the possibility of back-pressure. In the vast majority of
/// situations this function should return immediately.
///
/// # Errors
///
/// Returns an error if the request cannot be sent to the validation host, i.e. if it shut
/// down.
pub async fn precheck_pvf(
    &mut self,
    pvf: Pvf,
    result_tx: PrepareResultSender,
) -> Result<(), String> {
    let request = ToHost::PrecheckPvf { pvf, result_tx };
    match self.to_host_tx.send(request).await {
        Ok(()) => Ok(()),
        // The only way sending can fail is the receiving side being gone.
        Err(_) => Err("the inner loop hung up".to_string()),
    }
}

/// Execute PVF with the given code, execution timeout, parameters and priority.
/// The result of execution will be sent to the provided result sender.
///
Expand Down Expand Up @@ -84,6 +105,10 @@ impl ValidationHost {
}

enum ToHost {
PrecheckPvf {
pvf: Pvf,
result_tx: PrepareResultSender,
},
ExecutePvf {
pvf: Pvf,
execution_timeout: Duration,
Expand Down Expand Up @@ -376,6 +401,9 @@ async fn handle_to_host(
to_host: ToHost,
) -> Result<(), Fatal> {
match to_host {
ToHost::PrecheckPvf { pvf, result_tx } => {
handle_precheck_pvf(artifacts, prepare_queue, pvf, result_tx).await?;
},
ToHost::ExecutePvf { pvf, execution_timeout, params, priority, result_tx } => {
handle_execute_pvf(
cache_path,
Expand All @@ -399,6 +427,34 @@ async fn handle_to_host(
Ok(())
}

async fn handle_precheck_pvf(
artifacts: &mut Artifacts,
prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
pvf: Pvf,
result_sender: PrepareResultSender,
) -> Result<(), Fatal> {
let artifact_id = pvf.as_artifact_id();

if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
match state {
ArtifactState::Prepared { last_time_needed } => {
*last_time_needed = SystemTime::now();
let _ = result_sender.send(Ok(()));
},
ArtifactState::Preparing { waiting_for_response } =>
waiting_for_response.push(result_sender),
ArtifactState::FailedToProcess(result) => {
let _ = result_sender.send(PrepareResult::Err(result.clone()));
},
}
} else {
artifacts.insert_preparing(artifact_id, vec![result_sender]);
send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority: Priority::Normal, pvf })
.await?;
}
Ok(())
}

async fn handle_execute_pvf(
cache_path: &Path,
artifacts: &mut Artifacts,
Expand Down Expand Up @@ -429,7 +485,7 @@ async fn handle_execute_pvf(
)
.await?;
},
ArtifactState::Preparing => {
ArtifactState::Preparing { waiting_for_response: _ } => {
send_prepare(
prepare_queue,
prepare::ToQueue::Amend { priority, artifact_id: artifact_id.clone() },
Expand All @@ -445,7 +501,7 @@ async fn handle_execute_pvf(
} else {
// Artifact is unknown: register it and enqueue a job with the corresponding priority and
// PVF. The execution request is parked until the preparation is done.
artifacts.insert_preparing(artifact_id.clone());
artifacts.insert_preparing(artifact_id.clone(), Vec::new());
send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority, pvf }).await?;

awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx);
Expand All @@ -468,15 +524,15 @@ async fn handle_heads_up(
ArtifactState::Prepared { last_time_needed, .. } => {
*last_time_needed = now;
},
ArtifactState::Preparing => {
ArtifactState::Preparing { waiting_for_response: _ } => {
// Already preparing. We don't need to send a priority amend either because
// it can't get any lower than the background.
},
ArtifactState::FailedToProcess(_) => {},
}
} else {
// The artifact is unknown: register it and put a background job into the prepare queue.
artifacts.insert_preparing(artifact_id.clone());
artifacts.insert_preparing(artifact_id.clone(), Vec::new());

send_prepare(
prepare_queue,
Expand Down Expand Up @@ -524,9 +580,15 @@ async fn handle_prepare_done(
never!("the artifact is already processed unsuccessfully: {:?}", artifact_id);
return Ok(())
},
Some(state @ ArtifactState::Preparing) => state,
Some(state @ ArtifactState::Preparing { waiting_for_response: _ }) => state,
};

if let ArtifactState::Preparing { waiting_for_response } = state {
for result_sender in waiting_for_response.drain(..) {
let _ = result_sender.send(result.clone());
}
}

// It's finally time to dispatch all the execution requests that were waiting for this artifact
// to be prepared.
let pending_requests = awaiting_prepare.take(&artifact_id);
Expand Down Expand Up @@ -634,6 +696,7 @@ fn pulse_every(interval: std::time::Duration) -> impl futures::Stream<Item = ()>
#[cfg(test)]
mod tests {
use super::*;
use crate::{InvalidCandidate, PrepareError};
use assert_matches::assert_matches;
use futures::future::BoxFuture;

Expand Down Expand Up @@ -904,8 +967,6 @@ mod tests {

#[async_std::test]
async fn execute_pvf_requests() {
use crate::error::InvalidCandidate;

let mut test = Builder::default().build();
let mut host = test.host_handle();

Expand Down Expand Up @@ -1002,6 +1063,140 @@ mod tests {
);
}

// Checks that a precheck request results in a prepare-queue job and that the preparation
// result (success or error) is delivered to every requester, without anything being put
// into the execute queue.
#[async_std::test]
async fn precheck_pvf() {
let mut test = Builder::default().build();
let mut host = test.host_handle();

// First, test a simple precheck request.
let (result_tx, result_rx) = oneshot::channel();
host.precheck_pvf(Pvf::from_discriminator(1), result_tx).await.unwrap();

// The queue received the prepare request.
assert_matches!(
test.poll_and_recv_to_prepare_queue().await,
prepare::ToQueue::Enqueue { .. }
);
// Send `Ok` right away and poll the host.
test.from_prepare_queue_tx
.send(prepare::FromQueue { artifact_id: artifact_id(1), result: Ok(()) })
.await
.unwrap();
// No pending execute requests.
test.poll_ensure_to_execute_queue_is_empty().await;
// Received the precheck result.
assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Ok(()));

// Send multiple requests for the same pvf.
let mut precheck_receivers = Vec::new();
for _ in 0..3 {
let (result_tx, result_rx) = oneshot::channel();
host.precheck_pvf(Pvf::from_discriminator(2), result_tx).await.unwrap();
precheck_receivers.push(result_rx);
}
// Received prepare request.
assert_matches!(
test.poll_and_recv_to_prepare_queue().await,
prepare::ToQueue::Enqueue { .. }
);
// This time report a preparation failure.
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(2),
result: Err(PrepareError::TimedOut),
})
.await
.unwrap();
// Still nothing to execute.
test.poll_ensure_to_execute_queue_is_empty().await;
// Every one of the requesters must observe the same error.
for result_rx in precheck_receivers {
assert_matches!(
result_rx.now_or_never().unwrap().unwrap(),
Err(PrepareError::TimedOut)
);
}
}

// Checks the handling of a finished preparation when both execute and precheck requests
// refer to the same PVF, in either order of arrival.
#[async_std::test]
async fn test_prepare_done() {
let mut test = Builder::default().build();
let mut host = test.host_handle();

// Test mixed cases of receiving execute and precheck requests
// for the same pvf.

// Send PVF for the execution and request the prechecking for it.
let (result_tx, result_rx_execute) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
b"pvf2".to_vec(),
Priority::Critical,
result_tx,
)
.await
.unwrap();

// The execute request for the unknown PVF triggered a prepare job.
assert_matches!(
test.poll_and_recv_to_prepare_queue().await,
prepare::ToQueue::Enqueue { .. }
);

let (result_tx, result_rx) = oneshot::channel();
host.precheck_pvf(Pvf::from_discriminator(1), result_tx).await.unwrap();

// Suppose the preparation failed, the execution queue is empty and both
// "clients" receive their results.
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(1),
result: Err(PrepareError::TimedOut),
})
.await
.unwrap();
test.poll_ensure_to_execute_queue_is_empty().await;
// The precheck requester sees the raw preparation error ...
assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Err(PrepareError::TimedOut));
// ... while the execute requester sees it wrapped into a validation error.
assert_matches!(
result_rx_execute.now_or_never().unwrap().unwrap(),
Err(ValidationError::InvalidCandidate(InvalidCandidate::WorkerReportedError(_)))
);

// Reversed case: first send multiple precheck requests, then ask for an execution.
let mut precheck_receivers = Vec::new();
for _ in 0..3 {
let (result_tx, result_rx) = oneshot::channel();
host.precheck_pvf(Pvf::from_discriminator(2), result_tx).await.unwrap();
precheck_receivers.push(result_rx);
}

// The receiving end is kept alive, but the execution result itself is not inspected here.
let (result_tx, _result_rx_execute) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(2),
TEST_EXECUTION_TIMEOUT,
b"pvf2".to_vec(),
Priority::Critical,
result_tx,
)
.await
.unwrap();
// Received prepare request.
assert_matches!(
test.poll_and_recv_to_prepare_queue().await,
prepare::ToQueue::Enqueue { .. }
);
test.from_prepare_queue_tx
.send(prepare::FromQueue { artifact_id: artifact_id(2), result: Ok(()) })
.await
.unwrap();
// The execute queue receives a new request, prechecking is finished and we can
// fetch results.
assert_matches!(
test.poll_and_recv_to_execute_queue().await,
execute::ToQueue::Enqueue { .. }
);
for result_rx in precheck_receivers {
assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Ok(()));
}
}

#[async_std::test]
async fn cancellation() {
let mut test = Builder::default().build();
Expand Down
Loading