Skip to content

Commit

Permalink
Implement cleaning of Nodes by comparing the stored generation values…
Browse files Browse the repository at this point in the history
… to newly computed generations.
  • Loading branch information
stuhood committed Jul 3, 2018
1 parent fd08f85 commit c524da8
Showing 1 changed file with 119 additions and 18 deletions.
137 changes: 119 additions & 18 deletions src/rust/engine/graph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,16 +166,16 @@ impl<N: Node> Entry<N> {
}

///
/// Spawn the execution of the work on an Executor, which will cause it to execute
/// outside of the Graph lock, and allow us to call back into the graph lock to set the
/// final value.
/// Spawn the execution of the node on an Executor, which will cause it to execute outside of
/// the Graph lock and call back into the graph lock to set the final value.
///
fn start<C>(
fn run<C>(
context: &C,
entry_key: &EntryKey<N>,
entry_id: EntryId,
run_token: RunToken,
generation: Generation,
previous_dep_generations: Option<Vec<Generation>>,
previous_result: Option<Result<N::Item, N::Error>>,
) -> EntryState<N>
where
Expand All @@ -189,12 +189,50 @@ impl<N: Node> Entry<N> {
let node = n.clone();

context.spawn(future::lazy(move || {
let context = context2.clone();
node.run(context2).then(move |res| {
context
// If we have previous result generations, compare them to all current dependency
// generations (which, if they are dirty, will cause recursive cleaning). If they
// match, we can consider the previous result value to be clean for reuse.
let was_clean = if let Some(previous_dep_generations) = previous_dep_generations {
context2
.graph()
.complete(&context, entry_id, run_token, Some(res));
Ok(())
.dep_generations(entry_id, &context2)
.then(move |generation_res| match generation_res {
Ok(ref dep_generations) if dep_generations == &previous_dep_generations => {
// Dependencies have not changed: Node is clean.
Ok(true)
}
_ => {
// If dependency generations mismatched or failed to fetch, re-run the Node.
Ok(false)
}
})
.to_boxed()
} else {
future::ok(false).to_boxed()
};

// If the Node was clean, complete it. Otherwise, re-run.
was_clean.and_then(move |was_clean| {
if was_clean {
// No dependencies have changed: we can complete the Node without changing its
// previous_result or generation.
context2
.graph()
.complete(&context2, entry_id, run_token, None);
future::ok(()).to_boxed()
} else {
// The Node needs to (re-)run!
let context = context2.clone();
node
.run(context2)
.then(move |res| {
context
.graph()
.complete(&context, entry_id, run_token, Some(res));
Ok(())
})
.to_boxed()
}
})
}));

Expand Down Expand Up @@ -262,12 +300,13 @@ impl<N: Node> Entry<N> {
run_token,
generation,
previous_result,
} => Self::start(
} => Self::run(
context,
&self.node,
entry_id,
run_token,
generation,
None,
previous_result,
),
EntryState::Completed {
Expand All @@ -283,16 +322,15 @@ impl<N: Node> Entry<N> {
result
);
// The Node has already completed but is now marked dirty. This indicates that we are the
// first caller to request it since it was marked dirty. We must transition it back to
// running.
//
// TODO: clean!
Self::start(
// first caller to request it since it was marked dirty. We attempt to clean it (which will
// cause it to re-run if the dep_generations mismatch).
Self::run(
context,
&self.node,
entry_id,
run_token,
generation,
Some(dep_generations),
Some(result),
)
}
Expand Down Expand Up @@ -361,12 +399,13 @@ impl<N: Node> Entry<N> {
if dirty {
// The node was dirtied while it was running. The dep_generations and new result cannot
// be trusted and were never published. We continue to use the previous result.
Self::start(
Self::run(
context,
&self.node,
entry_id,
run_token,
generation,
None,
previous_result,
)
} else {
Expand Down Expand Up @@ -901,6 +940,40 @@ impl<N: Node> Graph<N> {
}
}

///
/// Returns a future for the current generations of every dependency of `entry_id`.
///
/// The dependencies are the Outgoing neighbors of `entry_id` in the graph. Each dependency's
/// Entry is requested via `get` (which (re)computes or cleans it first if necessary), and only
/// the generation half of each result is kept. The resulting Vec follows the order in which the
/// neighbors were enumerated.
///
/// Panics if the `inner` lock cannot be acquired, or if a neighbor id has no Entry in the Graph.
///
fn dep_generations<C>(
  &self,
  entry_id: EntryId,
  context: &C,
) -> BoxFuture<Vec<Generation>, N::Error>
where
  C: NodeContext<Node = N>,
{
  // The guard lives until this function returns: the dependency futures are created while it is
  // held, but they are not polled here.
  let mut graph = self.inner.lock().unwrap();

  // Snapshot the neighbor ids first, so that the immutable borrow of the graph ends before we
  // start taking mutable borrows of individual entries.
  let outgoing: Vec<_> = graph
    .pg
    .neighbors_directed(entry_id, Direction::Outgoing)
    .collect();

  let mut generation_futures = Vec::with_capacity(outgoing.len());
  for dep_id in outgoing {
    let dep_entry = match graph.entry_for_id_mut(dep_id) {
      Some(entry) => entry,
      None => panic!("Dependency not present in Graph."),
    };
    // Discard the dependency's value: only its generation is needed for comparison.
    let generation_future = dep_entry
      .get(context, dep_id)
      .map(|(_, generation)| generation)
      .to_boxed();
    generation_futures.push(generation_future);
  }

  future::join_all(generation_futures).to_boxed()
}

///
/// When the Executor finishes executing a Node it calls back to store the result value. We use
/// the run_token and dirty bits to determine whether the Node changed while we were busy
Expand Down Expand Up @@ -1101,10 +1174,38 @@ mod tests {
graph.create(TNode(2), &context).wait(),
Ok(vec![T(0, 0), T(1, 0), T(2, 0)])
);
assert_eq!(context.runs(), vec![TNode(2), TNode(1), TNode(0), TNode(1)]);
}

#[test]
fn invalidate_and_rerun() {
let graph = Arc::new(Graph::new());
let context0 = TContext::new(0, graph.clone());

// Create three nodes.
assert_eq!(
graph.create(TNode(2), &context0).wait(),
Ok(vec![T(0, 0), T(1, 0), T(2, 0)])
);
assert_eq!(context0.runs(), vec![TNode(2), TNode(1), TNode(0)]);

// Clear the middle Node, which dirties the upper node.
assert_eq!(
graph.invalidate_from_roots(|&TNode(n)| n == 1),
InvalidationResult {
cleared: 1,
dirtied: 1
}
);

// Request with a new context, which will cause both the middle and upper nodes to rerun since
// their input values have changed.
let context1 = TContext::new(1, graph.clone());
assert_eq!(
context.runs(),
vec![TNode(2), TNode(1), TNode(0), TNode(2), TNode(1)]
graph.create(TNode(2), &context1).wait(),
Ok(vec![T(0, 0), T(1, 1), T(2, 1)])
);
assert_eq!(context1.runs(), vec![TNode(1), TNode(2)]);
}

///
Expand Down

0 comments on commit c524da8

Please sign in to comment.