Skip to content

Commit

Permalink
Check for @rule graph cycles asynchronously (#13370)
Browse files Browse the repository at this point in the history
In pathological cases of very small `@rule` bodies or extreme graph shapes, cycle detection in the `Graph` can be very expensive. This cost can be challenging to detect in production, but it does occasionally show up as a hot spot, particularly while cleaning graphs (since cleaning skips all `@rule` logic). That's precisely when we do not want to be spending any extraneous time on formalities.

This change moves to asynchronously checking for cycles in Running nodes in the `Graph`. Checking in the background has two advantages: 1. cycle detection is batched across the entire graph, rather than per-edge-addition, and 2. in most cases, successful Nodes never need to be checked while running.

We also add two pathological graph shape benchmarks, which show significant improvement:
* `main`:
    ```
    Benchmark #1: ./pants test --force --no-pytest-timeouts src/python/pants/engine/internals/engine_benchmarks_test.py
      Time (mean ± σ):     247.157 s ±  9.095 s    [User: 872.4 ms, System: 305.2 ms]
      Range (min … max):   240.523 s … 266.031 s    10 runs
    ```
* this branch:
    ```
    Benchmark #1: ./pants test --force --no-pytest-timeouts src/python/pants/engine/internals/engine_benchmarks_test.py
      Time (mean ± σ):     34.779 s ±  4.625 s    [User: 889.1 ms, System: 303.5 ms]
      Range (min … max):   32.246 s … 47.491 s    10 runs
    ```
  • Loading branch information
stuhood authored Oct 28, 2021
1 parent 9a4fd98 commit 13827bb
Show file tree
Hide file tree
Showing 9 changed files with 291 additions and 387 deletions.
47 changes: 47 additions & 0 deletions src/python/pants/engine/internals/engine_benchmarks_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Copyright 2021 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

from dataclasses import dataclass
from random import randrange

from pants.engine.rules import Get, MultiGet, rule
from pants.testutil.rule_runner import QueryRule, RuleRunner


@dataclass(frozen=True)
class Deep:
    """Result type of the `deep` benchmark rule: wraps a single integer."""

    val: int


@rule
async def deep(n: int) -> Deep:
    """Benchmark rule that builds a long chain of dependencies.

    For `n < 2` this is the base case and returns `Deep(n)`. Otherwise it awaits the
    `Deep` results for `n - 2` and `n - 1` and returns their sum (a Fibonacci-style
    recurrence), so a request for `n` transitively depends on every smaller value.
    """
    if n < 2:
        return Deep(n)
    x, y = tuple(await MultiGet([Get(Deep, int(n - 2)), Get(Deep, int(n - 1))]))
    return Deep(x.val + y.val)


@dataclass(frozen=True)
class Wide:
    """Result type of the `wide` benchmark rule: wraps a single integer."""

    val: int


@rule
async def wide(index: int) -> Wide:
    """Benchmark rule that gives each node many dependencies.

    For `index > 0` it awaits 100 `Wide` requests for randomly chosen smaller indices
    (`randrange(index)` yields values in `[0, index)`) and discards their results.
    It always returns `Wide(index)`.
    """
    if index > 0:
        _ = await MultiGet([Get(Wide, int(randrange(index))) for _ in range(100)])
    return Wide(index)


def test_bench_deep():
    """Benchmark: request `Deep` for 10000 ten times.

    `invalidate_all()` is called on the scheduler before every request, presumably so
    that each iteration recomputes the graph rather than reusing results.
    """
    rule_runner = RuleRunner(rules=[deep, QueryRule(Deep, (int,))])
    for _ in range(0, 10):
        rule_runner.scheduler.scheduler.invalidate_all()
        _ = rule_runner.request(Deep, [10000])


def test_bench_wide():
    """Benchmark: request `Wide` for 1000 five times.

    `invalidate_all()` is called on the scheduler before every request, presumably so
    that each iteration recomputes the graph rather than reusing results.
    """
    rule_runner = RuleRunner(rules=[wide, QueryRule(Wide, (int,))])
    for _ in range(0, 5):
        rule_runner.scheduler.scheduler.invalidate_all()
        _ = rule_runner.request(Wide, [1000])
2 changes: 2 additions & 0 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 25 additions & 8 deletions src/rust/engine/async_value/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@ use tokio::sync::{oneshot, watch};
#[derive(Debug)]
pub struct AsyncValue<T: Clone + Send + Sync + 'static> {
item_receiver: Weak<watch::Receiver<Option<T>>>,
// NB: Stored only for drop.
#[allow(dead_code)]
abort_sender: oneshot::Sender<()>,
abort_sender: Option<oneshot::Sender<T>>,
}

