-
Notifications
You must be signed in to change notification settings - Fork 1.6k
[WIP] Pvf host prechecking support #4101
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,7 +24,7 @@ use crate::{ | |
artifacts::{ArtifactId, ArtifactPathId, ArtifactState, Artifacts}, | ||
execute, | ||
metrics::Metrics, | ||
prepare, Priority, Pvf, ValidationError, LOG_TARGET, | ||
precheck, prepare, PrecheckResultSender, Priority, Pvf, ValidationError, LOG_TARGET, | ||
}; | ||
use always_assert::never; | ||
use async_std::path::{Path, PathBuf}; | ||
|
@@ -48,6 +48,17 @@ pub struct ValidationHost { | |
} | ||
|
||
impl ValidationHost { | ||
/// Submits a precheck request for the given PVF to the host loop. | ||
/// | ||
/// The `pvf` and `result_tx` are wrapped into a `ToHost::PrecheckPvf` message and sent over | ||
/// `to_host_tx`. Presumably the outcome of the precheck is delivered through `result_tx` | ||
/// (the receiving side is not shown here). | ||
/// | ||
/// Returns an error string if the message cannot be sent. | ||
pub async fn precheck_pvf( | ||
&mut self, | ||
pvf: Pvf, | ||
result_tx: PrecheckResultSender, | ||
) -> Result<(), String> { | ||
self.to_host_tx | ||
.send(ToHost::PrecheckPvf { pvf, result_tx }) | ||
.await | ||
.map_err(|_| "the inner loop hung up".to_string()) | ||
} | ||
|
||
/// Execute PVF with the given code, execution timeout, parameters and priority. | ||
/// The result of execution will be sent to the provided result sender. | ||
/// | ||
|
@@ -84,6 +95,10 @@ impl ValidationHost { | |
} | ||
|
||
enum ToHost { | ||
PrecheckPvf { | ||
pvf: Pvf, | ||
result_tx: PrecheckResultSender, | ||
}, | ||
ExecutePvf { | ||
pvf: Pvf, | ||
execution_timeout: Duration, | ||
|
@@ -150,6 +165,13 @@ pub fn start(config: Config, metrics: Metrics) -> (ValidationHost, impl Future<O | |
|
||
let validation_host = ValidationHost { to_host_tx }; | ||
|
||
let (to_precheck_queue_tx, run_precheck_queue) = precheck::start_precheck_queue( | ||
config.prepare_worker_program_path.clone(), | ||
Duration::from_secs(5), | ||
config.prepare_worker_spawn_timeout, | ||
config.execute_workers_max_num, | ||
); | ||
|
||
let (to_prepare_pool, from_prepare_pool, run_prepare_pool) = prepare::start_pool( | ||
metrics.clone(), | ||
config.prepare_worker_program_path.clone(), | ||
|
@@ -179,7 +201,13 @@ pub fn start(config: Config, metrics: Metrics) -> (ValidationHost, impl Future<O | |
let run = async move { | ||
let artifacts = Artifacts::new(&config.cache_path).await; | ||
|
||
futures::pin_mut!(run_prepare_queue, run_prepare_pool, run_execute_queue, run_sweeper); | ||
futures::pin_mut!( | ||
run_precheck_queue, | ||
run_prepare_queue, | ||
run_prepare_pool, | ||
run_execute_queue, | ||
run_sweeper | ||
); | ||
|
||
run( | ||
Inner { | ||
|
@@ -188,12 +216,14 @@ pub fn start(config: Config, metrics: Metrics) -> (ValidationHost, impl Future<O | |
artifact_ttl: Duration::from_secs(3600 * 24), | ||
artifacts, | ||
to_host_rx, | ||
to_precheck_queue_tx, | ||
to_prepare_queue_tx, | ||
from_prepare_queue_rx, | ||
to_execute_queue_tx, | ||
to_sweeper_tx, | ||
awaiting_prepare: AwaitingPrepare::default(), | ||
}, | ||
run_precheck_queue, | ||
run_prepare_pool, | ||
run_prepare_queue, | ||
run_execute_queue, | ||
|
@@ -247,6 +277,8 @@ struct Inner { | |
|
||
to_host_rx: mpsc::Receiver<ToHost>, | ||
|
||
to_precheck_queue_tx: mpsc::Sender<precheck::ToQueue>, | ||
|
||
to_prepare_queue_tx: mpsc::Sender<prepare::ToQueue>, | ||
from_prepare_queue_rx: mpsc::UnboundedReceiver<prepare::FromQueue>, | ||
|
||
|
@@ -266,12 +298,14 @@ async fn run( | |
artifact_ttl, | ||
mut artifacts, | ||
to_host_rx, | ||
mut to_precheck_queue_tx, | ||
from_prepare_queue_rx, | ||
mut to_prepare_queue_tx, | ||
mut to_execute_queue_tx, | ||
mut to_sweeper_tx, | ||
mut awaiting_prepare, | ||
}: Inner, | ||
precheck_queue: impl Future<Output = ()> + Unpin, | ||
prepare_pool: impl Future<Output = ()> + Unpin, | ||
prepare_queue: impl Future<Output = ()> + Unpin, | ||
execute_queue: impl Future<Output = ()> + Unpin, | ||
|
@@ -293,6 +327,7 @@ async fn run( | |
let mut from_prepare_queue_rx = from_prepare_queue_rx.fuse(); | ||
|
||
// Make sure that the task-futures are fused. | ||
let mut precheck_queue = precheck_queue.fuse(); | ||
let mut prepare_queue = prepare_queue.fuse(); | ||
let mut prepare_pool = prepare_pool.fuse(); | ||
let mut execute_queue = execute_queue.fuse(); | ||
|
@@ -301,6 +336,10 @@ async fn run( | |
loop { | ||
// biased to make it behave deterministically for tests. | ||
futures::select_biased! { | ||
_ = precheck_queue => { | ||
never!("precheck_queue: long-running task never concludes; qed"); | ||
break; | ||
}, | ||
_ = prepare_queue => { | ||
never!("prepare_pool: long-running task never concludes; qed"); | ||
break; | ||
|
@@ -336,6 +375,7 @@ async fn run( | |
break_if_fatal!(handle_to_host( | ||
&cache_path, | ||
&mut artifacts, | ||
&mut to_precheck_queue_tx, | ||
&mut to_prepare_queue_tx, | ||
&mut to_execute_queue_tx, | ||
&mut awaiting_prepare, | ||
|
@@ -371,12 +411,16 @@ async fn run( | |
async fn handle_to_host( | ||
cache_path: &Path, | ||
artifacts: &mut Artifacts, | ||
precheck_queue: &mut mpsc::Sender<precheck::ToQueue>, | ||
prepare_queue: &mut mpsc::Sender<prepare::ToQueue>, | ||
execute_queue: &mut mpsc::Sender<execute::ToQueue>, | ||
awaiting_prepare: &mut AwaitingPrepare, | ||
to_host: ToHost, | ||
) -> Result<(), Fatal> { | ||
match to_host { | ||
ToHost::PrecheckPvf { pvf, result_tx } => { | ||
handle_precheck_pvf(&artifacts, pvf, precheck_queue, result_tx).await?; | ||
}, | ||
ToHost::ExecutePvf { pvf, execution_timeout, params, priority, result_tx } => { | ||
handle_execute_pvf( | ||
cache_path, | ||
|
@@ -400,6 +444,21 @@ async fn handle_to_host( | |
Ok(()) | ||
} | ||
|
||
async fn handle_precheck_pvf( | ||
artifacts: &Artifacts, | ||
pvf: Pvf, | ||
precheck_queue: &mut mpsc::Sender<precheck::ToQueue>, | ||
slumber marked this conversation as resolved.
Show resolved
Hide resolved
|
||
result_sender: PrecheckResultSender, | ||
) -> Result<(), Fatal> { | ||
if let Some(_) = artifacts.artifact_state(&pvf.as_artifact_id()) { | ||
let _ = result_sender.send(Ok(())); | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As things stand now, we only note whether we processed the artifact or not in `artifacts`. Thus this will incorrectly report an `Ok(())`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If a PVF actually got to the point of preparing, doesn't that mean it's considered prechecked?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, kind of. That is, under the current model, the owner of the PVF validation host will only send a command to execute a PVF if the PVF was approved on-chain. That assumption is a bit too fragile to rely upon, though, IMO. |
||
} else { | ||
send_request(precheck_queue, precheck::ToQueue { pvf, result_sender }).await? | ||
} | ||
|
||
Ok(()) | ||
} | ||
|
||
async fn handle_execute_pvf( | ||
cache_path: &Path, | ||
artifacts: &mut Artifacts, | ||
|
@@ -419,7 +478,7 @@ async fn handle_execute_pvf( | |
ArtifactState::Prepared { ref mut last_time_needed } => { | ||
*last_time_needed = SystemTime::now(); | ||
|
||
send_execute( | ||
send_request( | ||
execute_queue, | ||
execute::ToQueue::Enqueue { | ||
artifact: ArtifactPathId::new(artifact_id, cache_path), | ||
|
@@ -431,7 +490,7 @@ async fn handle_execute_pvf( | |
.await?; | ||
}, | ||
ArtifactState::Preparing => { | ||
send_prepare( | ||
send_request( | ||
prepare_queue, | ||
prepare::ToQueue::Amend { priority, artifact_id: artifact_id.clone() }, | ||
) | ||
|
@@ -444,7 +503,7 @@ async fn handle_execute_pvf( | |
// Artifact is unknown: register it and enqueue a job with the corresponding priority and | ||
// | ||
artifacts.insert_preparing(artifact_id.clone()); | ||
send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority, pvf }).await?; | ||
send_request(prepare_queue, prepare::ToQueue::Enqueue { priority, pvf }).await?; | ||
|
||
awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx); | ||
} | ||
|
@@ -475,7 +534,7 @@ async fn handle_heads_up( | |
// The artifact is unknown: register it and put a background job into the prepare queue. | ||
artifacts.insert_preparing(artifact_id.clone()); | ||
|
||
send_prepare( | ||
send_request( | ||
prepare_queue, | ||
prepare::ToQueue::Enqueue { priority: Priority::Background, pvf: active_pvf }, | ||
) | ||
|
@@ -526,7 +585,7 @@ async fn handle_prepare_done( | |
continue | ||
} | ||
|
||
send_execute( | ||
send_request( | ||
execute_queue, | ||
execute::ToQueue::Enqueue { | ||
artifact: ArtifactPathId::new(artifact_id.clone(), cache_path), | ||
|
@@ -544,18 +603,8 @@ async fn handle_prepare_done( | |
Ok(()) | ||
} | ||
|
||
async fn send_prepare( | ||
prepare_queue: &mut mpsc::Sender<prepare::ToQueue>, | ||
to_queue: prepare::ToQueue, | ||
) -> Result<(), Fatal> { | ||
prepare_queue.send(to_queue).await.map_err(|_| Fatal) | ||
} | ||
|
||
async fn send_execute( | ||
execute_queue: &mut mpsc::Sender<execute::ToQueue>, | ||
to_queue: execute::ToQueue, | ||
) -> Result<(), Fatal> { | ||
execute_queue.send(to_queue).await.map_err(|_| Fatal) | ||
/// Sends `item` into `queue`, waiting for the send to complete. | ||
/// | ||
/// Generic over the message type `T`, so the same helper serves every queue's sender. | ||
/// Any send error is discarded and reported as `Fatal`. | ||
async fn send_request<T>(queue: &mut mpsc::Sender<T>, item: T) -> Result<(), Fatal> { | ||
queue.send(item).await.map_err(|_| Fatal) | ||
} | ||
|
||
async fn handle_cleanup_pulse( | ||
|
@@ -681,6 +730,7 @@ mod tests { | |
let cache_path = PathBuf::from(std::env::temp_dir()); | ||
|
||
let (to_host_tx, to_host_rx) = mpsc::channel(10); | ||
let (to_precheck_queue_tx, _to_precheck_queue_rx) = mpsc::channel(10); | ||
let (to_prepare_queue_tx, to_prepare_queue_rx) = mpsc::channel(10); | ||
let (from_prepare_queue_tx, from_prepare_queue_rx) = mpsc::unbounded(); | ||
let (to_execute_queue_tx, to_execute_queue_rx) = mpsc::channel(10); | ||
|
@@ -695,6 +745,7 @@ mod tests { | |
artifact_ttl, | ||
artifacts, | ||
to_host_rx, | ||
to_precheck_queue_tx, | ||
to_prepare_queue_tx, | ||
from_prepare_queue_rx, | ||
to_execute_queue_tx, | ||
|
@@ -705,6 +756,7 @@ mod tests { | |
mk_dummy_loop(), | ||
mk_dummy_loop(), | ||
mk_dummy_loop(), | ||
mk_dummy_loop(), | ||
) | ||
.boxed(); | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we using this anywhere?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nope. I don't know yet how to catch this kind of error: even though there's a memory limit in the executor configuration, I don't see any public interface that tells you preparation exceeded it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah, I think that's irrelevant.
The memory limit in the executor is only for the execution. There is no option to limit the memory amount consumed by the compilation. That's actually one of the reasons we do it in a separate process in the first place.