diff --git a/Cargo.lock b/Cargo.lock index 0fed8d1e54ad..ecd723943709 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6329,7 +6329,6 @@ dependencies = [ "futures 0.3.18", "futures-timer 3.0.2", "hex-literal", - "libc", "parity-scale-codec", "pin-project 1.0.8", "polkadot-core-primitives", diff --git a/node/core/pvf/Cargo.toml b/node/core/pvf/Cargo.toml index a09532f54073..a73691156b89 100644 --- a/node/core/pvf/Cargo.toml +++ b/node/core/pvf/Cargo.toml @@ -15,7 +15,6 @@ async-process = "1.3.0" assert_matches = "1.4.0" futures = "0.3.17" futures-timer = "3.0.2" -libc = "0.2.109" slotmap = "1.0" tracing = "0.1.29" pin-project = "1.0.8" diff --git a/node/core/pvf/src/host.rs b/node/core/pvf/src/host.rs index ae0f3d103b6a..e62bedbcd6aa 100644 --- a/node/core/pvf/src/host.rs +++ b/node/core/pvf/src/host.rs @@ -486,12 +486,6 @@ async fn handle_execute_pvf( .await?; }, ArtifactState::Preparing { waiting_for_response: _ } => { - send_prepare( - prepare_queue, - prepare::ToQueue::Amend { priority, artifact_id: artifact_id.clone() }, - ) - .await?; - awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx); }, ArtifactState::FailedToProcess(error) => { @@ -525,18 +519,17 @@ async fn handle_heads_up( *last_time_needed = now; }, ArtifactState::Preparing { waiting_for_response: _ } => { - // Already preparing. We don't need to send a priority amend either because - // it can't get any lower than the background. + // The artifact is already being prepared, so we don't need to do anything. }, ArtifactState::FailedToProcess(_) => {}, } } else { - // The artifact is unknown: register it and put a background job into the prepare queue. + // It's not in the artifacts, so we need to enqueue a job to prepare it. artifacts.insert_preparing(artifact_id.clone(), Vec::new()); send_prepare( prepare_queue, - prepare::ToQueue::Enqueue { priority: Priority::Background, pvf: active_pvf }, + prepare::ToQueue::Enqueue { priority: Priority::Normal, pvf: active_pvf }, ) .await?; } @@ -923,48 +916,6 @@ mod tests { test.poll_ensure_to_sweeper_is_empty().await; } - #[async_std::test] - async fn amending_priority() { - let mut test = Builder::default().build(); - let mut host = test.host_handle(); - - host.heads_up(vec![Pvf::from_discriminator(1)]).await.unwrap(); - - // Run until we receive a prepare request. - let prepare_q_rx = &mut test.to_prepare_queue_rx; - run_until( - &mut test.run, - async { - assert_matches!( - prepare_q_rx.next().await.unwrap(), - prepare::ToQueue::Enqueue { .. } - ); - } - .boxed(), - ) - .await; - - let (result_tx, _result_rx) = oneshot::channel(); - host.execute_pvf( - Pvf::from_discriminator(1), - TEST_EXECUTION_TIMEOUT, - vec![], - Priority::Critical, - result_tx, - ) - .await - .unwrap(); - - run_until( - &mut test.run, - async { - assert_matches!(prepare_q_rx.next().await.unwrap(), prepare::ToQueue::Amend { .. }); - } - .boxed(), - ) - .await; - } - #[async_std::test] async fn execute_pvf_requests() { let mut test = Builder::default().build(); @@ -1007,10 +958,6 @@ mod tests { test.poll_and_recv_to_prepare_queue().await, prepare::ToQueue::Enqueue { .. } ); - assert_matches!( - test.poll_and_recv_to_prepare_queue().await, - prepare::ToQueue::Amend { .. } - ); assert_matches!( test.poll_and_recv_to_prepare_queue().await, prepare::ToQueue::Enqueue { .. } diff --git a/node/core/pvf/src/prepare/pool.rs b/node/core/pvf/src/prepare/pool.rs index 4e31164677ef..a51b79dbcc3c 100644 --- a/node/core/pvf/src/prepare/pool.rs +++ b/node/core/pvf/src/prepare/pool.rs @@ -53,10 +53,6 @@ pub enum ToPool { /// this message is processed. Kill(Worker), - /// If the given worker was started with the background priority, then it will be raised up to - /// normal priority. Otherwise, it's no-op. - BumpPriority(Worker), - /// Request the given worker to start working on the given code. /// /// Once the job either succeeded or failed, a [`FromPool::Concluded`] message will be sent back. @@ -65,12 +61,7 @@ pub enum ToPool { /// /// In either case, the worker is considered busy and no further `StartWork` messages should be /// sent until either `Concluded` or `Rip` message is received. - StartWork { - worker: Worker, - code: Arc>, - artifact_path: PathBuf, - background_priority: bool, - }, + StartWork { worker: Worker, code: Arc>, artifact_path: PathBuf }, } /// A message sent from pool to its client. @@ -214,7 +205,7 @@ fn handle_to_pool( metrics.prepare_worker().on_begin_spawn(); mux.push(spawn_worker_task(program_path.to_owned(), spawn_timeout).boxed()); }, - ToPool::StartWork { worker, code, artifact_path, background_priority } => { + ToPool::StartWork { worker, code, artifact_path } => { if let Some(data) = spawned.get_mut(worker) { if let Some(idle) = data.idle.take() { let preparation_timer = metrics.time_preparation(); @@ -225,7 +216,6 @@ fn handle_to_pool( code, cache_path.to_owned(), artifact_path, - background_priority, preparation_timer, ) .boxed(), @@ -248,10 +238,6 @@ fn handle_to_pool( // It may be absent if it were previously already removed by `purge_dead`. let _ = attempt_retire(metrics, spawned, worker); }, - ToPool::BumpPriority(worker) => - if let Some(data) = spawned.get(worker) { - worker::bump_priority(&data.handle); - }, } } @@ -277,11 +263,9 @@ async fn start_work_task( code: Arc>, cache_path: PathBuf, artifact_path: PathBuf, - background_priority: bool, _preparation_timer: Option, ) -> PoolEvent { - let outcome = - worker::start_work(idle, code, &cache_path, artifact_path, background_priority).await; + let outcome = worker::start_work(idle, code, &cache_path, artifact_path).await; PoolEvent::StartWork(worker, outcome) } diff --git a/node/core/pvf/src/prepare/queue.rs b/node/core/pvf/src/prepare/queue.rs index f1af292c7538..4e226bd3032e 100644 --- a/node/core/pvf/src/prepare/queue.rs +++ b/node/core/pvf/src/prepare/queue.rs @@ -29,11 +29,8 @@ pub enum ToQueue { /// This schedules preparation of the given PVF. /// /// Note that it is incorrect to enqueue the same PVF again without first receiving the - /// [`FromQueue`] response. In case there is a need to bump the priority, use - /// [`ToQueue::Amend`]. + /// [`FromQueue`] response. Enqueue { priority: Priority, pvf: Pvf }, - /// Amends the priority for the given [`ArtifactId`] if it is running. If it's not, then it's noop. - Amend { priority: Priority, artifact_id: ArtifactId }, } /// A response from queue. @@ -97,7 +94,6 @@ impl WorkerData { /// there is going to be a limited number of critical jobs and we don't really care if background starve. #[derive(Default)] struct Unscheduled { - background: VecDeque, normal: VecDeque, critical: VecDeque, } @@ -105,7 +101,6 @@ struct Unscheduled { impl Unscheduled { fn queue_mut(&mut self, prio: Priority) -> &mut VecDeque { match prio { - Priority::Background => &mut self.background, Priority::Normal => &mut self.normal, Priority::Critical => &mut self.critical, } @@ -120,14 +115,12 @@ impl Unscheduled { } fn is_empty(&self) -> bool { - self.background.is_empty() && self.normal.is_empty() && self.critical.is_empty() + self.normal.is_empty() && self.critical.is_empty() } fn next(&mut self) -> Option { let mut check = |prio: Priority| self.queue_mut(prio).pop_front(); - check(Priority::Critical) - .or_else(|| check(Priority::Normal)) - .or_else(|| check(Priority::Background)) + check(Priority::Critical).or_else(|| check(Priority::Normal)) } } @@ -213,9 +206,6 @@ async fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) -> Result<(), Fat ToQueue::Enqueue { priority, pvf } => { handle_enqueue(queue, priority, pvf).await?; }, - ToQueue::Amend { priority, artifact_id } => { - handle_amend(queue, priority, artifact_id).await?; - }, } Ok(()) } @@ -265,41 +255,6 @@ fn find_idle_worker(queue: &mut Queue) -> Option { queue.workers.iter().filter(|(_, data)| data.is_idle()).map(|(k, _)| k).next() } -async fn handle_amend( - queue: &mut Queue, - priority: Priority, - artifact_id: ArtifactId, -) -> Result<(), Fatal> { - if let Some(&job) = queue.artifact_id_to_job.get(&artifact_id) { - tracing::debug!( - target: LOG_TARGET, - validation_code_hash = ?artifact_id.code_hash, - ?priority, - "amending preparation priority.", - ); - - let mut job_data: &mut JobData = &mut queue.jobs[job]; - if job_data.priority < priority { - // The new priority is higher. We should do two things: - // - if the worker was already spawned with the background prio and the new one is not - // (it's already the case, if we are in this branch but we still do the check for - // clarity), then we should tell the pool to bump the priority for the worker. - // - // - save the new priority in the job. - - if let Some(worker) = job_data.worker { - if job_data.priority.is_background() && !priority.is_background() { - send_pool(&mut queue.to_pool_tx, pool::ToPool::BumpPriority(worker)).await?; - } - } - - job_data.priority = priority; - } - } - - Ok(()) -} - async fn handle_from_pool(queue: &mut Queue, from_pool: pool::FromPool) -> Result<(), Fatal> { use pool::FromPool::*; match from_pool { @@ -469,12 +424,7 @@ async fn assign(queue: &mut Queue, worker: Worker, job: Job) -> Result<(), Fatal send_pool( &mut queue.to_pool_tx, - pool::ToPool::StartWork { - worker, - code: job_data.pvf.code.clone(), - artifact_path, - background_priority: job_data.priority.is_background(), - }, + pool::ToPool::StartWork { worker, code: job_data.pvf.code.clone(), artifact_path }, ) .await?; @@ -644,7 +594,7 @@ mod tests { async fn properly_concludes() { let mut test = Test::new(2, 2); - test.send_queue(ToQueue::Enqueue { priority: Priority::Background, pvf: pvf(1) }); + test.send_queue(ToQueue::Enqueue { priority: Priority::Normal, pvf: pvf(1) }); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); let w = test.workers.insert(()); @@ -713,26 +663,6 @@ mod tests { assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Kill(w1)); } - #[async_std::test] - async fn bump_prio_on_urgency_change() { - let mut test = Test::new(2, 2); - - test.send_queue(ToQueue::Enqueue { priority: Priority::Background, pvf: pvf(1) }); - - assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); - - let w = test.workers.insert(()); - test.send_from_pool(pool::FromPool::Spawned(w)); - - assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. }); - test.send_queue(ToQueue::Amend { - priority: Priority::Normal, - artifact_id: pvf(1).as_artifact_id(), - }); - - assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::BumpPriority(w)); - } - #[async_std::test] async fn worker_mass_die_out_doesnt_stall_queue() { let mut test = Test::new(2, 2); diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker.rs index 57409d9e05b3..1913f79d8222 100644 --- a/node/core/pvf/src/prepare/worker.rs +++ b/node/core/pvf/src/prepare/worker.rs @@ -32,9 +32,6 @@ use parity_scale_codec::{Decode, Encode}; use sp_core::hexdisplay::HexDisplay; use std::{any::Any, panic, sync::Arc, time::Duration}; -const NICENESS_BACKGROUND: i32 = 10; -const NICENESS_FOREGROUND: i32 = 0; - /// The time period after which the preparation worker is considered unresponsive and will be killed. // NOTE: If you change this make sure to fix the buckets of `pvf_preparation_time` metric. const COMPILATION_TIMEOUT: Duration = Duration::from_secs(60); @@ -72,22 +69,16 @@ pub async fn start_work( code: Arc>, cache_path: &Path, artifact_path: PathBuf, - background_priority: bool, ) -> Outcome { let IdleWorker { mut stream, pid } = worker; tracing::debug!( target: LOG_TARGET, worker_pid = %pid, - %background_priority, "starting prepare for {}", artifact_path.display(), ); - if background_priority { - renice(pid, NICENESS_BACKGROUND); - } - with_tmp_file(pid, cache_path, |tmp_file| async move { if let Err(err) = send_request(&mut stream, code, &tmp_file).await { tracing::warn!( @@ -172,10 +163,8 @@ pub async fn start_work( }; match selected { - Selected::Done(result) => { - renice(pid, NICENESS_FOREGROUND); - Outcome::Concluded { worker: IdleWorker { stream, pid }, result } - }, + Selected::Done(result) => + Outcome::Concluded { worker: IdleWorker { stream, pid }, result }, Selected::Deadline => Outcome::TimedOut, Selected::IoErr => Outcome::DidNotMakeIt, } @@ -250,28 +239,6 @@ async fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec, PathBuf)> Ok((code, tmp_file)) } -pub fn bump_priority(handle: &WorkerHandle) { - let pid = handle.id(); - renice(pid, NICENESS_FOREGROUND); -} - -fn renice(pid: u32, niceness: i32) { - tracing::debug!( - target: LOG_TARGET, - worker_pid = %pid, - "changing niceness to {}", - niceness, - ); - - // Consider upstreaming this to the `nix` crate. - unsafe { - if -1 == libc::setpriority(libc::PRIO_PROCESS, pid, niceness) { - let err = std::io::Error::last_os_error(); - tracing::warn!(target: LOG_TARGET, "failed to set the priority: {:?}", err); - } - } -} - /// The entrypoint that the spawned prepare worker should start with. The `socket_path` specifies /// the path to the socket used to communicate with the host. pub fn worker_entrypoint(socket_path: &str) { diff --git a/node/core/pvf/src/priority.rs b/node/core/pvf/src/priority.rs index 8ba7b3907257..de169be0696b 100644 --- a/node/core/pvf/src/priority.rs +++ b/node/core/pvf/src/priority.rs @@ -17,11 +17,6 @@ /// A priority assigned to execution of a PVF. #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] pub enum Priority { - /// Jobs in this priority will be executed in the background, meaning that they will be only - /// given spare CPU time. - /// - /// This is mainly for cache warmings. - Background, /// Normal priority for things that do not require immediate response, but still need to be /// done pretty quick. /// @@ -38,9 +33,4 @@ impl Priority { pub fn is_critical(self) -> bool { self == Priority::Critical } - - /// Returns `true` if `self` is `Background` - pub fn is_background(self) -> bool { - self == Priority::Background - } }