From c524da8862bc1755297abc95b2414f9a76c608ea Mon Sep 17 00:00:00 2001 From: Stu Hood Date: Sat, 30 Jun 2018 12:32:02 -0700 Subject: [PATCH] Implement cleaning of Nodes by comparing the stored generation values to newly computed generations. --- src/rust/engine/graph/src/lib.rs | 137 +++++++++++++++++++++++++++---- 1 file changed, 119 insertions(+), 18 deletions(-) diff --git a/src/rust/engine/graph/src/lib.rs b/src/rust/engine/graph/src/lib.rs index d3e94509da69..d4e714b8db70 100644 --- a/src/rust/engine/graph/src/lib.rs +++ b/src/rust/engine/graph/src/lib.rs @@ -166,16 +166,16 @@ impl Entry { } /// - /// Spawn the execution of the work on an Executor, which will cause it to execute - /// outside of the Graph lock, and allow us to call back into the graph lock to set the - /// final value. + /// Spawn the execution of the node on an Executor, which will cause it to execute outside of + /// the Graph lock and call back into the graph lock to set the final value. /// - fn start( + fn run( context: &C, entry_key: &EntryKey, entry_id: EntryId, run_token: RunToken, generation: Generation, + previous_dep_generations: Option>, previous_result: Option>, ) -> EntryState where @@ -189,12 +189,50 @@ impl Entry { let node = n.clone(); context.spawn(future::lazy(move || { - let context = context2.clone(); - node.run(context2).then(move |res| { - context + // If we have previous result generations, compare them to all current dependency + // generations (which, if they are dirty, will cause recursive cleaning). If they + // match, we can consider the previous result value to be clean for reuse. + let was_clean = if let Some(previous_dep_generations) = previous_dep_generations { + context2 .graph() - .complete(&context, entry_id, run_token, Some(res)); - Ok(()) + .dep_generations(entry_id, &context2) + .then(move |generation_res| match generation_res { + Ok(ref dep_generations) if dep_generations == &previous_dep_generations => { + // Dependencies have not changed: Node is clean. + Ok(true) + } + _ => { + // If dependency generations mismatched or failed to fetch, re-run the Node. + Ok(false) + } + }) + .to_boxed() + } else { + future::ok(false).to_boxed() + }; + + // If the Node was clean, complete it. Otherwise, re-run. + was_clean.and_then(move |was_clean| { + if was_clean { + // No dependencies have changed: we can complete the Node without changing its + // previous_result or generation. + context2 + .graph() + .complete(&context2, entry_id, run_token, None); + future::ok(()).to_boxed() + } else { + // The Node needs to (re-)run! + let context = context2.clone(); + node + .run(context2) + .then(move |res| { + context + .graph() + .complete(&context, entry_id, run_token, Some(res)); + Ok(()) + }) + .to_boxed() + } }) })); @@ -262,12 +300,13 @@ impl Entry { run_token, generation, previous_result, - } => Self::start( + } => Self::run( context, &self.node, entry_id, run_token, generation, + None, previous_result, ), EntryState::Completed { @@ -283,16 +322,15 @@ impl Entry { result ); // The Node has already completed but is now marked dirty. This indicates that we are the - // first caller to request it since it was marked dirty. We must transition it back to - // running. - // - // TODO: clean! - Self::start( + // first caller to request it since it was marked dirty. We attempt to clean it (which will + // cause it to re-run if the dep_generations mismatch). + Self::run( context, &self.node, entry_id, run_token, generation, + Some(dep_generations), Some(result), ) } @@ -361,12 +399,13 @@ impl Entry { if dirty { // The node was dirtied while it was running. The dep_generations and new result cannot // be trusted and were never published. We continue to use the previous result. - Self::start( + Self::run( context, &self.node, entry_id, run_token, generation, + None, previous_result, ) } else { @@ -901,6 +940,40 @@ impl Graph { } } + /// + /// Gets the generations of the dependencies of the given EntryId, (re)computing or cleaning + /// them first if necessary. + /// + fn dep_generations( + &self, + entry_id: EntryId, + context: &C, + ) -> BoxFuture, N::Error> + where + C: NodeContext, + { + let mut inner = self.inner.lock().unwrap(); + let dep_ids = inner + .pg + .neighbors_directed(entry_id, Direction::Outgoing) + .collect::>(); + + future::join_all( + dep_ids + .into_iter() + .map(|dep_id| { + let entry = inner + .entry_for_id_mut(dep_id) + .unwrap_or_else(|| panic!("Dependency not present in Graph.")); + entry + .get(context, dep_id) + .map(|(_, generation)| generation) + .to_boxed() + }) + .collect::>(), + ).to_boxed() + } + /// /// When the Executor finishes executing a Node it calls back to store the result value. We use /// the run_token and dirty bits to determine whether the Node changed while we were busy @@ -1101,10 +1174,38 @@ mod tests { graph.create(TNode(2), &context).wait(), Ok(vec![T(0, 0), T(1, 0), T(2, 0)]) ); + assert_eq!(context.runs(), vec![TNode(2), TNode(1), TNode(0), TNode(1)]); + } + + #[test] + fn invalidate_and_rerun() { + let graph = Arc::new(Graph::new()); + let context0 = TContext::new(0, graph.clone()); + + // Create three nodes. + assert_eq!( + graph.create(TNode(2), &context0).wait(), + Ok(vec![T(0, 0), T(1, 0), T(2, 0)]) + ); + assert_eq!(context0.runs(), vec![TNode(2), TNode(1), TNode(0)]); + + // Clear the middle Node, which dirties the upper node. + assert_eq!( + graph.invalidate_from_roots(|&TNode(n)| n == 1), + InvalidationResult { + cleared: 1, + dirtied: 1 + } + ); + + // Request with a new context, which will cause both the middle and upper nodes to rerun since + // their input values have changed. + let context1 = TContext::new(1, graph.clone()); assert_eq!( - context.runs(), - vec![TNode(2), TNode(1), TNode(0), TNode(2), TNode(1)] + graph.create(TNode(2), &context1).wait(), + Ok(vec![T(0, 0), T(1, 1), T(2, 1)]) ); + assert_eq!(context1.runs(), vec![TNode(1), TNode(2)]); } ///