Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add notify fs watcher to engine. #9318

Merged
merged 16 commits into from
Mar 26, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/rust/engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ default-members = [
boxfuture = { path = "boxfuture" }
bytes = "0.4.5"
concrete_time = { path = "concrete_time" }
crossbeam-channel = "0.3"
fnv = "1.0.5"
fs = { path = "fs" }
futures01 = { package = "futures", version = "0.1" }
Expand All @@ -95,6 +96,7 @@ lazy_static = "1"
log = "0.4"
logging = { path = "logging" }
num_enum = "0.4"
notify = { git = "https://github.com/notify-rs/notify", rev = "fba00891d9105e2f581c69fbe415a58cb7966fdd" }
hrfuller marked this conversation as resolved.
Show resolved Hide resolved
hrfuller marked this conversation as resolved.
Show resolved Hide resolved
parking_lot = "0.6"
process_execution = { path = "process_execution" }
rand = "0.6"
Expand Down
17 changes: 11 additions & 6 deletions src/rust/engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::nodes::{NodeKey, WrappedNode};
use crate::scheduler::Session;
use crate::tasks::{Rule, Tasks};
use crate::types::Types;
use crate::watch::InvalidationWatcher;
use boxfuture::{BoxFuture, Boxable};
use core::clone::Clone;
use fs::{safe_create_dir_all_ioerror, PosixFS};
Expand All @@ -41,7 +42,7 @@ const GIGABYTES: usize = 1024 * 1024 * 1024;
/// https://github.com/tokio-rs/tokio/issues/369 is resolved.
///
pub struct Core {
pub graph: Graph<NodeKey>,
pub graph: Arc<Graph<NodeKey>>,
pub tasks: Tasks,
pub rule_graph: RuleGraph<Rule>,
pub types: Types,
Expand All @@ -50,6 +51,7 @@ pub struct Core {
pub command_runner: Box<dyn process_execution::CommandRunner>,
pub http_client: reqwest::r#async::Client,
pub vfs: PosixFS,
pub watcher: InvalidationWatcher,
pub build_root: PathBuf,
}

Expand Down Expand Up @@ -218,24 +220,27 @@ impl Core {
process_execution_metadata,
));
}
let graph = Arc::new(Graph::new());
let watcher = InvalidationWatcher::new(Arc::downgrade(&graph))?;

let http_client = reqwest::r#async::Client::new();
let rule_graph = RuleGraph::new(tasks.as_map(), root_subject_types);

Ok(Core {
graph: Graph::new(),
tasks: tasks,
rule_graph: rule_graph,
types: types,
graph,
tasks,
rule_graph,
types,
executor: executor.clone(),
store,
command_runner,
http_client,
watcher,
// TODO: Errors in initialization should definitely be exposed as python
// exceptions, rather than as panics.
vfs: PosixFS::new(&build_root, &ignore_patterns, executor)
.map_err(|e| format!("Could not initialize VFS: {:?}", e))?,
build_root: build_root,
build_root,
})
}

Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ mod scheduler;
mod selectors;
mod tasks;
mod types;
mod watch;

