Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Merge branch 'master' of https://github.com/paritytech/polkadot into …
Browse files Browse the repository at this point in the history
…pep-pvf-paras
  • Loading branch information
Parity Bot committed Dec 14, 2021
2 parents a540b68 + 69b4791 commit 0090e71
Show file tree
Hide file tree
Showing 7 changed files with 13 additions and 197 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion node/core/pvf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ async-process = "1.3.0"
assert_matches = "1.4.0"
futures = "0.3.17"
futures-timer = "3.0.2"
libc = "0.2.109"
slotmap = "1.0"
tracing = "0.1.29"
pin-project = "1.0.8"
Expand Down
59 changes: 3 additions & 56 deletions node/core/pvf/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,12 +486,6 @@ async fn handle_execute_pvf(
.await?;
},
ArtifactState::Preparing { waiting_for_response: _ } => {
send_prepare(
prepare_queue,
prepare::ToQueue::Amend { priority, artifact_id: artifact_id.clone() },
)
.await?;

awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx);
},
ArtifactState::FailedToProcess(error) => {
Expand Down Expand Up @@ -525,18 +519,17 @@ async fn handle_heads_up(
*last_time_needed = now;
},
ArtifactState::Preparing { waiting_for_response: _ } => {
// Already preparing. We don't need to send a priority amend either because
// it can't get any lower than the background.
// The artifact is already being prepared, so we don't need to do anything.
},
ArtifactState::FailedToProcess(_) => {},
}
} else {
// The artifact is unknown: register it and put a background job into the prepare queue.
// It's not in the artifacts, so we need to enqueue a job to prepare it.
artifacts.insert_preparing(artifact_id.clone(), Vec::new());

send_prepare(
prepare_queue,
prepare::ToQueue::Enqueue { priority: Priority::Background, pvf: active_pvf },
prepare::ToQueue::Enqueue { priority: Priority::Normal, pvf: active_pvf },
)
.await?;
}
Expand Down Expand Up @@ -923,48 +916,6 @@ mod tests {
test.poll_ensure_to_sweeper_is_empty().await;
}

// Checks that an execute request for a PVF which is already being prepared (because of an
// earlier heads-up) makes the host send a `ToQueue::Amend` message to the prepare queue.
#[async_std::test]
async fn amending_priority() {
let mut test = Builder::default().build();
let mut host = test.host_handle();

// Announce the PVF first; this is what triggers the initial `Enqueue`.
host.heads_up(vec![Pvf::from_discriminator(1)]).await.unwrap();

// Run until we receive a prepare request.
let prepare_q_rx = &mut test.to_prepare_queue_rx;
run_until(
&mut test.run,
async {
assert_matches!(
prepare_q_rx.next().await.unwrap(),
prepare::ToQueue::Enqueue { .. }
);
}
.boxed(),
)
.await;

// Request execution of the same PVF with `Priority::Critical`. The receiving half of the
// result channel is never read in this test; only the prepare-queue traffic is checked.
let (result_tx, _result_rx) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
vec![],
Priority::Critical,
result_tx,
)
.await
.unwrap();

// The next message on the prepare queue must be an `Amend`.
run_until(
&mut test.run,
async {
assert_matches!(prepare_q_rx.next().await.unwrap(), prepare::ToQueue::Amend { .. });
}
.boxed(),
)
.await;
}

