From 13827bb491ab5f4e1acc92bfae3af3ab35e5dc93 Mon Sep 17 00:00:00 2001 From: Stu Hood Date: Thu, 28 Oct 2021 16:10:08 -0700 Subject: [PATCH] Check for `@rule` graph cycles asynchronously (#13370) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In pathological cases of very small `@rule` bodies or extreme graph shapes, cycle detection in the `Graph` can be very expensive. This cost can be challenging to detect in production, but it does occasionally show up as hot, particularly while cleaning graphs (since that skips all `@rule` logic). That's precisely when we do not want to be spending any extraneous time on formalities. This change moves to asynchronously checking for cycles in Running nodes in the `Graph`. Checking in the background has two advantages: 1. cycle detection is batched across the entire graph, rather than per-edge-addition, and 2. in most cases, successful Nodes never need to be checked while running. We also add two pathological graph shape benchmarks, which show significant improvement: * `main`: ``` Benchmark #1: ./pants test --force --no-pytest-timeouts src/python/pants/engine/internals/engine_benchmarks_test.py Time (mean ± σ): 247.157 s ± 9.095 s [User: 872.4 ms, System: 305.2 ms] Range (min … max): 240.523 s … 266.031 s 10 runs ``` * this branch: ``` Benchmark #1: ./pants test --force --no-pytest-timeouts src/python/pants/engine/internals/engine_benchmarks_test.py Time (mean ± σ): 34.779 s ± 4.625 s [User: 889.1 ms, System: 303.5 ms] Range (min … max): 32.246 s … 47.491 s 10 runs ``` --- .../internals/engine_benchmarks_test.py | 47 +++ src/rust/engine/Cargo.lock | 2 + src/rust/engine/async_value/src/lib.rs | 33 +- src/rust/engine/async_value/src/tests.rs | 25 +- src/rust/engine/graph/Cargo.toml | 3 +- src/rust/engine/graph/src/entry.rs | 67 +++- src/rust/engine/graph/src/lib.rs | 357 ++++++------------ src/rust/engine/graph/src/tests.rs | 142 ++----- src/rust/engine/src/context.rs | 2 +- 9 files changed, 291 insertions(+), 387 deletions(-) create mode 100644 src/python/pants/engine/internals/engine_benchmarks_test.py diff --git a/src/python/pants/engine/internals/engine_benchmarks_test.py b/src/python/pants/engine/internals/engine_benchmarks_test.py new file mode 100644 index 00000000000..ab861a515d6 --- /dev/null +++ b/src/python/pants/engine/internals/engine_benchmarks_test.py @@ -0,0 +1,47 @@ +# Copyright 2021 Pants project contributors (see CONTRIBUTORS.md). +# Licensed under the Apache License, Version 2.0 (see LICENSE). + +from dataclasses import dataclass +from random import randrange + +from pants.engine.rules import Get, MultiGet, rule +from pants.testutil.rule_runner import QueryRule, RuleRunner + + +@dataclass(frozen=True) +class Deep: + val: int + + +@rule +async def deep(n: int) -> Deep: + if n < 2: + return Deep(n) + x, y = tuple(await MultiGet([Get(Deep, int(n - 2)), Get(Deep, int(n - 1))])) + return Deep(x.val + y.val) + + +@dataclass(frozen=True) +class Wide: + val: int + + +@rule +async def wide(index: int) -> Wide: + if index > 0: + _ = await MultiGet([Get(Wide, int(randrange(index))) for _ in range(100)]) + return Wide(index) + + +def test_bench_deep(): + rule_runner = RuleRunner(rules=[deep, QueryRule(Deep, (int,))]) + for _ in range(0, 10): + rule_runner.scheduler.scheduler.invalidate_all() + _ = rule_runner.request(Deep, [10000]) + + +def test_bench_wide(): + rule_runner = RuleRunner(rules=[wide, QueryRule(Wide, (int,))]) + for _ in range(0, 5): + rule_runner.scheduler.scheduler.invalidate_all() + _ = rule_runner.request(Wide, [1000]) diff --git a/src/rust/engine/Cargo.lock b/src/rust/engine/Cargo.lock index c53a7eeaaac..fd73a78c86d 100644 --- a/src/rust/engine/Cargo.lock +++ b/src/rust/engine/Cargo.lock @@ -1068,6 +1068,7 @@ dependencies = [ "parking_lot", "petgraph", "rand 0.8.2", + "task_executor", "tokio", ] @@ -3374,6 +3375,7 @@ dependencies = [ "mio 0.7.13", "num_cpus", "once_cell", + "parking_lot", "pin-project-lite", "signal-hook-registry", "tokio-macros", diff --git a/src/rust/engine/async_value/src/lib.rs b/src/rust/engine/async_value/src/lib.rs index e499f37dcbf..c9b34132ca3 100644 --- a/src/rust/engine/async_value/src/lib.rs +++ b/src/rust/engine/async_value/src/lib.rs @@ -42,9 +42,7 @@ use tokio::sync::{oneshot, watch}; #[derive(Debug)] pub struct AsyncValue { item_receiver: Weak>>, - // NB: Stored only for drop. - #[allow(dead_code)] - abort_sender: oneshot::Sender<()>, + abort_sender: Option>, } impl AsyncValue { @@ -55,7 +53,7 @@ impl AsyncValue { ( AsyncValue { item_receiver: Arc::downgrade(&item_receiver), - abort_sender, + abort_sender: Some(abort_sender), }, AsyncValueSender { item_sender, @@ -75,6 +73,14 @@ impl AsyncValue { .upgrade() .map(|item_receiver| AsyncValueReceiver { item_receiver }) } + + pub fn try_abort(&mut self, t: T) -> Result<(), T> { + if let Some(abort_sender) = self.abort_sender.take() { + abort_sender.send(t) + } else { + Ok(()) + } + } } pub struct AsyncValueReceiver { @@ -102,7 +108,7 @@ impl AsyncValueReceiver { pub struct AsyncValueSender { item_sender: watch::Sender>, - abort_receiver: oneshot::Receiver<()>, + abort_receiver: oneshot::Receiver, } impl AsyncValueSender { @@ -110,10 +116,21 @@ impl AsyncValueSender { let _ = self.item_sender.send(Some(item)); } - pub async fn closed(&mut self) { + pub async fn aborted(&mut self) -> Option { tokio::select! { - _ = &mut self.abort_receiver => {} - _ = self.item_sender.closed() => {} + res = &mut self.abort_receiver => { + match res { + Ok(res) => { + // Aborted with a value. + Some(res) + }, + Err(_) => { + // Was dropped. + None + }, + } + } + _ = self.item_sender.closed() => { None } } } } diff --git a/src/rust/engine/async_value/src/tests.rs b/src/rust/engine/async_value/src/tests.rs index 670e2816eec..6d0c0c658cf 100644 --- a/src/rust/engine/async_value/src/tests.rs +++ b/src/rust/engine/async_value/src/tests.rs @@ -17,7 +17,7 @@ async fn cancel_explicit() { let (value, mut sender, receiver) = AsyncValue::<()>::new(); // A task that will never do any meaningful work, and just wait to be canceled. - let _send_task = tokio::spawn(async move { sender.closed().await }); + let _send_task = tokio::spawn(async move { sender.aborted().await }); // Ensure that a value is not received. tokio::select! { @@ -35,7 +35,7 @@ async fn cancel_implicit() { let (value, mut sender, receiver) = AsyncValue::<()>::new(); // A task that will never do any meaningful work, and just wait to be canceled. - let send_task = tokio::spawn(async move { sender.closed().await }); + let send_task = tokio::spawn(async move { sender.aborted().await }); // Ensure that a value is not received. tokio::select! { @@ -46,6 +46,25 @@ async fn cancel_implicit() { // Then drop the only receiver and confirm that the background task returns, and that new // receivers cannot be created. std::mem::drop(receiver); - send_task.await.unwrap(); + assert_eq!(None, send_task.await.unwrap()); assert!(value.receiver().is_none()); } + +#[tokio::test] +async fn abort_explicit() { + let (mut value, mut sender, receiver) = AsyncValue::<()>::new(); + + // A task that will never do any meaningful work, and just wait to be canceled. + let send_task = tokio::spawn(async move { sender.aborted().await }); + + // Ensure that a value is not received. + tokio::select! { + _ = sleep(Duration::from_secs(1)) => {}, + _ = receiver.recv() => { panic!("Should have continued to wait.") } + } + + // Explicitly abort the task, and confirm that it exits and cancels the work. + value.try_abort(()).unwrap(); + assert_eq!(Some(()), send_task.await.unwrap()); + assert_eq!(None, receiver.recv().await); +} diff --git a/src/rust/engine/graph/Cargo.toml b/src/rust/engine/graph/Cargo.toml index b7f29983a59..f17da641326 100644 --- a/src/rust/engine/graph/Cargo.toml +++ b/src/rust/engine/graph/Cargo.toml @@ -14,7 +14,8 @@ fixedbitset = "0.2" log = "0.4" parking_lot = "0.11" petgraph = "0.5" -tokio = { version = "1.4", features = ["time"] } +task_executor = { path = "../task_executor" } +tokio = { version = "1.4", features = ["time", "parking_lot"] } [dev-dependencies] rand = "0.8" diff --git a/src/rust/engine/graph/src/entry.rs b/src/rust/engine/graph/src/entry.rs index 9fccefd0dba..36a37cb62e4 100644 --- a/src/rust/engine/graph/src/entry.rs +++ b/src/rust/engine/graph/src/entry.rs @@ -180,6 +180,7 @@ pub enum EntryState { pending_value: AsyncValue>, generation: Generation, previous_result: Option>, + is_cleaning: bool, }, // A node that has completed, and then possibly been marked dirty. Because marking a node // dirty does not eagerly re-execute any logic, it will stay this way until a caller moves it @@ -309,7 +310,8 @@ impl Entry { let context = context_factory.clone_for(entry_id); let context2 = context.clone(); let node = node.clone(); - let (value, mut sender, receiver) = AsyncValue::new(); + let (value, mut sender, receiver) = AsyncValue::>::new(); + let is_cleaning = previous_dep_generations.is_some(); let run_or_clean = async move { // If we have previous result generations, compare them to all current dependency @@ -323,7 +325,7 @@ impl Entry { { // If dependency generations mismatched or failed to fetch, clear the node's dependencies // and indicate that it should re-run. - context.graph().clear_deps(entry_id, run_token); + context.graph().cleaning_failed(entry_id, run_token); context.stats().cleaning_failed += 1; false } else { @@ -349,20 +351,27 @@ impl Entry { }; context_factory.spawn(async move { - tokio::select! { - _ = sender.closed() => { - // We've been explicitly canceled: exit. - context2 - .graph() - .cancel(entry_id, run_token); + let maybe_res = tokio::select! { + abort_item = sender.aborted() => { + if let Some(res) = abort_item { + // We were aborted via terminate: complete with the given res. + Some(res.map(|v| v.0)) + } else { + // We were aborted via drop: exit. + context2 + .graph() + .cancel(entry_id, run_token); + return; + } } maybe_res = run_or_clean => { - // The node completed. - context2 - .graph() - .complete(&context2, entry_id, run_token, sender, maybe_res); + maybe_res } - } + }; + // The node completed. + context2 + .graph() + .complete(&context2, entry_id, run_token, sender, maybe_res); }); ( @@ -371,6 +380,7 @@ impl Entry { pending_value: value, generation, previous_result, + is_cleaning, }, receiver, ) @@ -754,6 +764,30 @@ impl Entry { } } + /// + /// Terminates this Node with the given error iff it is Running. + /// + /// This method is asynchronous: the task running the Node will take some time to notice that it + /// has been terminated, and to update the state of the Node. + /// + pub(crate) fn terminate(&mut self, err: N::Error) { + let state = &mut *self.state.lock(); + test_trace_log!("Terminating node {:?} with {:?}", self.node, err); + if let EntryState::Running { pending_value, .. } = state { + let _ = pending_value.try_abort(Err(err)); + }; + } + + /// + /// Indicates that cleaning this Node has failed. + /// + pub(crate) fn cleaning_failed(&mut self) { + let state = &mut *self.state.lock(); + if let EntryState::Running { is_cleaning, .. } = state { + *is_cleaning = false; + }; + } + pub fn is_started(&self) -> bool { match *self.state.lock() { EntryState::NotStarted { .. } => false, @@ -768,6 +802,13 @@ impl Entry { } } + pub fn is_cleaning(&self) -> bool { + match *self.state.lock() { + EntryState::Running { is_cleaning, .. } => is_cleaning, + EntryState::Completed { .. } | EntryState::NotStarted { .. } => false, + } + } + pub fn is_clean(&self, context: &N::Context) -> bool { match *self.state.lock() { EntryState::NotStarted { diff --git a/src/rust/engine/graph/src/lib.rs b/src/rust/engine/graph/src/lib.rs index f2b7edc998e..ce31a6bd368 100644 --- a/src/rust/engine/graph/src/lib.rs +++ b/src/rust/engine/graph/src/lib.rs @@ -38,24 +38,26 @@ use std::fs::File; use std::hash::BuildHasherDefault; use std::io::{self, BufWriter, Write}; use std::path::Path; +use std::sync::{Arc, Weak}; use std::time::Duration; use async_value::AsyncValueSender; use fixedbitset::FixedBitSet; use fnv::FnvHasher; use futures::future; -use log::{debug, info, warn}; +use log::info; use parking_lot::Mutex; use petgraph::graph::DiGraph; use petgraph::visit::{EdgeRef, VisitMap, Visitable}; use petgraph::Direction; +use task_executor::Executor; use tokio::time::sleep; pub use crate::node::{EntryId, Node, NodeContext, NodeError, NodeVisualizer, Stats}; type Fnv = BuildHasherDefault; -type PGraph = DiGraph, f32, u32>; +type PGraph = DiGraph, (), u32>; #[derive(Debug, Eq, PartialEq)] pub struct InvalidationResult { @@ -108,168 +110,90 @@ impl InnerGraph { } /// - /// Detect whether adding an edge from src to dst would create a cycle. + /// Locates all* cycles in running nodes in the graph, and terminates one Node in each of them. /// - /// Returns a path which would cause the cycle if an edge were added from src to dst, or None if - /// no cycle would be created. + /// * Finding "all simple cycles" in a graph is apparently best accomplished with [Johnson's + /// algorithm](https://www.cs.tufts.edu/comp/150GA/homeworks/hw1/Johnson%2075.PDF), which uses + /// the strongly connected components, but goes a bit further. Because this method will run + /// multiple times, we don't worry about that, and just kill one member of each SCC. /// - /// This strongly optimizes for the case of no cycles. If cycles are detected, this is very - /// expensive to call. - /// - fn report_cycle(&self, src_id: EntryId, dst_id: EntryId) -> Option>> { - if src_id == dst_id { - let entry = self.entry_for_id(src_id).unwrap(); - return Some(vec![entry.clone(), entry.clone()]); - } - if !self.detect_cycle(src_id, dst_id) { - return None; - } - Self::shortest_path(&self.pg, dst_id, src_id).map(|mut path| { - path.reverse(); - path.push(dst_id); - path - .into_iter() - .map(|index| self.entry_for_id(index).unwrap().clone()) - .collect() - }) - } - - /// - /// Detect whether adding an edge from src to dst would create a cycle. - /// - /// Uses Dijkstra's algorithm, which is significantly cheaper than the Bellman-Ford, but keeps - /// less context around paths on the way. - /// - fn detect_cycle(&self, src_id: EntryId, dst_id: EntryId) -> bool { - // Search either forward from the dst, or backward from the src. - let (root, needle, direction) = { - let out_from_dst = self.pg.neighbors(dst_id).count(); - let in_to_src = self - .pg - .neighbors_directed(src_id, Direction::Incoming) - .count(); - if out_from_dst < in_to_src { - (dst_id, src_id, Direction::Outgoing) - } else { - (src_id, dst_id, Direction::Incoming) - } - }; - - // Search for an existing path from dst to src. - let mut roots = VecDeque::new(); - roots.push_back(root); - self - .walk(roots, direction, |_| false) - .any(|eid| eid == needle) - } - - /// - /// Compute and return one shortest path from `src` to `dst`. - /// - /// Uses Bellman-Ford, which is pretty expensive O(VE) as it has to traverse the whole graph and - /// keeping a lot of state on the way. - /// - fn shortest_path(graph: &PGraph, src: EntryId, dst: EntryId) -> Option> { - let (_path_weights, paths) = petgraph::algo::bellman_ford(graph, src) - .expect("There should not be any negative edge weights"); + fn terminate_cycles(&mut self) { + // Build a graph of Running node indexes. + let running_graph = self.pg.filter_map( + |node_idx, node_weight| { + if node_weight.is_running() { + Some(node_idx) + } else { + None + } + }, + |_edge_idx, _edge_weight| Some(()), + ); + // TODO: We'd usually use `tarjan_scc` because it makes one fewer pass, but it panics (without + // a useful error message) for some graphs. So `kosaraju_scc` it is. + let running_sccs = petgraph::algo::kosaraju_scc(&running_graph); - let mut next = dst; - let mut path = vec![next]; - while let Some(current) = paths[next.index()] { - path.push(current); - if current == src { - return Some(path); + for running_scc in running_sccs { + if running_scc.len() <= 1 { + continue; } - next = current; - } - None - } - /// - /// Compute the critical path for this graph. - /// - /// The critical path is the longest path. For a directed acyclic graph, it is equivalent to a - /// shortest path algorithm. - /// - /// Modify the graph we have to fit into the expectations of the Bellman-Ford shortest graph - /// algorithm and use that to calculate the critical path. - /// - fn critical_path(&self, roots: &[N], duration: &F) -> (Duration, Vec>) - where - F: Fn(&Entry) -> Duration, - { - fn duration_into_weight(d: Duration) -> f64 { - -(d.as_nanos() as f64) - } - - // First, let's map nodes to edges - let mut graph = self.pg.filter_map( - |_node_idx, node_weight| Some(Some(node_weight)), - |edge_idx, _edge_weight| { - let target_node = self.pg.raw_edges()[edge_idx.index()].target(); - self - .pg - .node_weight(target_node) - .map(duration) - .map(duration_into_weight) - }, - ); + // There is a cycle. We bias toward terminating nodes which are being cleaned, because it's + // possible for them to form false cycles with nodes which are running from scratch. If no + // nodes are being cleaned, then choose the running node with the highest node id. + let (running_candidate, should_terminate) = if let Some(dirty_candidate) = running_scc + .iter() + .filter(|&id| self.pg[running_graph[*id]].is_cleaning()) + .max() + { + // Nodes are being cleaned: clear the highest id entry. + (dirty_candidate, false) + } else { + // There are no nodes being cleaned: terminate the Running node with the highest id. + (running_scc.iter().max().unwrap(), true) + }; - // Add a single source that's a parent to all roots - let srcs = roots - .iter() - .filter_map(|n| self.entry_id(n)) - .cloned() - .collect::>(); - let src = graph.add_node(None); - for node in srcs { - graph.add_edge( - src, - node, - graph - .node_weight(node) - .map(|maybe_weight| { - maybe_weight - .map(duration) - .map(duration_into_weight) - .unwrap_or(0.) + test_trace_log!( + "Cycle {:?}", + running_scc + .iter() + .map(|id| { + let entry = &self.pg[running_graph[*id]]; + format!("{:?}: is_cleaning: {}", entry.node(), entry.is_cleaning()) }) - .unwrap(), + .collect::>(), ); - } - let (weights, paths) = - petgraph::algo::bellman_ford(&graph, src).expect("The graph must be acyclic"); - if let Some((index, total_duration)) = weights - .into_iter() - .enumerate() - .filter_map(|(i, weight)| { - // INFINITY is used for missing entries. - if weight == std::f64::INFINITY { - None - } else { - Some((i, Duration::from_nanos(-weight as u64))) - } - }) - .max_by(|(_, left_duration), (_, right_duration)| left_duration.cmp(right_duration)) - { - let critical_path = { - let mut next = paths[index]; - let mut path = vec![graph - .node_weight(petgraph::graph::NodeIndex::new(index)) - .unwrap() - .unwrap()]; - while next != Some(src) && next != None { - if let Some(entry) = graph.node_weight(next.unwrap()).unwrap() { - path.push(*entry); - } - next = paths[next.unwrap().index()]; - } - path.into_iter().rev().cloned().collect() - }; - (total_duration, critical_path) - } else { - (Duration::from_nanos(0), vec![]) + // Calculate one path between the chosen node and itself by finding a path to its first + // predecessor (which as a fellow member of the SCC, must also be reachable). + let running_predecessor = running_graph + .neighbors_directed(*running_candidate, Direction::Incoming) + .next() + .unwrap(); + let running_path: Vec<_> = petgraph::algo::all_simple_paths( + &running_graph, + *running_candidate, + running_predecessor, + 0, + None, + ) + .next() + .unwrap(); + + // Either terminate or clear the candidate. + let candidate = running_graph[*running_candidate]; + if should_terminate { + // Render the error, and terminate the Node with it. + let path_strs = running_path + .into_iter() + .map(|rni| self.pg[rni].node().to_string()) + .collect(); + self.pg[candidate].terminate(N::Error::cyclic(path_strs)); + } else { + // Else, clear. + let node = self.pg[candidate].node().clone(); + self.invalidate_from_roots(|n| &node == n); + } } } @@ -461,26 +385,48 @@ impl InnerGraph { /// A DAG (enforced on mutation) of Entries. /// pub struct Graph { - inner: Mutex>, + inner: Arc>>, invalidation_delay: Duration, } impl Graph { - pub fn new() -> Graph { - Self::new_with_invalidation_delay(Duration::from_millis(500)) + pub fn new(executor: Executor) -> Graph { + Self::new_with_invalidation_delay(executor, Duration::from_millis(500)) } - pub fn new_with_invalidation_delay(invalidation_delay: Duration) -> Graph { - let inner = InnerGraph { + pub fn new_with_invalidation_delay(executor: Executor, invalidation_delay: Duration) -> Graph { + let inner = Arc::new(Mutex::new(InnerGraph { nodes: HashMap::default(), pg: DiGraph::new(), - }; + })); + let _join = executor.spawn(Self::cycle_check_task(Arc::downgrade(&inner))); + Graph { - inner: Mutex::new(inner), + inner, invalidation_delay, } } + /// + /// A task which periodically checks for cycles in Running nodes. Doing this in the background + /// allows for batching and laziness: nodes which don't form cycles may complete without ever + /// being checked. + /// + /// Uses a `Weak` reference to the Graph to detect when the sender has shut down. + /// + async fn cycle_check_task(inner: Weak>>) { + loop { + sleep(Duration::from_millis(500)).await; + + if let Some(inner) = Weak::upgrade(&inner) { + inner.lock().terminate_cycles(); + } else { + // We've been shut down. + break; + }; + } + } + pub fn len(&self) -> usize { let inner = self.inner.lock(); inner.nodes.len() @@ -497,28 +443,14 @@ impl Graph { // Get or create the destination, and then insert the dep and return its state. let mut inner = self.inner.lock(); - // TODO: doing cycle detection under the lock... unfortunate, but probably unavoidable - // without a much more complicated algorithm. let dst_id = inner.ensure_entry(dst_node); let dst_retry = if let Some(src_id) = src_id { - if let Some(cycle_path) = Self::report_cycle(src_id, dst_id, &mut inner, context) { - // Cyclic dependency: render an error. - let path_strs = cycle_path - .into_iter() - .map(|e| e.node().to_string()) - .collect(); - return Err(N::Error::cyclic(path_strs)); - } - - // Valid dependency. test_trace_log!( "Adding dependency from {:?} to {:?}", inner.entry_for_id(src_id).unwrap().node(), inner.entry_for_id(dst_id).unwrap().node() ); - // All edges get a weight of 1.0 so that we can Bellman-Ford over the graph, treating each - // edge as having equal weight. - inner.pg.add_edge(src_id, dst_id, 1.0); + inner.pg.add_edge(src_id, dst_id, ()); // We should retry the dst Node if the src Node is not restartable. If the src is not // restartable, it is only allowed to run once, and so Node invalidation does not pass @@ -621,72 +553,6 @@ impl Graph { Ok((res, LastObserved(generation))) } - fn report_cycle( - src_id: EntryId, - potential_dst_id: EntryId, - inner: &mut InnerGraph, - context: &N::Context, - ) -> Option>> { - let mut counter = 0; - loop { - // Find one cycle if any cycles exist. - if let Some(cycle_path) = inner.report_cycle(src_id, potential_dst_id) { - // See if the cycle contains any dirty nodes. If there are dirty nodes, we can try clearing - // them, and then check if there are still any cycles in the graph. - let dirty_nodes: HashSet<_> = cycle_path - .iter() - .filter(|n| !n.is_clean(context)) - .map(|n| n.node().clone()) - .collect(); - if dirty_nodes.is_empty() { - // We detected a cycle with no dirty nodes - there's a cycle and there's nothing we can do - // to remove it. We only log at debug because the UI will render the cycle. - debug!( - "Detected cycle considering adding edge from {:?} to {:?}; existing path: {:?}", - inner.entry_for_id(src_id).unwrap(), - inner.entry_for_id(potential_dst_id).unwrap(), - cycle_path - ); - return Some(cycle_path); - } - counter += 1; - // Obsolete edges from a dirty node may cause fake cycles to be detected if there was a - // dirty dep from A to B, and we're trying to add a dep from B to A. - // If we detect a cycle that contains dirty nodes (and so potentially obsolete edges), - // we repeatedly cycle-detect, clearing (and re-running) and dirty nodes (and their edges) - // that we encounter. - // - // We do this repeatedly, because there may be multiple paths which would cause cycles, - // which contain dirty nodes. If we've cleared 10 separate paths which contain dirty nodes, - // and are still detecting cycle-causing paths containing dirty nodes, give up. 10 is a very - // arbitrary number, which we can increase if we find real graphs in the wild which hit this - // limit. - if counter > 10 { - warn!( - "Couldn't remove cycle containing dirty nodes after {} attempts; nodes in cycle: {:?}", - counter, cycle_path - ); - return Some(cycle_path); - } - // Clear the dirty nodes, removing the edges from them, and try again. - inner.invalidate_from_roots(|node| dirty_nodes.contains(node)); - } else { - return None; - } - } - } - - /// - /// Calculate the critical path for the subset of the graph that descends from these roots, - /// assuming this mapping between entries and durations. - /// - pub fn critical_path(&self, roots: &[N], duration: &F) -> (Duration, Vec>) - where - F: Fn(&Entry) -> Duration, - { - self.inner.lock().critical_path(roots, duration) - } - /// /// Compares the generations of the dependencies of the given EntryId to their previous /// generation values (re-computing or cleaning them first if necessary), and returns true if any @@ -746,13 +612,14 @@ impl Graph { /// /// Clears the dependency edges of the given EntryId if the RunToken matches. /// - fn clear_deps(&self, entry_id: EntryId, run_token: RunToken) { + fn cleaning_failed(&self, entry_id: EntryId, run_token: RunToken) { let mut inner = self.inner.lock(); // If the RunToken mismatches, return. - if let Some(entry) = inner.entry_for_id(entry_id) { + if let Some(entry) = inner.entry_for_id_mut(entry_id) { if entry.run_token() != run_token { return; } + entry.cleaning_failed() } // Otherwise, clear the deps. diff --git a/src/rust/engine/graph/src/tests.rs b/src/rust/engine/graph/src/tests.rs index d497187a9da..ca7b4930816 100644 --- a/src/rust/engine/graph/src/tests.rs +++ b/src/rust/engine/graph/src/tests.rs @@ -11,13 +11,18 @@ use async_trait::async_trait; use futures::future; use parking_lot::Mutex; use rand::{self, Rng}; +use task_executor::Executor; use tokio::time::{error::Elapsed, sleep, timeout}; use crate::{EntryId, Graph, InvalidationResult, Node, NodeContext, NodeError, Stats}; +fn empty_graph() -> Arc> { + Arc::new(Graph::new(Executor::new())) +} + #[tokio::test] async fn create() { - let graph = Arc::new(Graph::new()); + let graph = empty_graph(); let context = TContext::new(graph.clone()); assert_eq!( graph.create(TNode::new(2), &context).await, @@ -27,7 +32,7 @@ async fn create() { #[tokio::test] async fn invalidate_and_clean() { - let graph = Arc::new(Graph::new()); + let graph = empty_graph(); let context = TContext::new(graph.clone()); // Create three nodes. @@ -62,7 +67,7 @@ async fn invalidate_and_clean() { #[tokio::test] async fn invalidate_and_rerun() { - let graph = Arc::new(Graph::new()); + let graph = empty_graph(); let context = TContext::new(graph.clone()); // Create three nodes. @@ -96,7 +101,7 @@ async fn invalidate_and_rerun() { #[tokio::test] async fn invalidate_with_changed_dependencies() { - let graph = Arc::new(Graph::new()); + let graph = empty_graph(); let context = TContext::new(graph.clone()); // Create three nodes. @@ -136,7 +141,7 @@ async fn invalidate_with_changed_dependencies() { #[ignore] // flaky: https://github.com/pantsbuild/pants/issues/10839 #[tokio::test] async fn invalidate_randomly() { - let graph = Arc::new(Graph::new()); + let graph = empty_graph(); let invalidations = 10; let sleep_per_invalidation = Duration::from_millis(100); @@ -202,7 +207,7 @@ async fn invalidate_randomly() { #[tokio::test] async fn poll_cacheable() { - let graph = Arc::new(Graph::new()); + let graph = empty_graph(); let context = TContext::new(graph.clone()); // Poll with an empty graph should succeed. @@ -240,7 +245,7 @@ async fn poll_cacheable() { #[tokio::test] async fn poll_uncacheable() { let _logger = env_logger::try_init(); - let graph = Arc::new(Graph::new()); + let graph = empty_graph(); // Create a context where the middle node is uncacheable. let context = { let mut uncacheable = HashSet::new(); @@ -273,7 +278,7 @@ async fn poll_uncacheable() { #[tokio::test] async fn uncacheable_dependents_of_uncacheable_node() { - let graph = Arc::new(Graph::new()); + let graph = empty_graph(); // Create a context for which the bottommost Node is not cacheable. let context = { @@ -316,7 +321,7 @@ async fn uncacheable_dependents_of_uncacheable_node() { #[tokio::test] async fn non_restartable_node_only_runs_once() { let _logger = env_logger::try_init(); - let graph = Arc::new(Graph::new()); + let graph = empty_graph(); let context = { let mut non_restartable = HashSet::new(); @@ -353,7 +358,7 @@ async fn non_restartable_node_only_runs_once() { #[tokio::test] async fn uncacheable_deps_is_cleaned_for_the_session() { let _logger = env_logger::try_init(); - let graph = Arc::new(Graph::new()); + let graph = empty_graph(); let context = { let mut uncacheable = HashSet::new(); @@ -388,7 +393,7 @@ async fn uncacheable_deps_is_cleaned_for_the_session() { #[tokio::test] async fn dirtied_uncacheable_deps_node_re_runs() { let _logger = env_logger::try_init(); - let graph = Arc::new(Graph::new()); + let graph = empty_graph(); let context = { let mut uncacheable = HashSet::new(); @@ -448,7 +453,7 @@ async fn dirtied_uncacheable_deps_node_re_runs() { #[tokio::test] async fn retries() { let _logger = env_logger::try_init(); - let graph = Arc::new(Graph::new()); + let graph = empty_graph(); let context = { let sleep_root = Duration::from_millis(100); @@ -481,7 +486,10 @@ async fn retries() { async fn canceled_on_invalidation() { let _logger = env_logger::try_init(); let invalidation_delay = Duration::from_millis(10); - let graph = Arc::new(Graph::new_with_invalidation_delay(invalidation_delay)); + let graph = Arc::new(Graph::new_with_invalidation_delay( + Executor::new(), + invalidation_delay, + )); let sleep_middle = Duration::from_millis(2000); let start_time = Instant::now(); @@ -529,7 +537,7 @@ async fn canceled_on_invalidation() { #[tokio::test] async fn canceled_on_loss_of_interest() { let _logger = env_logger::try_init(); - let graph = Arc::new(Graph::new()); + let graph = empty_graph(); let sleep_middle = Duration::from_millis(2000); let start_time = Instant::now(); @@ -563,7 +571,7 @@ async fn canceled_on_loss_of_interest() { #[tokio::test] async fn clean_speculatively() { let _logger = env_logger::try_init(); - let graph = Arc::new(Graph::new()); + let graph = empty_graph(); // Create a graph with a node with two dependencies, one of which takes much longer // to run. @@ -607,7 +615,7 @@ async fn clean_speculatively() { #[tokio::test] async fn cyclic_failure() { // Confirms that an attempt to create a cycle fails. - let graph = Arc::new(Graph::new()); + let graph = empty_graph(); let top = TNode::new(2); let context = TContext::new(graph.clone()).with_dependencies( // Request creation of a cycle by sending the bottom most node to the top. @@ -622,9 +630,10 @@ async fn cyclic_failure() { #[tokio::test] async fn cyclic_dirtying() { + let _logger = env_logger::try_init(); // Confirms that a dirtied path between two nodes is able to reverse direction while being // cleaned. - let graph = Arc::new(Graph::new()); + let graph = empty_graph(); let initial_top = TNode::new(2); let initial_bot = TNode::new(0); @@ -656,105 +665,6 @@ async fn cyclic_dirtying() { assert_eq!(res, Ok(vec![T(1, 1), T(2, 1)])); } -#[tokio::test] -async fn critical_path() { - use super::entry::Entry; - // First, let's describe the scenario with plain data. - // - // We label the nodes with static strings to help visualise the situation. - // The first element of each tuple is a readable label. The second element represents the - // duration for this action. - let nodes = [ - ("download jvm", 10), - ("download a", 1), - ("download b", 2), - ("download c", 3), - ("compile a", 3), - ("compile b", 20), - ("compile c", 5), - ]; - let deps = [ - ("download jvm", "compile a"), - ("download jvm", "compile b"), - ("download jvm", "compile c"), - ("download a", "compile a"), - ("download b", "compile b"), - ("download c", "compile c"), - ("compile a", "compile c"), - ("compile b", "compile c"), - ]; - - // Describe a few transformations to navigate between our readable data and the actual types - // needed for the graph. - let tnode = |node: &str| { - TNode::new( - nodes - .iter() - .map(|(k, _)| k) - .position(|label| &node == label) - .unwrap(), - ) - }; - let node_key = |node: &str| tnode(node); - let node_entry = |node: &str| Entry::new(node_key(node)); - let node_and_duration_from_entry = |entry: &super::entry::Entry| nodes[entry.node().id]; - let node_duration = - |entry: &super::entry::Entry| Duration::from_secs(node_and_duration_from_entry(entry).1); - - // Construct a graph and populate it with the nodes and edges prettily defined above. - let graph = Graph::new(); - { - let inner = &mut graph.inner.lock(); - for (node, _) in &nodes { - let node_index = inner.pg.add_node(node_entry(node)); - inner.nodes.insert(node_key(node), node_index); - } - for (src, dst) in &deps { - let src = inner.nodes[&node_key(src)]; - let dst = inner.nodes[&node_key(dst)]; - inner.pg.add_edge(src, dst, 1.0); - } - } - - // Calculate the critical path and validate it. - { - // The roots are all the sources, so we're covering the entire graph - let roots = ["download jvm", "download a", "download b", "download c"] - .iter() - .map(|n| tnode(n)) - .collect::>(); - let (expected_total_duration, expected_critical_path) = ( - Duration::from_secs(35), - vec!["download jvm", "compile b", "compile c"], - ); - let (total_duration, critical_path) = graph.critical_path(&roots, &node_duration); - assert_eq!(expected_total_duration, total_duration); - let critical_path = critical_path - .iter() - .map(|entry| node_and_duration_from_entry(entry).0) - .collect::>(); - assert_eq!(expected_critical_path, critical_path); - } - { - // The roots exclude some nodes ("download jvm", "download a") from the graph. - let roots = ["download b", "download c"] - .iter() - .map(|n| tnode(n)) - .collect::>(); - let (expected_total_duration, expected_critical_path) = ( - Duration::from_secs(27), - vec!["download b", "compile b", "compile c"], - ); - let (total_duration, critical_path) = graph.critical_path(&roots, &node_duration); - assert_eq!(expected_total_duration, total_duration); - let critical_path = critical_path - .iter() - .map(|entry| node_and_duration_from_entry(entry).0) - .collect::>(); - assert_eq!(expected_critical_path, critical_path); - } -} - /// /// A token containing the id of a Node and the id of a Context, respectively. Has a short name /// to minimize the verbosity of tests. diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs index f2314e9f129..36807557d83 100644 --- a/src/rust/engine/src/context.rs +++ b/src/rust/engine/src/context.rs @@ -420,7 +420,7 @@ impl Core { capabilities_cell_opt, )?; - let graph = Arc::new(InvalidatableGraph(Graph::new())); + let graph = Arc::new(InvalidatableGraph(Graph::new(executor.clone()))); // These certs are for downloads, not to be confused with the ones used for remoting. let ca_certs = Self::load_certificates(ca_certs_path)?;