Skip to content

Commit

Permalink
Add a feature gate to disable the engine fs watcher introduced in pan…
Browse files Browse the repository at this point in the history
…tsbuild#9318 (pantsbuild#9416)

* Add a feature gate that disables, by default, the engine fs watcher introduced in
pantsbuild#9318, to mitigate the issues seen in pantsbuild#9415 until a fix is in place.
  • Loading branch information
Henry Fuller committed May 6, 2020
1 parent fd5c22a commit b2bab03
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 98 deletions.
1 change: 1 addition & 0 deletions src/python/pants/engine/native.py
Original file line number Diff line number Diff line change
Expand Up @@ -999,6 +999,7 @@ def ti(type_obj):
execution_options.process_execution_use_local_cache,
self.context.utf8_dict(execution_options.remote_execution_headers),
execution_options.process_execution_local_enable_nailgun,
execution_options.experimental_fs_watcher,
)
if scheduler_result.is_throw:
value = self.context.from_value(scheduler_result.throw_handle)
Expand Down
11 changes: 11 additions & 0 deletions src/python/pants/option/global_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class ExecutionOptions:
remote_execution_extra_platform_properties: Any
remote_execution_headers: Any
process_execution_local_enable_nailgun: bool
experimental_fs_watcher: bool

@classmethod
def from_bootstrap_options(cls, bootstrap_options):
Expand All @@ -135,6 +136,7 @@ def from_bootstrap_options(cls, bootstrap_options):
remote_execution_extra_platform_properties=bootstrap_options.remote_execution_extra_platform_properties,
remote_execution_headers=bootstrap_options.remote_execution_headers,
process_execution_local_enable_nailgun=bootstrap_options.process_execution_local_enable_nailgun,
experimental_fs_watcher=bootstrap_options.experimental_fs_watcher,
)


Expand All @@ -160,6 +162,7 @@ def from_bootstrap_options(cls, bootstrap_options):
remote_execution_extra_platform_properties=[],
remote_execution_headers={},
process_execution_local_enable_nailgun=False,
experimental_fs_watcher=False,
)


Expand Down Expand Up @@ -934,6 +937,14 @@ def register_bootstrap_options(cls, register):
help="Whether or not to use nailgun to run the requests that are marked as nailgunnable.",
advanced=True,
)
register(
"--experimental-fs-watcher",
type=bool,
default=False,
advanced=True,
help="Whether to use the engine filesystem watcher which registers the workspace"
" for kernel file change events",
)

