Skip to content

Commit

Permalink
Add notify fs watcher to engine. (#9318)
Browse files Browse the repository at this point in the history
* Use the notify crate to implement an `InvalidationWatcher` for Graph operations.

* Make watch async, and watcher log to pantsd.log.
Relativize paths returned from notify to the build_root.
Refactor invalidate method to be an associated method on the
InvalidationWatcher.

* Respond to feedback.
* Use spawn on io pool instead of custom future impl
* Write python fs tests
* Relativize paths to invalidate to build root
* invalidate nodes with parent paths.
* Comments

* Add rust tests.
Make some things public so we can use them in tests.
Use canonical path to build root for relativizing changed paths.

* Refactor Python tests.
Return watch errors as core::Failure all the way to user.
Move task executor onto invalidation watcher.
Move test_support trait impl into test_support mod.

* use futures lock on watcher

* Platform specific watching behavior. On Darwin recursively watch the
build root at startup. On Linux watch individual directory roots.

Co-authored-by: Stu Hood <stuhood@gmail.com>
  • Loading branch information
Henry Fuller and stuhood authored Mar 26, 2020
1 parent da4e415 commit 6db9ab4
Show file tree
Hide file tree
Showing 15 changed files with 959 additions and 403 deletions.
2 changes: 2 additions & 0 deletions pants.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ pants_ignore.add = [
"/build-support/*.venv/",
# An absolute symlink to the Pants Rust toolchain sources.
"/build-support/bin/native/src",
# We shouldn't walk or watch the rust compiler artifacts because it is slow.
"/src/rust/engine/target",
]

[cache]
Expand Down
813 changes: 458 additions & 355 deletions src/rust/engine/Cargo.lock

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions src/rust/engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,17 @@ default-members = [
]

[dependencies]
async_semaphore = { path = "async_semaphore" }
async-trait = "0.1"
boxfuture = { path = "boxfuture" }
bytes = "0.4.5"
concrete_time = { path = "concrete_time" }
crossbeam-channel = "0.3"
fnv = "1.0.5"
fs = { path = "fs" }
futures01 = { package = "futures", version = "0.1" }
futures = { version = "0.3", features = ["compat"] }
futures-locks = "0.3.0"
graph = { path = "graph" }
hashing = { path = "hashing" }
indexmap = "1.0.2"
Expand All @@ -98,6 +101,11 @@ log = "0.4"
logging = { path = "logging" }
num_cpus = "1"
num_enum = "0.4"
# notify is currently an experimental API, we are pinning to https://docs.rs/notify/5.0.0-pre.1/notify/
# because the latest prerelease at time of writing has removed the debounced watcher which we would like to use.
# The author suggests they will add the debounced watcher back into the stable 5.0.0 release. When that happens
# we can move to it.
notify = { git = "https://github.com/notify-rs/notify", rev = "fba00891d9105e2f581c69fbe415a58cb7966fdd" }
parking_lot = "0.6"
process_execution = { path = "process_execution" }
rand = "0.6"
Expand All @@ -115,6 +123,11 @@ url = "2.1"
uuid = { version = "0.7", features = ["v4"] }
workunit_store = { path = "workunit_store" }

[dev-dependencies]
testutil = { path = "./testutil" }
fs = { path = "./fs" }
env_logger = "0.5.4"

[patch.crates-io]
# TODO: Remove patch when we can upgrade to an official released version of protobuf with a fix.
# See: https://github.com/pantsbuild/pants/issues/7760 for context.
Expand Down
14 changes: 7 additions & 7 deletions src/rust/engine/graph/src/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ use boxfuture::{BoxFuture, Boxable};
/// the Node was `cleared`), the work is discarded. See `Entry::complete` for more information.
///
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) struct RunToken(u32);
pub struct RunToken(u32);

