Skip to content

Commit

Permalink
Remove tokio from workers
Browse files Browse the repository at this point in the history
Starting the tokio runtime was calling `socketpair` and triggering the new
seccomp filter. Removed tokio since we wanted to do it soon anyway as part of
#649.
  • Loading branch information
mrcnski committed Oct 24, 2023
1 parent 5e100f6 commit 2bae7a6
Show file tree
Hide file tree
Showing 13 changed files with 33 additions and 42 deletions.
3 changes: 0 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion polkadot/node/core/pvf/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ cpu-time = "1.0.0"
futures = "0.3.21"
gum = { package = "tracing-gum", path = "../../../gum" }
libc = "0.2.139"
tokio = { version = "1.24.2", features = ["fs", "process", "io-util"] }

parity-scale-codec = { version = "3.6.1", default-features = false, features = ["derive"] }

Expand Down
3 changes: 1 addition & 2 deletions polkadot/node/core/pvf/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,9 @@ pub use sp_tracing;
const LOG_TARGET: &str = "parachain::pvf-common";

use std::{
io::{Read, Write},
io::{self, Read, Write},
mem,
};
use tokio::io;

#[cfg(feature = "test-utils")]
pub mod tests {
Expand Down
25 changes: 8 additions & 17 deletions polkadot/node/core/pvf/common/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,12 @@ use cpu_time::ProcessTime;
use futures::never::Never;
use std::{
any::Any,
fmt,
fmt, io,
os::unix::net::UnixStream,
path::PathBuf,
sync::mpsc::{Receiver, RecvTimeoutError},
time::Duration,
};
use tokio::{io, runtime::Runtime};

/// Use this macro to declare a `fn main() {}` that will create an executable that can be used for
/// spawning the desired worker.
Expand Down Expand Up @@ -198,7 +197,7 @@ impl fmt::Display for WorkerKind {

// The worker version must be passed in so that we accurately get the version of the worker, and not
// the version that this crate was compiled with.
pub fn worker_event_loop<F, Fut>(
pub fn worker_event_loop<F>(
worker_kind: WorkerKind,
socket_path: PathBuf,
#[cfg_attr(not(target_os = "linux"), allow(unused_mut))] mut worker_dir_path: PathBuf,
Expand All @@ -207,8 +206,7 @@ pub fn worker_event_loop<F, Fut>(
#[cfg_attr(not(target_os = "linux"), allow(unused_variables))] security_status: &SecurityStatus,
mut event_loop: F,
) where
F: FnMut(UnixStream, PathBuf) -> Fut,
Fut: futures::Future<Output = io::Result<Never>>,
F: FnMut(UnixStream, PathBuf) -> io::Result<Never>,
{
let worker_pid = std::process::id();
gum::debug!(
Expand Down Expand Up @@ -262,7 +260,7 @@ pub fn worker_event_loop<F, Fut>(
}

// Connect to the socket.
let stream = || -> std::io::Result<UnixStream> {
let stream = || -> io::Result<UnixStream> {
let stream = UnixStream::connect(&socket_path)?;
let _ = std::fs::remove_file(&socket_path);
Ok(stream)
Expand Down Expand Up @@ -362,18 +360,11 @@ pub fn worker_event_loop<F, Fut>(
}

// Run the main worker loop.
let rt = Runtime::new().expect("Creates tokio runtime. If this panics the worker will die and the host will detect that and deal with it.");
let err = rt
.block_on(event_loop(stream, worker_dir_path))
let err = event_loop(stream, worker_dir_path)
// It's never `Ok` because it's `Ok(Never)`.
.unwrap_err();

worker_shutdown_message(worker_kind, worker_pid, &err.to_string());

// We don't want tokio to wait for the tasks to finish. We want to bring down the worker as fast
// as possible and not wait for stalled validation to finish. This isn't strictly necessary now,
// but may be in the future.
rt.shutdown_background();
}

/// Provide a consistent message on worker shutdown.
Expand Down Expand Up @@ -454,7 +445,7 @@ fn kill_parent_node_in_emergency() {
/// The motivation for this module is to coordinate worker threads without using async Rust.
pub mod thread {
use std::{
panic,
io, panic,
sync::{Arc, Condvar, Mutex},
thread,
time::Duration,
Expand Down Expand Up @@ -495,7 +486,7 @@ pub mod thread {
f: F,
cond: Cond,
outcome: WaitOutcome,
) -> std::io::Result<thread::JoinHandle<R>>
) -> io::Result<thread::JoinHandle<R>>
where
F: FnOnce() -> R,
F: Send + 'static + panic::UnwindSafe,
Expand All @@ -513,7 +504,7 @@ pub mod thread {
cond: Cond,
outcome: WaitOutcome,
stack_size: usize,
) -> std::io::Result<thread::JoinHandle<R>>
) -> io::Result<thread::JoinHandle<R>>
where
F: FnOnce() -> R,
F: Send + 'static + panic::UnwindSafe,
Expand Down
6 changes: 2 additions & 4 deletions polkadot/node/core/pvf/common/src/worker/security/landlock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ use crate::{
LOG_TARGET,
};
use landlock::*;
use std::{
path::{Path, PathBuf},
};
use std::path::{Path, PathBuf};

/// Landlock ABI version. We use ABI V1 because:
///
Expand Down Expand Up @@ -103,7 +101,7 @@ pub fn enable_for_worker(
panic!("this should only be passed for checking pivot_root; qed"),
};

gum::debug!(
gum::trace!(
target: LOG_TARGET,
%worker_kind,
%worker_pid,
Expand Down
9 changes: 8 additions & 1 deletion polkadot/node/core/pvf/common/src/worker/security/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ pub fn unshare_user_namespace_and_change_root(
}}
}

gum::debug!(
gum::trace!(
target: LOG_TARGET,
%worker_kind,
%worker_pid,
Expand Down Expand Up @@ -175,6 +175,13 @@ pub fn unshare_user_namespace_and_change_root(
/// Require env vars to have been removed when spawning the process, to prevent malicious code from
/// accessing them.
pub fn check_env_vars_were_cleared(worker_kind: WorkerKind, worker_pid: u32) -> bool {
gum::trace!(
target: LOG_TARGET,
%worker_kind,
%worker_pid,
"clearing env vars in worker",
);

let mut ok = true;

for (key, value) in std::env::vars_os() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ pub fn enable_for_worker(
worker_pid: u32,
worker_dir_path: &Path,
) -> Result<()> {
gum::debug!(
gum::trace!(
target: LOG_TARGET,
%worker_kind,
%worker_pid,
Expand Down
1 change: 0 additions & 1 deletion polkadot/node/core/pvf/execute-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ cpu-time = "1.0.0"
futures = "0.3.21"
gum = { package = "tracing-gum", path = "../../../gum" }
rayon = "1.5.1"
tokio = { version = "1.24.2", features = ["fs", "process"] }

parity-scale-codec = { version = "3.6.1", default-features = false, features = ["derive"] }

Expand Down
4 changes: 2 additions & 2 deletions polkadot/node/core/pvf/execute-worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ use polkadot_node_core_pvf_common::{
use polkadot_parachain_primitives::primitives::ValidationResult;
use polkadot_primitives::{executor_params::DEFAULT_NATIVE_STACK_MAX, ExecutorParams};
use std::{
io,
os::unix::net::UnixStream,
path::PathBuf,
sync::{mpsc::channel, Arc},
time::Duration,
};
use tokio::io;

// Wasmtime powers the Substrate Executor. It compiles the wasm bytecode into native code.
// That native code does not create any stacks and just reuses the stack of the thread that
Expand Down Expand Up @@ -138,7 +138,7 @@ pub fn worker_entrypoint(
node_version,
worker_version,
&security_status,
|mut stream, worker_dir_path| async move {
|mut stream, worker_dir_path| {
let worker_pid = std::process::id();
let artifact_path = worker_dir::execute_artifact(&worker_dir_path);

Expand Down
1 change: 0 additions & 1 deletion polkadot/node/core/pvf/prepare-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ gum = { package = "tracing-gum", path = "../../../gum" }
libc = "0.2.139"
rayon = "1.5.1"
tikv-jemalloc-ctl = { version = "0.5.0", optional = true }
tokio = { version = "1.24.2", features = ["fs", "process"] }

parity-scale-codec = { version = "3.6.1", default-features = false, features = ["derive"] }

Expand Down
9 changes: 4 additions & 5 deletions polkadot/node/core/pvf/prepare-worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ use polkadot_node_core_pvf_common::{
};
use polkadot_primitives::ExecutorParams;
use std::{
fs, io,
os::unix::net::UnixStream,
path::PathBuf,
sync::{mpsc::channel, Arc},
time::Duration,
};
use tokio::io;

/// Contains the bytes for a successfully compiled artifact.
pub struct CompiledArtifact(Vec<u8>);
Expand Down Expand Up @@ -131,7 +131,7 @@ pub fn worker_entrypoint(
node_version,
worker_version,
&security_status,
|mut stream, worker_dir_path| async move {
|mut stream, worker_dir_path| {
let worker_pid = std::process::id();
let temp_artifact_dest = worker_dir::prepare_tmp_artifact(&worker_dir_path);

Expand Down Expand Up @@ -229,8 +229,7 @@ pub fn worker_entrypoint(

// Stop the memory stats worker and get its observed memory stats.
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
let memory_tracker_stats = get_memory_tracker_loop_stats(memory_tracker_thread, worker_pid)
.await;
let memory_tracker_stats = get_memory_tracker_loop_stats(memory_tracker_thread, worker_pid);
let memory_stats = MemoryStats {
#[cfg(any(
target_os = "linux",
Expand All @@ -255,7 +254,7 @@ pub fn worker_entrypoint(
"worker: writing artifact to {}",
temp_artifact_dest.display(),
);
tokio::fs::write(&temp_artifact_dest, &artifact).await?;
fs::write(&temp_artifact_dest, &artifact)?;

Ok(PrepareStats { cpu_time_elapsed, memory_stats })
},
Expand Down
2 changes: 1 addition & 1 deletion polkadot/node/core/pvf/prepare-worker/src/memory_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ pub mod memory_tracker {
}

/// Helper function to get the stats from the memory tracker. Helps isolate this error handling.
pub async fn get_memory_tracker_loop_stats(
pub fn get_memory_tracker_loop_stats(
thread: JoinHandle<Result<MemoryAllocationStats, String>>,
worker_pid: u32,
) -> Option<MemoryAllocationStats> {
Expand Down
9 changes: 6 additions & 3 deletions polkadot/node/core/pvf/tests/it/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ async fn ensure_parallel_execution() {
async fn execute_queue_doesnt_stall_if_workers_died() {
let host = TestHost::new_with_config(|cfg| {
cfg.execute_workers_max_num = 5;
}).await;
})
.await;

// Here we spawn 8 validation jobs for the `halt` PVF and share those between 5 workers. The
// first five jobs should time out and their workers be killed. For the next 3 jobs a new batch of
async fn execute_queue_doesnt_stall_with_varying_executor_params() {
let host = TestHost::new_with_config(|cfg| {
cfg.execute_workers_max_num = 2;
}).await;
})
.await;

let executor_params_1 = ExecutorParams::default();
let executor_params_2 = ExecutorParams::from(&[ExecutorParam::StackLogicalMax(1024)][..]);
Expand Down Expand Up @@ -354,7 +356,8 @@ async fn deleting_prepared_artifact_does_not_dispute() {
async fn prepare_can_run_serially() {
let host = TestHost::new_with_config(|cfg| {
cfg.prepare_workers_hard_max_num = 1;
}).await;
})
.await;

let _stats = host
.precheck_pvf(::adder::wasm_binary_unwrap(), Default::default())
Expand Down

0 comments on commit 2bae7a6

Please sign in to comment.