@classmethod
def register_options(cls, register):
Expand Down
4 changes: 4 additions & 0 deletions src/rust/engine/engine_cffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ pub extern "C" fn scheduler_create(
process_execution_use_local_cache: bool,
remote_execution_headers_buf: BufferBuffer,
process_execution_local_enable_nailgun: bool,
experimental_fs_watcher: bool,
) -> RawResult {
match make_core(
tasks_ptr,
Expand Down Expand Up @@ -236,6 +237,7 @@ pub extern "C" fn scheduler_create(
process_execution_use_local_cache,
remote_execution_headers_buf,
process_execution_local_enable_nailgun,
experimental_fs_watcher,
) {
Ok(core) => RawResult {
is_throw: false,
Expand Down Expand Up @@ -278,6 +280,7 @@ fn make_core(
process_execution_use_local_cache: bool,
remote_execution_headers_buf: BufferBuffer,
process_execution_local_enable_nailgun: bool,
experimental_fs_watcher: bool,
) -> Result<Core, String> {
let root_type_ids = root_type_ids.to_vec();
let ignore_patterns = ignore_patterns_buf
Expand Down Expand Up @@ -386,6 +389,7 @@ fn make_core(
process_execution_use_local_cache,
remote_execution_headers,
process_execution_local_enable_nailgun,
experimental_fs_watcher,
)
}

Expand Down
2 changes: 2 additions & 0 deletions src/rust/engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ impl Core {
process_execution_use_local_cache: bool,
remote_execution_headers: BTreeMap<String, String>,
process_execution_local_enable_nailgun: bool,
experimental_fs_watcher: bool,
) -> Result<Core, String> {
// Randomize CAS address order to avoid thundering herds from common config.
let mut remote_store_servers = remote_store_servers;
Expand Down Expand Up @@ -251,6 +252,7 @@ impl Core {
executor.clone(),
build_root.clone(),
ignorer.clone(),
experimental_fs_watcher,
)?;

Ok(Core {
Expand Down
187 changes: 96 additions & 91 deletions src/rust/engine/src/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub struct InvalidationWatcher {
watcher: Arc<Mutex<RecommendedWatcher>>,
executor: Executor,
liveness: Receiver<()>,
enabled: bool,
}

impl InvalidationWatcher {
Expand All @@ -49,6 +50,7 @@ impl InvalidationWatcher {
executor: Executor,
build_root: PathBuf,
ignorer: Arc<GitignoreStyleExcludes>,
enabled: bool,
) -> Result<InvalidationWatcher, String> {
// Inotify events contain canonical paths to the files being watched.
// If the build_root contains a symlink the paths returned in notify events
Expand All @@ -59,103 +61,106 @@ impl InvalidationWatcher {
let (watch_sender, watch_receiver) = crossbeam_channel::unbounded();
let mut watcher: RecommendedWatcher = Watcher::new(watch_sender, Duration::from_millis(50))
.map_err(|e| format!("Failed to begin watching the filesystem: {}", e))?;
// On darwin the notify API is much more efficient if you watch the build root
// recursively, so we set up that watch here and then return early when watch() is
// called by nodes that are running. On Linux the notify crate handles adding paths to watch
// much more efficiently so we do that instead on Linux.
if cfg!(target_os = "macos") {
watcher
.watch(canonical_build_root.clone(), RecursiveMode::Recursive)
.map_err(|e| {
format!(
"Failed to begin recursively watching files in the build root: {}",
e
)
})?
}
let wrapped_watcher = Arc::new(Mutex::new(watcher));

let (thread_liveness_sender, thread_liveness_receiver) = crossbeam_channel::unbounded();
thread::spawn(move || {
logging::set_thread_destination(logging::Destination::Pantsd);
loop {
let event_res = watch_receiver.recv_timeout(Duration::from_millis(100));
let graph = if let Some(g) = graph.upgrade() {
g
} else {
// The Graph has been dropped: we're done.
break;
};
match event_res {
Ok(Ok(ev)) => {
let paths: HashSet<_> = ev
.paths
.into_iter()
.filter_map(|path| {
// relativize paths to build root.
let path_relative_to_build_root = if path.starts_with(&canonical_build_root) {
// Unwrapping is fine because we check that the path starts with
// the build root above.
path.strip_prefix(&canonical_build_root).unwrap().into()
} else {
path
};
// To avoid having to stat paths for events we will eventually ignore we "lie" to the ignorer
// to say that no path is a directory, they could be if someone chmod's or creates a dir.
// This maintains correctness by ensuring that at worst we have false negative events, where a directory
// only glob (one that ends in `/` ) was supposed to ignore a directory path, but didn't because we claimed it was a file. That
// directory path will be used to invalidate nodes, but won't invalidate anything because its path is somewhere
// out of our purview.
if ignorer.is_ignored_or_child_of_ignored_path(
&path_relative_to_build_root,
/* is_dir */ false,
) {
None
} else {
Some(path_relative_to_build_root)
}
})
.map(|path_relative_to_build_root| {
let mut paths_to_invalidate: Vec<PathBuf> = vec![];
if let Some(parent_dir) = path_relative_to_build_root.parent() {
paths_to_invalidate.push(parent_dir.to_path_buf());
}
paths_to_invalidate.push(path_relative_to_build_root);
paths_to_invalidate
})
.flatten()
.collect();
// Only invalidate stuff if we have paths that weren't filtered out by gitignore.
if !paths.is_empty() {
debug!("notify invalidating {:?} because of {:?}", paths, ev.kind);
InvalidationWatcher::invalidate(&graph, &paths, "notify");
};
}
Ok(Err(err)) => {
if let notify::ErrorKind::PathNotFound = err.kind {
warn!("Path(s) did not exist: {:?}", err.paths);
continue;
} else {
error!("File watcher failing with: {}", err);
if enabled {
// On darwin the notify API is much more efficient if you watch the build root
// recursively, so we set up that watch here and then return early when watch() is
// called by nodes that are running. On Linux the notify crate handles adding paths to watch
// much more efficiently so we do that instead on Linux.
if cfg!(target_os = "macos") {
watcher
.watch(canonical_build_root.clone(), RecursiveMode::Recursive)
.map_err(|e| {
format!(
"Failed to begin recursively watching files in the build root: {}",
e
)
})?
}

thread::spawn(move || {
logging::set_thread_destination(logging::Destination::Pantsd);
loop {
let event_res = watch_receiver.recv_timeout(Duration::from_millis(100));
let graph = if let Some(g) = graph.upgrade() {
g
} else {
// The Graph has been dropped: we're done.
break;
};
match event_res {
Ok(Ok(ev)) => {
let paths: HashSet<_> = ev
.paths
.into_iter()
.filter_map(|path| {
// relativize paths to build root.
let path_relative_to_build_root = if path.starts_with(&canonical_build_root) {
// Unwrapping is fine because we check that the path starts with
// the build root above.
path.strip_prefix(&canonical_build_root).unwrap().into()
} else {
path
};
// To avoid having to stat paths for events we will eventually ignore we "lie" to the ignorer
// to say that no path is a directory, they could be if someone chmod's or creates a dir.
// This maintains correctness by ensuring that at worst we have false negative events, where a directory
// only glob (one that ends in `/` ) was supposed to ignore a directory path, but didn't because we claimed it was a file. That
// directory path will be used to invalidate nodes, but won't invalidate anything because its path is somewhere
// out of our purview.
if ignorer.is_ignored_or_child_of_ignored_path(
&path_relative_to_build_root,
/* is_dir */ false,
) {
None
} else {
Some(path_relative_to_build_root)
}
})
.map(|path_relative_to_build_root| {
let mut paths_to_invalidate: Vec<PathBuf> = vec![];
if let Some(parent_dir) = path_relative_to_build_root.parent() {
paths_to_invalidate.push(parent_dir.to_path_buf());
}
paths_to_invalidate.push(path_relative_to_build_root);
paths_to_invalidate
})
.flatten()
.collect();
// Only invalidate stuff if we have paths that weren't filtered out by gitignore.
if !paths.is_empty() {
debug!("notify invalidating {:?} because of {:?}", paths, ev.kind);
InvalidationWatcher::invalidate(&graph, &paths, "notify");
};
}
Ok(Err(err)) => {
if let notify::ErrorKind::PathNotFound = err.kind {
warn!("Path(s) did not exist: {:?}", err.paths);
continue;
} else {
error!("File watcher failing with: {}", err);
break;
}
}
Err(RecvTimeoutError::Timeout) => continue,
Err(RecvTimeoutError::Disconnected) => {
// The Watcher is gone: we're done.
break;
}
}
Err(RecvTimeoutError::Timeout) => continue,
Err(RecvTimeoutError::Disconnected) => {
// The Watcher is gone: we're done.
break;
}
};
}
debug!("Watch thread exiting.");
// Signal that we're exiting (which we would also do by just dropping the channel).
let _ = thread_liveness_sender.send(());
});
};
}
debug!("Watch thread exiting.");
// Signal that we're exiting (which we would also do by just dropping the channel).
let _ = thread_liveness_sender.send(());
});
};

Ok(InvalidationWatcher {
watcher: wrapped_watcher,
watcher: Arc::new(Mutex::new(watcher)),
executor,
liveness: thread_liveness_receiver,
enabled,
})
}

Expand All @@ -164,8 +169,8 @@ impl InvalidationWatcher {
///
pub async fn watch(&self, path: PathBuf) -> Result<(), notify::Error> {
// Short circuit here if we are on a Darwin platform because we should be watching
// the entire build root recursively already.
if cfg!(target_os = "macos") {
// the entire build root recursively already, or if we are not enabled.
if cfg!(target_os = "macos") || !self.enabled {
Ok(())
} else {
// Using a futurized mutex here because for some reason using a regular mutex
Expand Down
10 changes: 8 additions & 2 deletions src/rust/engine/src/watch_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,14 @@ fn setup_watch(
) -> InvalidationWatcher {
let mut rt = tokio::runtime::Runtime::new().unwrap();
let executor = Executor::new(rt.handle().clone());
let watcher = InvalidationWatcher::new(Arc::downgrade(&graph), executor, build_root, ignorer)
.expect("Couldn't create InvalidationWatcher");
let watcher = InvalidationWatcher::new(
Arc::downgrade(&graph),
executor,
build_root,
ignorer,
/*enabled*/ true,
)
.expect("Couldn't create InvalidationWatcher");
rt.block_on(watcher.watch(file_path)).unwrap();
watcher
}
Expand Down
10 changes: 8 additions & 2 deletions tests/python/pants_test/engine/scheduler_test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@

import os
import shutil
from dataclasses import asdict

from pants.base.file_system_project_tree import FileSystemProjectTree
from pants.engine.nodes import Throw
from pants.engine.scheduler import Scheduler
from pants.option.global_options import DEFAULT_EXECUTION_OPTIONS
from pants.option.global_options import DEFAULT_EXECUTION_OPTIONS, ExecutionOptions
from pants.testutil.engine.util import init_native
from pants.util.contextutil import temporary_file_path
from pants.util.dirutil import safe_mkdtemp, safe_rmtree
Expand Down Expand Up @@ -48,19 +49,24 @@ def mk_scheduler(
work_dir=None,
include_trace_on_error=True,
should_report_workunits=False,
execution_options=None,
):
"""Creates a SchedulerSession for a Scheduler with the given Rules installed."""
rules = rules or []
work_dir = work_dir or self._create_work_dir()
project_tree = project_tree or self.mk_fs_tree(work_dir=work_dir)
local_store_dir = os.path.realpath(safe_mkdtemp())
if execution_options is not None:
eo = asdict(DEFAULT_EXECUTION_OPTIONS)
eo.update(execution_options)
execution_options = ExecutionOptions(**eo)
scheduler = Scheduler(
self._native,
project_tree,
local_store_dir,
rules,
union_rules,
DEFAULT_EXECUTION_OPTIONS,
execution_options=execution_options or DEFAULT_EXECUTION_OPTIONS,
include_trace_on_error=include_trace_on_error,
)
return scheduler.new_session(
Expand Down
Loading

0 comments on commit b2bab03

Please sign in to comment.