impl<T: Clone + Send + Sync + 'static> AsyncValue<T> {
Expand All @@ -55,7 +53,7 @@ impl<T: Clone + Send + Sync + 'static> AsyncValue<T> {
(
AsyncValue {
item_receiver: Arc::downgrade(&item_receiver),
abort_sender,
abort_sender: Some(abort_sender),
},
AsyncValueSender {
item_sender,
Expand All @@ -75,6 +73,14 @@ impl<T: Clone + Send + Sync + 'static> AsyncValue<T> {
.upgrade()
.map(|item_receiver| AsyncValueReceiver { item_receiver })
}

/// Attempts to abort the associated work, handing `t` to the sender side.
///
/// The abort sender is consumed by the first call, which returns whatever the oneshot
/// `send` reports. Any later call finds no sender left and returns `Ok(())` without
/// sending anything.
pub fn try_abort(&mut self, t: T) -> Result<(), T> {
  match self.abort_sender.take() {
    Some(abort_sender) => abort_sender.send(t),
    None => Ok(()),
  }
}
}

pub struct AsyncValueReceiver<T: Clone + Send + Sync + 'static> {
Expand Down Expand Up @@ -102,18 +108,29 @@ impl<T: Clone + Send + Sync + 'static> AsyncValueReceiver<T> {

pub struct AsyncValueSender<T: Clone + Send + Sync + 'static> {
item_sender: watch::Sender<Option<T>>,
abort_receiver: oneshot::Receiver<()>,
abort_receiver: oneshot::Receiver<T>,
}

impl<T: Clone + Send + Sync + 'static> AsyncValueSender<T> {
pub fn send(self, item: T) {
let _ = self.item_sender.send(Some(item));
}