impl RunToken {
fn initial() -> RunToken {
pub fn initial() -> RunToken {
RunToken(0)
}

Expand All @@ -40,10 +40,10 @@ impl RunToken {
/// incremented when the output of a node has changed.
///
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) struct Generation(u32);
pub struct Generation(u32);

impl Generation {
fn initial() -> Generation {
pub fn initial() -> Generation {
Generation(0)
}

Expand All @@ -65,7 +65,7 @@ impl Generation {
/// If the value is Clean, the consumer can simply use the value as-is.
///
#[derive(Clone, Debug)]
pub(crate) enum EntryResult<N: Node> {
pub enum EntryResult<N: Node> {
Clean(Result<N::Item, N::Error>),
Dirty(Result<N::Item, N::Error>),
Uncacheable(
Expand Down Expand Up @@ -118,7 +118,7 @@ impl<N: Node> AsRef<Result<N::Item, N::Error>> for EntryResult<N> {

#[allow(clippy::type_complexity)]
#[derive(Debug)]
pub(crate) enum EntryState<N: Node> {
pub enum EntryState<N: Node> {
// A node that has either been explicitly cleared, or has not yet started Running. In this state
// there is no need for a dirty bit because the RunToken is either in its initial state, or has
// been explicitly incremented when the node was cleared.
Expand Down Expand Up @@ -174,7 +174,7 @@ pub struct Entry<N: Node> {
// maps is painful.
node: N,

state: Arc<Mutex<EntryState<N>>>,
pub state: Arc<Mutex<EntryState<N>>>,
}

impl<N: Node> Entry<N> {
Expand Down
43 changes: 41 additions & 2 deletions src/rust/engine/graph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ use hashing;

use petgraph;

mod entry;
// make the entry module public for testing purposes. We use it to contruct mock
// graph entries in the notify watch tests.
pub mod entry;
mod node;

pub use crate::entry::Entry;
pub use crate::entry::{Entry, EntryState};
use crate::entry::{Generation, RunToken};

use std::collections::binary_heap::BinaryHeap;
Expand Down Expand Up @@ -1009,6 +1011,43 @@ impl<N: Node> Graph<N> {
}
}

// This module provides a trait which contains functions that
// should only be used in tests. A user must explicitly import the trait
// to use the extra test functions, and they should only be imported into
// test modules.
pub mod test_support {
use super::{EntryId, EntryState, Graph, Node};
pub trait TestGraph<N: Node> {
fn set_fixture_entry_state_for_id(&self, id: EntryId, state: EntryState<N>);
fn add_fixture_entry(&self, node: N) -> EntryId;
fn entry_state(&self, id: EntryId) -> &str;
}
impl<N: Node> TestGraph<N> for Graph<N> {
fn set_fixture_entry_state_for_id(&self, id: EntryId, state: EntryState<N>) {
let mut inner = self.inner.lock();
let entry = inner.entry_for_id_mut(id).unwrap();
let mut entry_state = entry.state.lock();
*entry_state = state;
}

fn add_fixture_entry(&self, node: N) -> EntryId {
let mut inner = self.inner.lock();
inner.ensure_entry(node)
}

fn entry_state(&self, id: EntryId) -> &str {
let mut inner = self.inner.lock();
let entry = inner.entry_for_id_mut(id).unwrap();
let entry_state = entry.state.lock();
match *entry_state {
EntryState::Completed { .. } => "completed",
EntryState::Running { .. } => "running",
EntryState::NotStarted { .. } => "not started",
}
}
}
}

///
/// Represents the state of a particular walk through a Graph. Implements Iterator and has the same
/// lifetime as the Graph itself.
Expand Down
12 changes: 9 additions & 3 deletions src/rust/engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::nodes::{NodeKey, WrappedNode};
use crate::scheduler::Session;
use crate::tasks::{Rule, Tasks};
use crate::types::Types;
use crate::watch::InvalidationWatcher;
use boxfuture::{BoxFuture, Boxable};
use core::clone::Clone;
use fs::{safe_create_dir_all_ioerror, PosixFS};
Expand All @@ -43,7 +44,7 @@ const GIGABYTES: usize = 1024 * 1024 * 1024;
/// https://github.com/tokio-rs/tokio/issues/369 is resolved.
///
pub struct Core {
pub graph: Graph<NodeKey>,
pub graph: Arc<Graph<NodeKey>>,
pub tasks: Tasks,
pub rule_graph: RuleGraph<Rule>,
pub types: Types,
Expand All @@ -53,6 +54,7 @@ pub struct Core {
pub command_runner: Box<dyn process_execution::CommandRunner>,
pub http_client: reqwest::Client,
pub vfs: PosixFS,
pub watcher: InvalidationWatcher,
pub build_root: PathBuf,
}

Expand Down Expand Up @@ -232,12 +234,15 @@ impl Core {
process_execution_metadata,
));
}
let graph = Arc::new(Graph::new());
let watcher =
InvalidationWatcher::new(Arc::downgrade(&graph), executor.clone(), build_root.clone())?;

let http_client = reqwest::Client::new();
let rule_graph = RuleGraph::new(tasks.as_map(), root_subject_types);

Ok(Core {
graph: Graph::new(),
graph: graph,
tasks: tasks,
rule_graph: rule_graph,
types: types,
Expand All @@ -250,7 +255,8 @@ impl Core {
// exceptions, rather than as panics.
vfs: PosixFS::new(&build_root, &ignore_patterns, executor)
.map_err(|e| format!("Could not initialize VFS: {:?}", e))?,
build_root: build_root,
build_root,
watcher,
})
}

Expand Down
2 changes: 2 additions & 0 deletions src/rust/engine/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,13 +289,15 @@ pub enum Failure {
Invalidated,
/// A rule raised an exception.
Throw(Value, String),
FileWatch(String),
}

impl fmt::Display for Failure {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Failure::Invalidated => write!(f, "Exhausted retries due to changed files."),
Failure::Throw(exc, _) => write!(f, "{}", externs::val_to_str(exc)),
Failure::FileWatch(failure) => write!(f, "{}", failure),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/src/externs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ impl From<Result<Value, Failure>> for PyResult {
let val = match f {
f @ Failure::Invalidated => create_exception(&format!("{}", f)),
Failure::Throw(exc, _) => exc,
Failure::FileWatch(failure) => create_exception(&failure),
};
PyResult {
is_throw: true,
Expand Down
4 changes: 4 additions & 0 deletions src/rust/engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ mod scheduler;
mod selectors;
mod tasks;
mod types;
mod watch;

pub use crate::context::Core;
pub use crate::core::{Function, Key, Params, TypeId, Value};
Expand All @@ -49,3 +50,6 @@ pub use crate::scheduler::{
};
pub use crate::tasks::{Rule, Tasks};
pub use crate::types::Types;

#[cfg(test)]
mod watch_tests;
41 changes: 29 additions & 12 deletions src/rust/engine/src/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -941,7 +941,7 @@ impl NodeVisualizer<NodeKey> for Visualizer {
let max_colors = 12;
match entry.peek(context) {
None => "white".to_string(),
Some(Err(Failure::Throw(..))) => "4".to_string(),
Some(Err(Failure::Throw(..))) | Some(Err(Failure::FileWatch(..))) => "4".to_string(),
Some(Err(Failure::Invalidated)) => "12".to_string(),
Some(Ok(_)) => {
let viz_colors_len = self.viz_colors.len();
Expand All @@ -962,6 +962,7 @@ impl NodeTracer<NodeKey> for Tracer {
match result {
Some(Err(Failure::Invalidated)) => false,
Some(Err(Failure::Throw(..))) => false,
Some(Err(Failure::FileWatch(..))) => false,
Some(Ok(_)) => true,
None => {
// A Node with no state is either still running, or effectively cancelled
Expand All @@ -986,6 +987,7 @@ impl NodeTracer<NodeKey> for Tracer {
.join("\n")
),
Some(Err(Failure::Invalidated)) => "Invalidated".to_string(),
Some(Err(Failure::FileWatch(failure))) => format!("FileWatch failed: {}", failure),
}
}
}
Expand Down Expand Up @@ -1067,17 +1069,32 @@ impl Node for NodeKey {

scope_task_parent_id(maybe_span_id, async move {
let context2 = context.clone();
let result = match self {
NodeKey::DigestFile(n) => n.run(context).map(NodeResult::from).compat().await,
NodeKey::DownloadedFile(n) => n.run(context).map(NodeResult::from).compat().await,
NodeKey::MultiPlatformExecuteProcess(n) => {
n.run(context).map(NodeResult::from).compat().await
}
NodeKey::ReadLink(n) => n.run(context).map(NodeResult::from).compat().await,
NodeKey::Scandir(n) => n.run(context).map(NodeResult::from).compat().await,
NodeKey::Select(n) => n.run(context).map(NodeResult::from).compat().await,
NodeKey::Snapshot(n) => n.run(context).map(NodeResult::from).compat().await,
NodeKey::Task(n) => n.run(context).map(NodeResult::from).compat().await,
let maybe_watch = if let Some(path) = self.fs_subject() {
let abs_path = context.core.build_root.join(path);
context
.core
.watcher
.watch(abs_path)
.map_err(|e| Failure::FileWatch(format!("{:?}", e)))
.await
} else {
Ok(())
};

let result = match maybe_watch {
Ok(()) => match self {
NodeKey::DigestFile(n) => n.run(context).map(NodeResult::from).compat().await,
NodeKey::DownloadedFile(n) => n.run(context).map(NodeResult::from).compat().await,
NodeKey::MultiPlatformExecuteProcess(n) => {
n.run(context).map(NodeResult::from).compat().await
}
NodeKey::ReadLink(n) => n.run(context).map(NodeResult::from).compat().await,
NodeKey::Scandir(n) => n.run(context).map(NodeResult::from).compat().await,
NodeKey::Select(n) => n.run(context).map(NodeResult::from).compat().await,
NodeKey::Snapshot(n) => n.run(context).map(NodeResult::from).compat().await,
NodeKey::Task(n) => n.run(context).map(NodeResult::from).compat().await,
},
Err(e) => Err(e),
};
if let Some(started_workunit) = maybe_started_workunit {
let workunit: WorkUnit = started_workunit.finish();
Expand Down
18 changes: 2 additions & 16 deletions src/rust/engine/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use futures01::future::{self, Future};
use crate::context::{Context, Core};
use crate::core::{Failure, Params, TypeId, Value};
use crate::nodes::{NodeKey, Select, Tracer, Visualizer};
use crate::watch::InvalidationWatcher;
use graph::{Graph, InvalidationResult};
use hashing;
use indexmap::IndexMap;
Expand Down Expand Up @@ -228,22 +229,7 @@ impl Scheduler {
/// Invalidate the invalidation roots represented by the given Paths.
///
pub fn invalidate(&self, paths: &HashSet<PathBuf>) -> usize {
let InvalidationResult { cleared, dirtied } =
self.core.graph.invalidate_from_roots(move |node| {
if let Some(fs_subject) = node.fs_subject() {
paths.contains(fs_subject)
} else {
false
}
});
// TODO: The rust log level is not currently set correctly in a pantsd context. To ensure that
// we see this even at `info` level, we set it to warn. #6004 should address this by making
// rust logging re-configuration an explicit step in `src/python/pants/init/logging.py`.
warn!(
"invalidation: cleared {} and dirtied {} nodes for: {:?}",
cleared, dirtied, paths
);
cleared + dirtied
InvalidationWatcher::invalidate(&self.core.graph, paths, "watchman")
}

///
Expand Down
Loading

0 comments on commit 6db9ab4

Please sign in to comment.