Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PVF: Add back socket path parameter, use tmp socket path #1780

Merged
merged 2 commits into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions polkadot/node/core/pvf/common/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

pub mod security;

use crate::{worker_dir, SecurityStatus, LOG_TARGET};
use crate::{SecurityStatus, LOG_TARGET};
use cpu_time::ProcessTime;
use futures::never::Never;
use std::{
Expand Down Expand Up @@ -115,6 +115,7 @@ macro_rules! decl_worker_main {
},
}

let mut socket_path = None;
let mut worker_dir_path = None;
let mut node_version = None;
let mut can_enable_landlock = false;
Expand All @@ -123,6 +124,10 @@ macro_rules! decl_worker_main {
let mut i = 2;
while i < args.len() {
match args[i].as_ref() {
"--socket-path" => {
socket_path = Some(args[i + 1].as_str());
i += 1
},
"--worker-dir-path" => {
worker_dir_path = Some(args[i + 1].as_str());
i += 1
Expand All @@ -138,16 +143,24 @@ macro_rules! decl_worker_main {
}
i += 1;
}
let socket_path = socket_path.expect("the --socket-path argument is required");
let worker_dir_path =
worker_dir_path.expect("the --worker-dir-path argument is required");

let socket_path = std::path::Path::new(socket_path).to_owned();
let worker_dir_path = std::path::Path::new(worker_dir_path).to_owned();
let security_status = $crate::SecurityStatus {
can_enable_landlock,
can_unshare_user_namespace_and_change_root,
};

$entrypoint(worker_dir_path, node_version, Some($worker_version), security_status);
$entrypoint(
socket_path,
worker_dir_path,
node_version,
Some($worker_version),
security_status,
);
}
};
}
Expand Down Expand Up @@ -177,6 +190,7 @@ impl fmt::Display for WorkerKind {
// the version that this crate was compiled with.
pub fn worker_event_loop<F, Fut>(
worker_kind: WorkerKind,
socket_path: PathBuf,
#[cfg_attr(not(target_os = "linux"), allow(unused_mut))] mut worker_dir_path: PathBuf,
node_version: Option<&str>,
worker_version: Option<&str>,
Expand All @@ -190,6 +204,7 @@ pub fn worker_event_loop<F, Fut>(
gum::debug!(
target: LOG_TARGET,
%worker_pid,
?socket_path,
?worker_dir_path,
?security_status,
"starting pvf worker ({})",
Expand Down Expand Up @@ -237,12 +252,9 @@ pub fn worker_event_loop<F, Fut>(
}

// Connect to the socket.
let socket_path = worker_dir::socket(&worker_dir_path);
let stream = || -> std::io::Result<UnixStream> {
let stream = UnixStream::connect(&socket_path)?;
// Remove the socket here. We don't also need to do this on the host-side; on failed
// rendezvous, the host will delete the whole worker dir.
std::fs::remove_file(&socket_path)?;
let _ = std::fs::remove_file(&socket_path);
Ok(stream)
}();
let stream = match stream {
Expand Down
5 changes: 0 additions & 5 deletions polkadot/node/core/pvf/common/src/worker_dir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use std::path::{Path, PathBuf};

/// Path component that `execute_artifact` appends to a worker dir path.
const WORKER_EXECUTE_ARTIFACT_NAME: &str = "artifact";
/// Path component that `prepare_tmp_artifact` appends to a worker dir path.
const WORKER_PREPARE_TMP_ARTIFACT_NAME: &str = "tmp-artifact";
/// Path component that `socket` appends to a worker dir path.
const WORKER_SOCKET_NAME: &str = "socket";

pub fn execute_artifact(worker_dir_path: &Path) -> PathBuf {
worker_dir_path.join(WORKER_EXECUTE_ARTIFACT_NAME)
Expand All @@ -29,7 +28,3 @@ pub fn execute_artifact(worker_dir_path: &Path) -> PathBuf {
/// Builds the path obtained by appending `WORKER_PREPARE_TMP_ARTIFACT_NAME` to
/// `worker_dir_path`.
///
/// The input path is only borrowed; a new owned `PathBuf` is returned.
pub fn prepare_tmp_artifact(worker_dir_path: &Path) -> PathBuf {
    let mut artifact_path = worker_dir_path.to_path_buf();
    artifact_path.push(WORKER_PREPARE_TMP_ARTIFACT_NAME);
    artifact_path
}

/// Builds the path obtained by appending `WORKER_SOCKET_NAME` to `worker_dir_path`.
///
/// The input path is only borrowed; a new owned `PathBuf` is returned.
pub fn socket(worker_dir_path: &Path) -> PathBuf {
    let mut socket_path = PathBuf::from(worker_dir_path);
    socket_path.push(WORKER_SOCKET_NAME);
    socket_path
}
4 changes: 4 additions & 0 deletions polkadot/node/core/pvf/execute-worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ fn send_response(stream: &mut UnixStream, response: Response) -> io::Result<()>
///
/// # Parameters
///
/// - `socket_path`: specifies the path to the socket used to communicate with the host.
///
/// - `worker_dir_path`: specifies the path to the worker-specific temporary directory.
///
/// - `node_version`: if `Some`, is checked against the `worker_version`. A mismatch results in
Expand All @@ -121,13 +123,15 @@ fn send_response(stream: &mut UnixStream, response: Response) -> io::Result<()>
///
/// - `security_status`: contains the detected status of security features.
pub fn worker_entrypoint(
socket_path: PathBuf,
worker_dir_path: PathBuf,
node_version: Option<&str>,
worker_version: Option<&str>,
security_status: SecurityStatus,
) {
worker_event_loop(
WorkerKind::Execute,
socket_path,
worker_dir_path,
node_version,
worker_version,
Expand Down
4 changes: 4 additions & 0 deletions polkadot/node/core/pvf/prepare-worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Result<(
///
/// # Parameters
///
/// - `socket_path`: specifies the path to the socket used to communicate with the host.
///
/// - `worker_dir_path`: specifies the path to the worker-specific temporary directory.
///
/// - `node_version`: if `Some`, is checked against the `worker_version`. A mismatch results in
Expand Down Expand Up @@ -116,13 +118,15 @@ fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Result<(
/// 7. Send the result of preparation back to the host. If any error occurred in the above steps, we
/// send that in the `PrepareResult`.
pub fn worker_entrypoint(
socket_path: PathBuf,
worker_dir_path: PathBuf,
node_version: Option<&str>,
worker_version: Option<&str>,
security_status: SecurityStatus,
) {
worker_event_loop(
WorkerKind::Prepare,
socket_path,
worker_dir_path,
node_version,
worker_version,
Expand Down
140 changes: 85 additions & 55 deletions polkadot/node/core/pvf/src/worker_intf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::LOG_TARGET;
use futures::FutureExt as _;
use futures_timer::Delay;
use pin_project::pin_project;
use polkadot_node_core_pvf_common::{worker_dir, SecurityStatus};
use polkadot_node_core_pvf_common::SecurityStatus;
use rand::Rng;
use std::{
fmt, mem,
Expand Down Expand Up @@ -67,71 +67,99 @@ pub async fn spawn_with_program_path(
) -> Result<(IdleWorker, WorkerHandle), SpawnErr> {
let program_path = program_path.into();
let worker_dir = WorkerDir::new(debug_id, cache_path).await?;
let socket_path = worker_dir::socket(&worker_dir.path);

let extra_args: Vec<String> = extra_args.iter().map(|arg| arg.to_string()).collect();

let listener = UnixListener::bind(&socket_path).map_err(|err| {
gum::warn!(
target: LOG_TARGET,
%debug_id,
?program_path,
?extra_args,
?worker_dir,
?socket_path,
"cannot bind unix socket: {:?}",
err,
);
SpawnErr::Bind
})?;

let handle = WorkerHandle::spawn(&program_path, &extra_args, &worker_dir.path, security_status)
.map_err(|err| {
gum::warn!(
target: LOG_TARGET,
%debug_id,
?program_path,
?extra_args,
?worker_dir.path,
?socket_path,
"cannot spawn a worker: {:?}",
err,
);
SpawnErr::ProcessSpawn
})?;

let worker_dir_path = worker_dir.path.clone();
futures::select! {
accept_result = listener.accept().fuse() => {
let (stream, _) = accept_result.map_err(|err| {
with_transient_socket_path(debug_id, |socket_path| {
let socket_path = socket_path.to_owned();

async move {
let listener = UnixListener::bind(&socket_path).map_err(|err| {
gum::warn!(
target: LOG_TARGET,
%debug_id,
?program_path,
?extra_args,
?worker_dir_path,
?worker_dir,
?socket_path,
"cannot accept a worker: {:?}",
"cannot bind unix socket: {:?}",
err,
);
SpawnErr::Accept
SpawnErr::Bind
})?;
Ok((IdleWorker { stream, pid: handle.id(), worker_dir }, handle))
}
_ = Delay::new(spawn_timeout).fuse() => {
gum::warn!(
target: LOG_TARGET,
%debug_id,
?program_path,
?extra_args,
?worker_dir_path,
?socket_path,
?spawn_timeout,
"spawning and connecting to socket timed out",
);
Err(SpawnErr::AcceptTimeout)

let handle = WorkerHandle::spawn(
&program_path,
&extra_args,
&socket_path,
&worker_dir.path,
security_status,
)
.map_err(|err| {
gum::warn!(
target: LOG_TARGET,
%debug_id,
?program_path,
?extra_args,
?worker_dir.path,
?socket_path,
"cannot spawn a worker: {:?}",
err,
);
SpawnErr::ProcessSpawn
})?;

let worker_dir_path = worker_dir.path.clone();
futures::select! {
accept_result = listener.accept().fuse() => {
let (stream, _) = accept_result.map_err(|err| {
gum::warn!(
target: LOG_TARGET,
%debug_id,
?program_path,
?extra_args,
?worker_dir_path,
?socket_path,
"cannot accept a worker: {:?}",
err,
);
SpawnErr::Accept
})?;
Ok((IdleWorker { stream, pid: handle.id(), worker_dir }, handle))
}
_ = Delay::new(spawn_timeout).fuse() => {
gum::warn!(
target: LOG_TARGET,
%debug_id,
?program_path,
?extra_args,
?worker_dir_path,
?socket_path,
?spawn_timeout,
"spawning and connecting to socket timed out",
);
Err(SpawnErr::AcceptTimeout)
}
}
}
}
})
.await
}

/// Runs `f` with a transient socket path and attempts to delete that path afterwards.
///
/// The path is requested from `tmppath` with the prefix `pvf-host-<debug_id>`. If that request
/// fails, `SpawnErr::TmpPath` is returned and `f` is never invoked. Otherwise the future
/// produced by `f` is awaited, removal of the socket file is attempted, and the result of `f`
/// is returned unchanged.
async fn with_transient_socket_path<T, F, Fut>(debug_id: &'static str, f: F) -> Result<T, SpawnErr>
where
    F: FnOnce(&Path) -> Fut,
    Fut: futures::Future<Output = Result<T, SpawnErr>> + 'static,
{
    let prefix = format!("pvf-host-{}", debug_id);
    let transient_path = match tmppath(&prefix).await {
        Ok(path) => path,
        Err(_) => return Err(SpawnErr::TmpPath),
    };

    let outcome = f(&transient_path).await;

    // Removal is best effort, so its result is deliberately discarded. Under normal
    // circumstances the worker removes the socket itself; this covers a failed rendezvous.
    let _ = tokio::fs::remove_file(transient_path).await;

    outcome
}

/// Returns a path under the given `dir`. The path name will start with the given prefix.
Expand Down Expand Up @@ -169,7 +197,6 @@ pub async fn tmppath_in(prefix: &str, dir: &Path) -> io::Result<PathBuf> {
}

/// The same as [`tmppath_in`], but uses [`std::env::temp_dir`] as the directory.
#[cfg(test)]
pub async fn tmppath(prefix: &str) -> io::Result<PathBuf> {
let temp_dir = PathBuf::from(std::env::temp_dir());
tmppath_in(prefix, &temp_dir).await
Expand Down Expand Up @@ -234,6 +261,7 @@ impl WorkerHandle {
fn spawn(
program: impl AsRef<Path>,
extra_args: &[String],
socket_path: impl AsRef<Path>,
worker_dir_path: impl AsRef<Path>,
security_status: SecurityStatus,
) -> io::Result<Self> {
Expand All @@ -257,6 +285,8 @@ impl WorkerHandle {
}
let mut child = command
.args(extra_args)
.arg("--socket-path")
.arg(socket_path.as_ref().as_os_str())
.arg("--worker-dir-path")
.arg(worker_dir_path.as_ref().as_os_str())
.args(&security_args)
Expand Down