pub async fn closed(&mut self) {
pub async fn aborted(&mut self) -> Option<T> {
tokio::select! {
_ = &mut self.abort_receiver => {}
_ = self.item_sender.closed() => {}
res = &mut self.abort_receiver => {
match res {
Ok(res) => {
// Aborted with a value.
Some(res)
},
Err(_) => {
// Was dropped.
None
},
}
}
_ = self.item_sender.closed() => { None }
}
}
}
Expand Down
25 changes: 22 additions & 3 deletions src/rust/engine/async_value/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ async fn cancel_explicit() {
let (value, mut sender, receiver) = AsyncValue::<()>::new();

// A task that will never do any meaningful work, and just wait to be canceled.
let _send_task = tokio::spawn(async move { sender.closed().await });
let _send_task = tokio::spawn(async move { sender.aborted().await });

// Ensure that a value is not received.
tokio::select! {
Expand All @@ -35,7 +35,7 @@ async fn cancel_implicit() {
let (value, mut sender, receiver) = AsyncValue::<()>::new();

// A task that will never do any meaningful work, and just wait to be canceled.
let send_task = tokio::spawn(async move { sender.closed().await });
let send_task = tokio::spawn(async move { sender.aborted().await });

// Ensure that a value is not received.
tokio::select! {
Expand All @@ -46,6 +46,25 @@ async fn cancel_implicit() {
// Then drop the only receiver and confirm that the background task returns, and that new
// receivers cannot be created.
std::mem::drop(receiver);
send_task.await.unwrap();
assert_eq!(None, send_task.await.unwrap());
assert!(value.receiver().is_none());
}

#[tokio::test]
async fn abort_explicit() {
  let (mut async_value, mut value_sender, value_receiver) = AsyncValue::<()>::new();

  // Spawn a task that does no meaningful work: it only waits to be aborted or dropped.
  let abort_task = tokio::spawn(async move { value_sender.aborted().await });

  // While the task is still waiting, the receiver must not observe a value.
  tokio::select! {
    _ = sleep(Duration::from_secs(1)) => {},
    _ = value_receiver.recv() => { panic!("Should have continued to wait.") }
  }

  // Abort explicitly with a value: the task should return that value, and the receiver
  // should report that no item was produced.
  async_value.try_abort(()).unwrap();
  assert_eq!(Some(()), abort_task.await.unwrap());
  assert_eq!(None, value_receiver.recv().await);
}
3 changes: 2 additions & 1 deletion src/rust/engine/graph/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ fixedbitset = "0.2"
log = "0.4"
parking_lot = "0.11"
petgraph = "0.5"
tokio = { version = "1.4", features = ["time"] }
task_executor = { path = "../task_executor" }
tokio = { version = "1.4", features = ["time", "parking_lot"] }

[dev-dependencies]
rand = "0.8"
Expand Down
67 changes: 54 additions & 13 deletions src/rust/engine/graph/src/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ pub enum EntryState<N: Node> {
pending_value: AsyncValue<NodeResult<N>>,
generation: Generation,
previous_result: Option<EntryResult<N>>,
is_cleaning: bool,
},
// A node that has completed, and then possibly been marked dirty. Because marking a node
// dirty does not eagerly re-execute any logic, it will stay this way until a caller moves it
Expand Down Expand Up @@ -309,7 +310,8 @@ impl<N: Node> Entry<N> {
let context = context_factory.clone_for(entry_id);
let context2 = context.clone();
let node = node.clone();
let (value, mut sender, receiver) = AsyncValue::new();
let (value, mut sender, receiver) = AsyncValue::<NodeResult<N>>::new();
let is_cleaning = previous_dep_generations.is_some();

let run_or_clean = async move {
// If we have previous result generations, compare them to all current dependency
Expand All @@ -323,7 +325,7 @@ impl<N: Node> Entry<N> {
{
// If dependency generations mismatched or failed to fetch, clear the node's dependencies
// and indicate that it should re-run.
context.graph().clear_deps(entry_id, run_token);
context.graph().cleaning_failed(entry_id, run_token);
context.stats().cleaning_failed += 1;
false
} else {
Expand All @@ -349,20 +351,27 @@ impl<N: Node> Entry<N> {
};

context_factory.spawn(async move {
tokio::select! {
_ = sender.closed() => {
// We've been explicitly canceled: exit.
context2
.graph()
.cancel(entry_id, run_token);
let maybe_res = tokio::select! {
abort_item = sender.aborted() => {
if let Some(res) = abort_item {
// We were aborted via terminate: complete with the given res.
Some(res.map(|v| v.0))
} else {
// We were aborted via drop: exit.
context2
.graph()
.cancel(entry_id, run_token);
return;
}
}
maybe_res = run_or_clean => {
// The node completed.
context2
.graph()
.complete(&context2, entry_id, run_token, sender, maybe_res);
maybe_res
}
}
};
// The node completed.
context2
.graph()
.complete(&context2, entry_id, run_token, sender, maybe_res);
});

(
Expand All @@ -371,6 +380,7 @@ impl<N: Node> Entry<N> {
pending_value: value,
generation,
previous_result,
is_cleaning,
},
receiver,
)
Expand Down Expand Up @@ -754,6 +764,30 @@ impl<N: Node> Entry<N> {
}
}

///
/// If this Node is currently Running, requests that it finish with the given error; in any
/// other state this does nothing.
///
/// Only the abort request is issued here, and its result is ignored. The method is
/// asynchronous in effect: the task running the Node will take some time to notice the
/// request and to update the state of the Node.
///
pub(crate) fn terminate(&mut self, err: N::Error) {
  let mut state = self.state.lock();
  test_trace_log!("Terminating node {:?} with {:?}", self.node, err);
  if let EntryState::Running { pending_value, .. } = &mut *state {
    let _ = pending_value.try_abort(Err(err));
  }
}

///
/// Records that cleaning this Node has failed: if the Node is Running, its `is_cleaning`
/// flag is reset to false. Has no effect in any other state.
///
pub(crate) fn cleaning_failed(&mut self) {
  let mut state = self.state.lock();
  if let EntryState::Running { is_cleaning, .. } = &mut *state {
    *is_cleaning = false;
  }
}

pub fn is_started(&self) -> bool {
match *self.state.lock() {
EntryState::NotStarted { .. } => false,
Expand All @@ -768,6 +802,13 @@ impl<N: Node> Entry<N> {
}
}

///
/// Returns the `is_cleaning` flag of a Running Node, and false for a Node in any other state.
///
pub fn is_cleaning(&self) -> bool {
  let state = self.state.lock();
  match &*state {
    EntryState::NotStarted { .. } | EntryState::Completed { .. } => false,
    EntryState::Running { is_cleaning, .. } => *is_cleaning,
  }
}

pub fn is_clean(&self, context: &N::Context) -> bool {
match *self.state.lock() {
EntryState::NotStarted {
Expand Down
Loading

0 comments on commit 13827bb

Please sign in to comment.