#[async_std::test]
async fn execute_pvf_requests() {
let mut test = Builder::default().build();
Expand Down Expand Up @@ -1007,10 +958,6 @@ mod tests {
test.poll_and_recv_to_prepare_queue().await,
prepare::ToQueue::Enqueue { .. }
);
assert_matches!(
test.poll_and_recv_to_prepare_queue().await,
prepare::ToQueue::Amend { .. }
);
assert_matches!(
test.poll_and_recv_to_prepare_queue().await,
prepare::ToQueue::Enqueue { .. }
Expand Down
22 changes: 3 additions & 19 deletions node/core/pvf/src/prepare/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,6 @@ pub enum ToPool {
/// this message is processed.
Kill(Worker),

/// If the given worker was started with the background priority, then it will be raised up to
/// normal priority. Otherwise, it's a no-op.
BumpPriority(Worker),

/// Request the given worker to start working on the given code.
///
/// Once the job either succeeded or failed, a [`FromPool::Concluded`] message will be sent back.
Expand All @@ -65,12 +61,7 @@ pub enum ToPool {
///
/// In either case, the worker is considered busy and no further `StartWork` messages should be
/// sent until either `Concluded` or `Rip` message is received.
StartWork {
worker: Worker,
code: Arc<Vec<u8>>,
artifact_path: PathBuf,
background_priority: bool,
},
StartWork { worker: Worker, code: Arc<Vec<u8>>, artifact_path: PathBuf },
}

/// A message sent from pool to its client.
Expand Down Expand Up @@ -214,7 +205,7 @@ fn handle_to_pool(
metrics.prepare_worker().on_begin_spawn();
mux.push(spawn_worker_task(program_path.to_owned(), spawn_timeout).boxed());
},
ToPool::StartWork { worker, code, artifact_path, background_priority } => {
ToPool::StartWork { worker, code, artifact_path } => {
if let Some(data) = spawned.get_mut(worker) {
if let Some(idle) = data.idle.take() {
let preparation_timer = metrics.time_preparation();
Expand All @@ -225,7 +216,6 @@ fn handle_to_pool(
code,
cache_path.to_owned(),
artifact_path,
background_priority,
preparation_timer,
)
.boxed(),
Expand All @@ -248,10 +238,6 @@ fn handle_to_pool(
// It may be absent if it were previously already removed by `purge_dead`.
let _ = attempt_retire(metrics, spawned, worker);
},
ToPool::BumpPriority(worker) =>
if let Some(data) = spawned.get(worker) {
worker::bump_priority(&data.handle);
},
}
}

Expand All @@ -277,11 +263,9 @@ async fn start_work_task<Timer>(
code: Arc<Vec<u8>>,
cache_path: PathBuf,
artifact_path: PathBuf,
background_priority: bool,
_preparation_timer: Option<Timer>,
) -> PoolEvent {
let outcome =
worker::start_work(idle, code, &cache_path, artifact_path, background_priority).await;
let outcome = worker::start_work(idle, code, &cache_path, artifact_path).await;
PoolEvent::StartWork(worker, outcome)
}

Expand Down
80 changes: 5 additions & 75 deletions node/core/pvf/src/prepare/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,8 @@ pub enum ToQueue {
/// This schedules preparation of the given PVF.
///
/// Note that it is incorrect to enqueue the same PVF again without first receiving the
/// [`FromQueue`] response. In case there is a need to bump the priority, use
/// [`ToQueue::Amend`].
/// [`FromQueue`] response.
Enqueue { priority: Priority, pvf: Pvf },
/// Amends the priority for the given [`ArtifactId`] if it is running. If it's not, then it's a no-op.
Amend { priority: Priority, artifact_id: ArtifactId },
}

/// A response from queue.
Expand Down Expand Up @@ -97,15 +94,13 @@ impl WorkerData {
/// there is going to be a limited number of critical jobs and we don't really care if background jobs starve.
#[derive(Default)]
struct Unscheduled {
/// Jobs queued with `Priority::Background`.
background: VecDeque<Job>,
/// Jobs queued with `Priority::Normal`.
normal: VecDeque<Job>,
/// Jobs queued with `Priority::Critical`.
critical: VecDeque<Job>,
}

impl Unscheduled {
fn queue_mut(&mut self, prio: Priority) -> &mut VecDeque<Job> {
match prio {
Priority::Background => &mut self.background,
Priority::Normal => &mut self.normal,
Priority::Critical => &mut self.critical,
}
Expand All @@ -120,14 +115,12 @@ impl Unscheduled {
}

fn is_empty(&self) -> bool {
self.background.is_empty() && self.normal.is_empty() && self.critical.is_empty()
self.normal.is_empty() && self.critical.is_empty()
}

fn next(&mut self) -> Option<Job> {
let mut check = |prio: Priority| self.queue_mut(prio).pop_front();
check(Priority::Critical)
.or_else(|| check(Priority::Normal))
.or_else(|| check(Priority::Background))
check(Priority::Critical).or_else(|| check(Priority::Normal))
}
}

Expand Down Expand Up @@ -213,9 +206,6 @@ async fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) -> Result<(), Fat
ToQueue::Enqueue { priority, pvf } => {
handle_enqueue(queue, priority, pvf).await?;
},
ToQueue::Amend { priority, artifact_id } => {
handle_amend(queue, priority, artifact_id).await?;
},
}
Ok(())
}
Expand Down Expand Up @@ -265,41 +255,6 @@ fn find_idle_worker(queue: &mut Queue) -> Option<Worker> {
queue.workers.iter().filter(|(_, data)| data.is_idle()).map(|(k, _)| k).next()
}

/// Raises the priority recorded for the job that prepares `artifact_id`.
///
/// Nothing happens when no job is registered for `artifact_id`, or when the requested
/// `priority` is not higher than the one already stored. If the job is already assigned to a
/// worker and the change moves it out of the background priority, the pool is asked to bump
/// that worker's priority before the new value is stored.
///
/// Returns an error only if sending the message to the pool fails; in that case the stored
/// priority is left untouched.
async fn handle_amend(
    queue: &mut Queue,
    priority: Priority,
    artifact_id: ArtifactId,
) -> Result<(), Fatal> {
    // Unknown artifact: there is nothing to amend.
    let job = match queue.artifact_id_to_job.get(&artifact_id) {
        Some(&job) => job,
        None => return Ok(()),
    };

    tracing::debug!(
        target: LOG_TARGET,
        validation_code_hash = ?artifact_id.code_hash,
        ?priority,
        "amending preparation priority.",
    );

    let entry: &mut JobData = &mut queue.jobs[job];
    if entry.priority < priority {
        // The priority only ever goes up. A worker needs to be told about the change solely
        // when it is currently running the job at background priority and the new priority is
        // not background any more.
        match entry.worker {
            Some(worker) if entry.priority.is_background() && !priority.is_background() => {
                send_pool(&mut queue.to_pool_tx, pool::ToPool::BumpPriority(worker)).await?;
            },
            _ => {},
        }

        // Remember the new priority for the job.
        entry.priority = priority;
    }

    Ok(())
}

async fn handle_from_pool(queue: &mut Queue, from_pool: pool::FromPool) -> Result<(), Fatal> {
use pool::FromPool::*;
match from_pool {
Expand Down Expand Up @@ -469,12 +424,7 @@ async fn assign(queue: &mut Queue, worker: Worker, job: Job) -> Result<(), Fatal

send_pool(
&mut queue.to_pool_tx,
pool::ToPool::StartWork {
worker,
code: job_data.pvf.code.clone(),
artifact_path,
background_priority: job_data.priority.is_background(),
},
pool::ToPool::StartWork { worker, code: job_data.pvf.code.clone(), artifact_path },
)
.await?;

Expand Down Expand Up @@ -644,7 +594,7 @@ mod tests {
async fn properly_concludes() {
let mut test = Test::new(2, 2);

test.send_queue(ToQueue::Enqueue { priority: Priority::Background, pvf: pvf(1) });
test.send_queue(ToQueue::Enqueue { priority: Priority::Normal, pvf: pvf(1) });
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);

let w = test.workers.insert(());
Expand Down Expand Up @@ -713,26 +663,6 @@ mod tests {
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Kill(w1));
}

// Checks that amending a running background job to `Priority::Normal` makes the queue send
// `ToPool::BumpPriority` for the worker that was assigned to the job.
#[async_std::test]
async fn bump_prio_on_urgency_change() {
let mut test = Test::new(2, 2);

// Enqueue a background job; the queue is expected to ask the pool for a new worker.
test.send_queue(ToQueue::Enqueue { priority: Priority::Background, pvf: pvf(1) });

assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);

// Report a freshly spawned worker back to the queue.
let w = test.workers.insert(());
test.send_from_pool(pool::FromPool::Spawned(w));

// The job gets started on a worker; only after that is the priority amended.
assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
test.send_queue(ToQueue::Amend {
priority: Priority::Normal,
artifact_id: pvf(1).as_artifact_id(),
});

// The bump must target the worker `w` reported above.
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::BumpPriority(w));
}

#[async_std::test]
async fn worker_mass_die_out_doesnt_stall_queue() {
let mut test = Test::new(2, 2);
Expand Down
37 changes: 2 additions & 35 deletions node/core/pvf/src/prepare/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@ use parity_scale_codec::{Decode, Encode};
use sp_core::hexdisplay::HexDisplay;
use std::{any::Any, panic, sync::Arc, time::Duration};

const NICENESS_BACKGROUND: i32 = 10;
const NICENESS_FOREGROUND: i32 = 0;

/// The time period after which the preparation worker is considered unresponsive and will be killed.
// NOTE: If you change this make sure to fix the buckets of `pvf_preparation_time` metric.
const COMPILATION_TIMEOUT: Duration = Duration::from_secs(60);
Expand Down Expand Up @@ -72,22 +69,16 @@ pub async fn start_work(
code: Arc<Vec<u8>>,
cache_path: &Path,
artifact_path: PathBuf,
background_priority: bool,
) -> Outcome {
let IdleWorker { mut stream, pid } = worker;

tracing::debug!(
target: LOG_TARGET,
worker_pid = %pid,
%background_priority,
"starting prepare for {}",
artifact_path.display(),
);

if background_priority {
renice(pid, NICENESS_BACKGROUND);
}

with_tmp_file(pid, cache_path, |tmp_file| async move {
if let Err(err) = send_request(&mut stream, code, &tmp_file).await {
tracing::warn!(
Expand Down Expand Up @@ -172,10 +163,8 @@ pub async fn start_work(
};

match selected {
Selected::Done(result) => {
renice(pid, NICENESS_FOREGROUND);
Outcome::Concluded { worker: IdleWorker { stream, pid }, result }
},
Selected::Done(result) =>
Outcome::Concluded { worker: IdleWorker { stream, pid }, result },
Selected::Deadline => Outcome::TimedOut,
Selected::IoErr => Outcome::DidNotMakeIt,
}
Expand Down Expand Up @@ -250,28 +239,6 @@ async fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec<u8>, PathBuf)>
Ok((code, tmp_file))
}

/// Resets the niceness of the worker process behind `handle` to [`NICENESS_FOREGROUND`].
pub fn bump_priority(handle: &WorkerHandle) {
    renice(handle.id(), NICENESS_FOREGROUND);
}

/// Changes the niceness of the process identified by `pid` to `niceness`.
///
/// This is best effort: when the underlying `setpriority` call reports a failure, the OS error
/// is logged as a warning and otherwise ignored.
fn renice(pid: u32, niceness: i32) {
    tracing::debug!(
        target: LOG_TARGET,
        worker_pid = %pid,
        "changing niceness to {}",
        niceness,
    );

    // Consider upstreaming this to the `nix` crate.
    //
    // SAFETY: `setpriority` receives plain integer arguments only and does not read or write
    // any memory owned by this process.
    let status = unsafe { libc::setpriority(libc::PRIO_PROCESS, pid, niceness) };
    if status == -1 {
        // Read the OS error right after the failed call, before anything else can overwrite it.
        let err = std::io::Error::last_os_error();
        tracing::warn!(target: LOG_TARGET, "failed to set the priority: {:?}", err);
    }
}

/// The entrypoint that the spawned prepare worker should start with. The `socket_path` specifies
/// the path to the socket used to communicate with the host.
pub fn worker_entrypoint(socket_path: &str) {
Expand Down
Loading

0 comments on commit 0090e71

Please sign in to comment.