
[WIP] Pvf host prechecking support #4101

Closed · wants to merge 3 commits · showing changes from 1 commit
5 changes: 5 additions & 0 deletions node/core/pvf/src/artifacts.rs
@@ -144,6 +144,11 @@ impl Artifacts {
}

/// Returns the state of the given artifact by its ID.
pub fn artifact_state(&self, artifact_id: &ArtifactId) -> Option<&ArtifactState> {
self.artifacts.get(artifact_id)
}

/// Returns mutable state of the given artifact by its ID.
pub fn artifact_state_mut(&mut self, artifact_id: &ArtifactId) -> Option<&mut ArtifactState> {
self.artifacts.get_mut(artifact_id)
}
15 changes: 15 additions & 0 deletions node/core/pvf/src/error.rs
@@ -14,6 +14,21 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use parity_scale_codec::{Decode, Encode};

/// An error reported during the prechecking routine.
#[derive(Debug, Clone, Encode, Decode)]
pub enum PrecheckError {
/// Failed to precheck the PVF due to the time limit.
TimedOut,
/// Failed to precheck the PVF due to the memory limit.
MemoryLimitReached,
> **Contributor:** Are we using this anywhere?

> **Contributor (author):** Nope, I don't know yet how to catch this kind of error. Even though there's a memory limit in the executor configuration, I don't see any public interface that tells you preparation exceeded it.

> **Contributor:** Ah, I think that's irrelevant. The memory limit in the executor only applies to execution. There is no option to limit the amount of memory consumed by compilation; that's actually one of the reasons we do it in a separate process in the first place.

/// Compilation error occurred.
CompileError(String),
/// Couldn't serve the request due to some kind of internal error.
Internal(String),
}
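
To make the reviewer's point above concrete: the executor exposes no knob for capping compilation memory, so any bound would have to be imposed on the separate prepare worker process itself. A minimal sketch of one way to do that, assuming a Unix target and the `libc` crate (not part of this PR):

```rust
use std::io;

/// Cap this process's address space so a runaway compilation fails fast
/// instead of exhausting the host's memory. Illustrative only.
fn limit_prepare_memory(bytes: u64) -> io::Result<()> {
    let limit = libc::rlimit { rlim_cur: bytes, rlim_max: bytes };
    // SAFETY: `setrlimit` only reads the struct we pass in.
    if unsafe { libc::setrlimit(libc::RLIMIT_AS, &limit) } != 0 {
        return Err(io::Error::last_os_error());
    }
    Ok(())
}
```

With such a cap in place, an allocation failure during compilation could surface as a compile error or abort the worker, which is what `MemoryLimitReached` might then report.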

/// An error raised during validation of the candidate.
#[derive(Debug, Clone)]
pub enum ValidationError {
47 changes: 2 additions & 45 deletions node/core/pvf/src/execute/queue.rs
@@ -21,7 +21,7 @@ use crate::{
artifacts::{ArtifactId, ArtifactPathId},
host::ResultSender,
metrics::Metrics,
worker_common::{IdleWorker, WorkerHandle},
worker_common::{IdleWorker, Worker, WorkerData, WorkerHandle, Workers},
InvalidCandidate, ValidationError, LOG_TARGET,
};
use async_std::path::PathBuf;
@@ -32,9 +32,7 @@ use futures::{
Future, FutureExt,
};
use slotmap::HopSlotMap;
use std::{collections::VecDeque, fmt, time::Duration};

slotmap::new_key_type! { struct Worker; }
use std::{collections::VecDeque, time::Duration};

#[derive(Debug)]
pub enum ToQueue {
Expand All @@ -53,47 +51,6 @@ struct ExecuteJob {
result_tx: ResultSender,
}

struct WorkerData {
idle: Option<IdleWorker>,
handle: WorkerHandle,
}

impl fmt::Debug for WorkerData {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "WorkerData(pid={})", self.handle.id())
}
}

struct Workers {
/// The registry of running workers.
running: HopSlotMap<Worker, WorkerData>,

/// The number of spawning but not yet spawned workers.
spawn_inflight: usize,

/// The maximum number of workers queue can have at once.
capacity: usize,
}

impl Workers {
fn can_afford_one_more(&self) -> bool {
self.spawn_inflight + self.running.len() < self.capacity
}

fn find_available(&self) -> Option<Worker> {
self.running
.iter()
.find_map(|d| if d.1.idle.is_some() { Some(d.0) } else { None })
}

/// Find the associated data by the worker token and extract its [`IdleWorker`] token.
///
/// Returns `None` if either worker is not recognized or idle token is absent.
fn claim_idle(&mut self, worker: Worker) -> Option<IdleWorker> {
self.running.get_mut(worker)?.idle.take()
}
}

enum QueueEvent {
Spawn(IdleWorker, WorkerHandle),
StartWork(Worker, Outcome, ArtifactId, ResultSender),
12 changes: 10 additions & 2 deletions node/core/pvf/src/executor_intf.rs
@@ -17,6 +17,7 @@
//! Interface to the Substrate Executor

use sc_executor_common::{
error::WasmError,
runtime_blob::RuntimeBlob,
wasm_runtime::{InvokeMethod, WasmModule as _},
};
@@ -73,7 +74,7 @@ const CONFIG: Config = Config {
};

/// Runs the prevalidation on the given code. Returns a [`RuntimeBlob`] if it succeeds.
pub fn prevalidate(code: &[u8]) -> Result<RuntimeBlob, sc_executor_common::error::WasmError> {
pub fn prevalidate(code: &[u8]) -> Result<RuntimeBlob, WasmError> {
let blob = RuntimeBlob::new(code)?;
// It's assumed this function will take care of any prevalidation logic
// that needs to be done.
@@ -84,10 +85,17 @@ pub fn prepare(blob: RuntimeBlob) -> Result<Vec<u8>, sc_executor_common::error

/// Runs preparation on the given runtime blob. If successful, it returns a serialized compiled
/// artifact which can then be used to pass into [`execute`].
pub fn prepare(blob: RuntimeBlob) -> Result<Vec<u8>, sc_executor_common::error::WasmError> {
pub fn prepare(blob: RuntimeBlob) -> Result<Vec<u8>, WasmError> {
sc_executor_wasmtime::prepare_runtime_artifact(blob, &CONFIG.semantics)
}

/// Runs [`prepare`] routine on a single thread.
pub fn prepare_single_threaded(blob: RuntimeBlob) -> Result<Vec<u8>, WasmError> {
let mut semantics = CONFIG.semantics;
semantics.parallel_compilation = false;
sc_executor_wasmtime::prepare_runtime_artifact(blob, &semantics)
}
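
As a usage sketch (assumed wiring, not part of this diff), a precheck worker could chain these helpers: prevalidate the raw code, then compile it on a single thread so resource usage stays predictable:

```rust
use sc_executor_common::error::WasmError;

/// Illustrative precheck-style compilation: validate the raw PVF bytes,
/// then compile them single-threaded.
pub fn precheck_compile(code: &[u8]) -> Result<Vec<u8>, WasmError> {
    let blob = prevalidate(code)?;
    prepare_single_threaded(blob)
}
```

Single-threaded compilation also makes any wall-clock limit on prechecking behave more consistently across machines with different core counts.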

/// Executes the given PVF in the form of a compiled artifact and returns the result of execution
/// upon success.
///
90 changes: 71 additions & 19 deletions node/core/pvf/src/host.rs
@@ -24,7 +24,7 @@ use crate::{
artifacts::{ArtifactId, ArtifactPathId, ArtifactState, Artifacts},
execute,
metrics::Metrics,
prepare, Priority, Pvf, ValidationError, LOG_TARGET,
precheck, prepare, PrecheckResultSender, Priority, Pvf, ValidationError, LOG_TARGET,
};
use always_assert::never;
use async_std::path::{Path, PathBuf};
@@ -48,6 +48,17 @@ pub struct ValidationHost {
}

impl ValidationHost {
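/// Precheck the given PVF, i.e. verify that it compiles within the precheck limits
/// without executing it. The result is sent to the provided result sender.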
pub async fn precheck_pvf(
&mut self,
pvf: Pvf,
result_tx: PrecheckResultSender,
) -> Result<(), String> {
self.to_host_tx
.send(ToHost::PrecheckPvf { pvf, result_tx })
.await
.map_err(|_| "the inner loop hung up".to_string())
}
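
Caller-side usage might look like the following sketch. It assumes `PrecheckResultSender` is a oneshot sender carrying a `Result<(), PrecheckError>`; the actual alias is defined elsewhere in the crate:

```rust
use futures::channel::oneshot;

/// Illustrative driver: submit a precheck request and await the verdict.
async fn run_precheck(host: &mut ValidationHost, pvf: Pvf) -> Result<(), String> {
    let (result_tx, result_rx) = oneshot::channel();
    host.precheck_pvf(pvf, result_tx).await?;
    match result_rx.await {
        Ok(Ok(())) => Ok(()),
        Ok(Err(err)) => Err(format!("precheck rejected the PVF: {:?}", err)),
        Err(_) => Err("the precheck result sender was dropped".to_string()),
    }
}
```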

/// Execute PVF with the given code, execution timeout, parameters and priority.
/// The result of execution will be sent to the provided result sender.
///
@@ -84,6 +95,10 @@ impl ValidationHost {
}

enum ToHost {
PrecheckPvf {
pvf: Pvf,
result_tx: PrecheckResultSender,
},
ExecutePvf {
pvf: Pvf,
execution_timeout: Duration,
@@ -150,6 +165,13 @@ pub fn start(config: Config, metrics: Metrics) -> (ValidationHost, impl Future<O

let validation_host = ValidationHost { to_host_tx };

let (to_precheck_queue_tx, run_precheck_queue) = precheck::start_precheck_queue(
config.prepare_worker_program_path.clone(),
Duration::from_secs(5),
config.prepare_worker_spawn_timeout,
config.execute_workers_max_num,
);

let (to_prepare_pool, from_prepare_pool, run_prepare_pool) = prepare::start_pool(
metrics.clone(),
config.prepare_worker_program_path.clone(),
@@ -179,7 +201,13 @@
let run = async move {
let artifacts = Artifacts::new(&config.cache_path).await;

futures::pin_mut!(run_prepare_queue, run_prepare_pool, run_execute_queue, run_sweeper);
futures::pin_mut!(
run_precheck_queue,
run_prepare_queue,
run_prepare_pool,
run_execute_queue,
run_sweeper
);

run(
Inner {
@@ -188,12 +216,14 @@ pub fn start(config: Config, metrics: Metrics) -> (ValidationHost, impl Future<O
artifact_ttl: Duration::from_secs(3600 * 24),
artifacts,
to_host_rx,
to_precheck_queue_tx,
to_prepare_queue_tx,
from_prepare_queue_rx,
to_execute_queue_tx,
to_sweeper_tx,
awaiting_prepare: AwaitingPrepare::default(),
},
run_precheck_queue,
run_prepare_pool,
run_prepare_queue,
run_execute_queue,
@@ -247,6 +277,8 @@ struct Inner {

to_host_rx: mpsc::Receiver<ToHost>,

to_precheck_queue_tx: mpsc::Sender<precheck::ToQueue>,

to_prepare_queue_tx: mpsc::Sender<prepare::ToQueue>,
from_prepare_queue_rx: mpsc::UnboundedReceiver<prepare::FromQueue>,

@@ -266,12 +298,14 @@ async fn run(
artifact_ttl,
mut artifacts,
to_host_rx,
mut to_precheck_queue_tx,
from_prepare_queue_rx,
mut to_prepare_queue_tx,
mut to_execute_queue_tx,
mut to_sweeper_tx,
mut awaiting_prepare,
}: Inner,
precheck_queue: impl Future<Output = ()> + Unpin,
prepare_pool: impl Future<Output = ()> + Unpin,
prepare_queue: impl Future<Output = ()> + Unpin,
execute_queue: impl Future<Output = ()> + Unpin,
@@ -293,6 +327,7 @@
let mut from_prepare_queue_rx = from_prepare_queue_rx.fuse();

// Make sure that the task-futures are fused.
let mut precheck_queue = precheck_queue.fuse();
let mut prepare_queue = prepare_queue.fuse();
let mut prepare_pool = prepare_pool.fuse();
let mut execute_queue = execute_queue.fuse();
@@ -301,6 +336,10 @@
loop {
// biased to make it behave deterministically for tests.
futures::select_biased! {
_ = precheck_queue => {
never!("precheck_queue: long-running task never concludes; qed");
break;
},
_ = prepare_queue => {
never!("prepare_pool: long-running task never concludes; qed");
break;
@@ -336,6 +375,7 @@
break_if_fatal!(handle_to_host(
&cache_path,
&mut artifacts,
&mut to_precheck_queue_tx,
&mut to_prepare_queue_tx,
&mut to_execute_queue_tx,
&mut awaiting_prepare,
@@ -371,12 +411,16 @@
async fn handle_to_host(
cache_path: &Path,
artifacts: &mut Artifacts,
precheck_queue: &mut mpsc::Sender<precheck::ToQueue>,
prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
execute_queue: &mut mpsc::Sender<execute::ToQueue>,
awaiting_prepare: &mut AwaitingPrepare,
to_host: ToHost,
) -> Result<(), Fatal> {
match to_host {
ToHost::PrecheckPvf { pvf, result_tx } => {
handle_precheck_pvf(&artifacts, pvf, precheck_queue, result_tx).await?;
},
ToHost::ExecutePvf { pvf, execution_timeout, params, priority, result_tx } => {
handle_execute_pvf(
cache_path,
@@ -400,6 +444,21 @@ async fn handle_to_host(
Ok(())
}

async fn handle_precheck_pvf(
artifacts: &Artifacts,
pvf: Pvf,
precheck_queue: &mut mpsc::Sender<precheck::ToQueue>,
result_sender: PrecheckResultSender,
) -> Result<(), Fatal> {
if let Some(_) = artifacts.artifact_state(&pvf.as_artifact_id()) {
let _ = result_sender.send(Ok(()));
> **Contributor:** As things stand now, we only note in `artifacts` whether we processed the artifact or not; we do not record anything about the preparation outcome. It will also return `Some` while the artifact is going through preparation. Thus this will incorrectly report an `Ok` when the artifact is in fact still preparing, or was prepared with an error.

> **Contributor (author):** If a PVF actually got to the point of preparing, doesn't that mean it's considered prechecked?

> **Contributor:** Well, kind of. That is, under the current model, the owner of the PVF validation host will only send a command to execute a PVF if the PVF was approved on-chain. That assumption is a bit too fragile to rely upon, though, IMO.

} else {
send_request(precheck_queue, precheck::ToQueue { pvf, result_sender }).await?
}

Ok(())
}
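
One way to address the review concern above, sketched rather than prescribed (the variant names are assumptions about this crate's `ArtifactState`): branch on the artifact state instead of treating any cache entry as success.

```rust
/// Sketch: only report success for a fully prepared artifact; anything
/// else (still preparing, failed, or unknown) goes through a real precheck.
async fn handle_precheck_pvf_strict(
    artifacts: &Artifacts,
    pvf: Pvf,
    precheck_queue: &mut mpsc::Sender<precheck::ToQueue>,
    result_sender: PrecheckResultSender,
) -> Result<(), Fatal> {
    match artifacts.artifact_state(&pvf.as_artifact_id()) {
        // Preparation already succeeded, so prechecking would succeed too.
        Some(ArtifactState::Prepared { .. }) => {
            let _ = result_sender.send(Ok(()));
        },
        // Preparing, failed, or unknown: enqueue an actual precheck.
        _ => send_request(precheck_queue, precheck::ToQueue { pvf, result_sender }).await?,
    }
    Ok(())
}
```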

async fn handle_execute_pvf(
cache_path: &Path,
artifacts: &mut Artifacts,
@@ -419,7 +478,7 @@ async fn handle_execute_pvf(
ArtifactState::Prepared { ref mut last_time_needed } => {
*last_time_needed = SystemTime::now();

send_execute(
send_request(
execute_queue,
execute::ToQueue::Enqueue {
artifact: ArtifactPathId::new(artifact_id, cache_path),
@@ -431,7 +490,7 @@
.await?;
},
ArtifactState::Preparing => {
send_prepare(
send_request(
prepare_queue,
prepare::ToQueue::Amend { priority, artifact_id: artifact_id.clone() },
)
@@ -444,7 +503,7 @@
// Artifact is unknown: register it and enqueue a job with the corresponding priority and
//
artifacts.insert_preparing(artifact_id.clone());
send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority, pvf }).await?;
send_request(prepare_queue, prepare::ToQueue::Enqueue { priority, pvf }).await?;

awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx);
}
@@ -475,7 +534,7 @@ async fn handle_heads_up(
// The artifact is unknown: register it and put a background job into the prepare queue.
artifacts.insert_preparing(artifact_id.clone());

send_prepare(
send_request(
prepare_queue,
prepare::ToQueue::Enqueue { priority: Priority::Background, pvf: active_pvf },
)
@@ -526,7 +585,7 @@ async fn handle_prepare_done(
continue
}

send_execute(
send_request(
execute_queue,
execute::ToQueue::Enqueue {
artifact: ArtifactPathId::new(artifact_id.clone(), cache_path),
@@ -544,18 +603,8 @@
Ok(())
}

async fn send_prepare(
prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
to_queue: prepare::ToQueue,
) -> Result<(), Fatal> {
prepare_queue.send(to_queue).await.map_err(|_| Fatal)
}

async fn send_execute(
execute_queue: &mut mpsc::Sender<execute::ToQueue>,
to_queue: execute::ToQueue,
) -> Result<(), Fatal> {
execute_queue.send(to_queue).await.map_err(|_| Fatal)
async fn send_request<T>(queue: &mut mpsc::Sender<T>, item: T) -> Result<(), Fatal> {
queue.send(item).await.map_err(|_| Fatal)
}

async fn handle_cleanup_pulse(
@@ -681,6 +730,7 @@ mod tests {
let cache_path = PathBuf::from(std::env::temp_dir());

let (to_host_tx, to_host_rx) = mpsc::channel(10);
let (to_precheck_queue_tx, _to_precheck_queue_rx) = mpsc::channel(10);
let (to_prepare_queue_tx, to_prepare_queue_rx) = mpsc::channel(10);
let (from_prepare_queue_tx, from_prepare_queue_rx) = mpsc::unbounded();
let (to_execute_queue_tx, to_execute_queue_rx) = mpsc::channel(10);
@@ -695,6 +745,7 @@
artifact_ttl,
artifacts,
to_host_rx,
to_precheck_queue_tx,
to_prepare_queue_tx,
from_prepare_queue_rx,
to_execute_queue_tx,
@@ -705,6 +756,7 @@
mk_dummy_loop(),
mk_dummy_loop(),
mk_dummy_loop(),
mk_dummy_loop(),
)
.boxed();