pub use crate::context::Core;
pub use crate::core::{Function, Key, Params, TypeId, Value};
Expand Down
4 changes: 4 additions & 0 deletions src/rust/engine/src/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1033,6 +1033,10 @@ impl Node for NodeKey {
type Error = Failure;

fn run(self, context: Context) -> NodeFuture<NodeResult> {
if let Some(path) = self.fs_subject() {
let abs_path = context.core.build_root.join(path);
try_future!(context.core.watcher.watch(abs_path).map_err(|e| throw(&e)));
}
let (maybe_started_workunit, maybe_span_id) = if context.session.should_handle_workunits() {
let user_facing_name = self.user_facing_name();
let span_id = generate_random_64bit_string();
Expand Down
116 changes: 116 additions & 0 deletions src/rust/engine/src/watch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright 2019 Pants project contributors (see CONTRIBUTORS.md).
// Licensed under the Apache License, Version 2.0 (see LICENSE).

use std::collections::HashSet;
use std::path::Path;
use std::sync::Weak;
use std::thread;
use std::time::Duration;

use crossbeam_channel::{self, Receiver, RecvTimeoutError, TryRecvError};
use log::{error, warn};
use notify::{Event, RecommendedWatcher, RecursiveMode, Watcher};
use parking_lot::Mutex;

use graph::Graph;
use logging;

use crate::nodes::NodeKey;

///
/// An InvalidationWatcher maintains a Thread that receives events from a notify Watcher.
///
/// If the spawned Thread exits for any reason, InvalidationWatcher::running() will return False,
/// and the caller should create a new InvalidationWatcher (or shut down, in some cases). Generally
/// this will mean polling.
///
/// TODO: Need the above polling, and need to make the watch method async.
hrfuller marked this conversation as resolved.
Show resolved Hide resolved
///
hrfuller marked this conversation as resolved.
Show resolved Hide resolved
pub struct InvalidationWatcher {
watcher: Mutex<RecommendedWatcher>,
liveness: Receiver<()>,
}

impl InvalidationWatcher {
pub fn new(graph: Weak<Graph<NodeKey>>) -> Result<InvalidationWatcher, String> {
let logging_destination = logging::get_destination();
let (watch_sender, watch_receiver) = crossbeam_channel::unbounded();
let watcher = Mutex::new(
Watcher::new(watch_sender, Duration::from_millis(50))
.map_err(|e| format!("Failed to begin watching the filesystem: {}", e))?,
);

let (thread_liveness_sender, thread_liveness_receiver) = crossbeam_channel::unbounded();
thread::spawn(move || {
logging::set_destination(logging_destination);
loop {
let event_res = watch_receiver.recv_timeout(Duration::from_millis(100));
let graph = if let Some(g) = graph.upgrade() {
g
} else {
// The Graph has been dropped: we're done.
break;
};
match event_res {
Ok(Ok(ev)) => invalidate(&graph, ev),
Ok(Err(err)) => {
if let notify::ErrorKind::PathNotFound = err.kind {
warn!("Path(s) did not exist: {:?}", err.paths);
continue;
} else {
error!("File watcher failing with: {}", err);
break;
}
}
Err(RecvTimeoutError::Timeout) => continue,
Err(RecvTimeoutError::Disconnected) => {
// The Watcher is gone: we're done.
break;
}
};
}
warn!("Watch thread exiting.");
// Signal that we're exiting (which we would also do by just dropping the channel).
let _ = thread_liveness_sender.send(());
});

Ok(InvalidationWatcher {
watcher,
liveness: thread_liveness_receiver,
})
}

///
/// Watch the given path non-recursively.
///
pub fn watch<P: AsRef<Path> + std::fmt::Debug>(&self, path: P) -> Result<(), String> {
// warn!("watching {:?}", path);
let mut watcher = self.watcher.lock();
watcher
.watch(&path, RecursiveMode::NonRecursive)
.map_err(|e| format!("Failed to begin watching `{:?}`: {}", path, e))
}

///
/// Returns true if this InvalidationWatcher is still valid: if it is not valid, it will have
/// already logged some sort of error, and will never restart on its own.
///
pub fn running(&self) -> bool {
match self.liveness.try_recv() {
Ok(()) | Err(TryRecvError::Disconnected) => false,
Err(TryRecvError::Empty) => true,
}
}
}

fn invalidate(graph: &Graph<NodeKey>, ev: Event) {
let paths: HashSet<_> = ev.paths.into_iter().collect();
warn!("notify invalidating {:?}", paths);
graph.invalidate_from_roots(move |node| {
if let Some(fs_subject) = node.fs_subject() {
paths.contains(fs_subject)
} else {
false
}
});
}