Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pick notify watcher series #9741

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/python/pants/engine/native.py
Original file line number Diff line number Diff line change
Expand Up @@ -999,6 +999,7 @@ def ti(type_obj):
execution_options.process_execution_use_local_cache,
self.context.utf8_dict(execution_options.remote_execution_headers),
execution_options.process_execution_local_enable_nailgun,
execution_options.experimental_fs_watcher,
)
if scheduler_result.is_throw:
value = self.context.from_value(scheduler_result.throw_handle)
Expand Down
11 changes: 11 additions & 0 deletions src/python/pants/option/global_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class ExecutionOptions:
remote_execution_extra_platform_properties: Any
remote_execution_headers: Any
process_execution_local_enable_nailgun: bool
experimental_fs_watcher: bool

@classmethod
def from_bootstrap_options(cls, bootstrap_options):
Expand All @@ -135,6 +136,7 @@ def from_bootstrap_options(cls, bootstrap_options):
remote_execution_extra_platform_properties=bootstrap_options.remote_execution_extra_platform_properties,
remote_execution_headers=bootstrap_options.remote_execution_headers,
process_execution_local_enable_nailgun=bootstrap_options.process_execution_local_enable_nailgun,
experimental_fs_watcher=bootstrap_options.experimental_fs_watcher,
)


Expand All @@ -160,6 +162,7 @@ def from_bootstrap_options(cls, bootstrap_options):
remote_execution_extra_platform_properties=[],
remote_execution_headers={},
process_execution_local_enable_nailgun=False,
experimental_fs_watcher=False,
)


Expand Down Expand Up @@ -934,6 +937,14 @@ def register_bootstrap_options(cls, register):
help="Whether or not to use nailgun to run the requests that are marked as nailgunnable.",
advanced=True,
)
register(
"--experimental-fs-watcher",
type=bool,
default=False,
advanced=True,
help="Whether to use the engine filesystem watcher which registers the workspace"
" for kernel file change events",
)

@classmethod
def register_options(cls, register):
Expand Down
4 changes: 4 additions & 0 deletions src/rust/engine/engine_cffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ pub extern "C" fn scheduler_create(
process_execution_use_local_cache: bool,
remote_execution_headers_buf: BufferBuffer,
process_execution_local_enable_nailgun: bool,
experimental_fs_watcher: bool,
) -> RawResult {
match make_core(
tasks_ptr,
Expand Down Expand Up @@ -236,6 +237,7 @@ pub extern "C" fn scheduler_create(
process_execution_use_local_cache,
remote_execution_headers_buf,
process_execution_local_enable_nailgun,
experimental_fs_watcher,
) {
Ok(core) => RawResult {
is_throw: false,
Expand Down Expand Up @@ -278,6 +280,7 @@ fn make_core(
process_execution_use_local_cache: bool,
remote_execution_headers_buf: BufferBuffer,
process_execution_local_enable_nailgun: bool,
experimental_fs_watcher: bool,
) -> Result<Core, String> {
let root_type_ids = root_type_ids.to_vec();
let ignore_patterns = ignore_patterns_buf
Expand Down Expand Up @@ -386,6 +389,7 @@ fn make_core(
process_execution_use_local_cache,
remote_execution_headers,
process_execution_local_enable_nailgun,
experimental_fs_watcher,
)
}

Expand Down
2 changes: 2 additions & 0 deletions src/rust/engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ impl Core {
process_execution_use_local_cache: bool,
remote_execution_headers: BTreeMap<String, String>,
process_execution_local_enable_nailgun: bool,
experimental_fs_watcher: bool,
) -> Result<Core, String> {
// Randomize CAS address order to avoid thundering herds from common config.
let mut remote_store_servers = remote_store_servers;
Expand Down Expand Up @@ -251,6 +252,7 @@ impl Core {
executor.clone(),
build_root.clone(),
ignorer.clone(),
experimental_fs_watcher,
)?;

Ok(Core {
Expand Down
187 changes: 96 additions & 91 deletions src/rust/engine/src/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub struct InvalidationWatcher {
watcher: Arc<Mutex<RecommendedWatcher>>,
executor: Executor,
liveness: Receiver<()>,
enabled: bool,
}

impl InvalidationWatcher {
Expand All @@ -49,6 +50,7 @@ impl InvalidationWatcher {
executor: Executor,
build_root: PathBuf,
ignorer: Arc<GitignoreStyleExcludes>,
enabled: bool,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels to me that creating a disabled InvalidationWatcher is a weird pattern. Would it be possible to replace this logic with the caller holding an Option<InvalidationWatcher> and avoid the need for any change to this class?

) -> Result<InvalidationWatcher, String> {
// Inotify events contain canonical paths to the files being watched.
// If the build_root contains a symlink the paths returned in notify events
Expand All @@ -59,103 +61,106 @@ impl InvalidationWatcher {
let (watch_sender, watch_receiver) = crossbeam_channel::unbounded();
let mut watcher: RecommendedWatcher = Watcher::new(watch_sender, Duration::from_millis(50))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be worth extracting this magic 50 ms to a named constant preceeded by a comment explaining why 50 ms seemed to be an appropriate value for now and under which circumstances we expect it may be worth changing the value. Same comment applies a few lines later for the magic 100 ms

.map_err(|e| format!("Failed to begin watching the filesystem: {}", e))?;
// On darwin the notify API is much more efficient if you watch the build root
// recursively, so we set up that watch here and then return early when watch() is
// called by nodes that are running. On Linux the notify crate handles adding paths to watch
// much more efficiently so we do that instead on Linux.
if cfg!(target_os = "macos") {
watcher
.watch(canonical_build_root.clone(), RecursiveMode::Recursive)
.map_err(|e| {
format!(
"Failed to begin recursively watching files in the build root: {}",
e
)
})?
}
let wrapped_watcher = Arc::new(Mutex::new(watcher));

let (thread_liveness_sender, thread_liveness_receiver) = crossbeam_channel::unbounded();
thread::spawn(move || {
logging::set_thread_destination(logging::Destination::Pantsd);
loop {
let event_res = watch_receiver.recv_timeout(Duration::from_millis(100));
let graph = if let Some(g) = graph.upgrade() {
g
} else {
// The Graph has been dropped: we're done.
break;
};
match event_res {
Ok(Ok(ev)) => {
let paths: HashSet<_> = ev
.paths
.into_iter()
.filter_map(|path| {
// relativize paths to build root.
let path_relative_to_build_root = if path.starts_with(&canonical_build_root) {
// Unwrapping is fine because we check that the path starts with
// the build root above.
path.strip_prefix(&canonical_build_root).unwrap().into()
} else {
path
};
// To avoid having to stat paths for events we will eventually ignore we "lie" to the ignorer
// to say that no path is a directory, they could be if someone chmod's or creates a dir.
// This maintains correctness by ensuring that at worst we have false negative events, where a directory
// only glob (one that ends in `/` ) was supposed to ignore a directory path, but didn't because we claimed it was a file. That
// directory path will be used to invalidate nodes, but won't invalidate anything because its path is somewhere
// out of our purview.
if ignorer.is_ignored_or_child_of_ignored_path(
&path_relative_to_build_root,
/* is_dir */ false,
) {
None
} else {
Some(path_relative_to_build_root)
}
})
.map(|path_relative_to_build_root| {
let mut paths_to_invalidate: Vec<PathBuf> = vec![];
if let Some(parent_dir) = path_relative_to_build_root.parent() {
paths_to_invalidate.push(parent_dir.to_path_buf());
}
paths_to_invalidate.push(path_relative_to_build_root);
paths_to_invalidate
})
.flatten()
.collect();
// Only invalidate stuff if we have paths that weren't filtered out by gitignore.
if !paths.is_empty() {
debug!("notify invalidating {:?} because of {:?}", paths, ev.kind);
InvalidationWatcher::invalidate(&graph, &paths, "notify");
};
}
Ok(Err(err)) => {
if let notify::ErrorKind::PathNotFound = err.kind {
warn!("Path(s) did not exist: {:?}", err.paths);
continue;
} else {
error!("File watcher failing with: {}", err);
if enabled {
// On darwin the notify API is much more efficient if you watch the build root
// recursively, so we set up that watch here and then return early when watch() is
// called by nodes that are running. On Linux the notify crate handles adding paths to watch
// much more efficiently so we do that instead on Linux.
if cfg!(target_os = "macos") {
watcher
.watch(canonical_build_root.clone(), RecursiveMode::Recursive)
.map_err(|e| {
format!(
"Failed to begin recursively watching files in the build root: {}",
e
)
})?
}

thread::spawn(move || {
logging::set_thread_destination(logging::Destination::Pantsd);
loop {
let event_res = watch_receiver.recv_timeout(Duration::from_millis(100));
let graph = if let Some(g) = graph.upgrade() {
g
} else {
// The Graph has been dropped: we're done.
break;
};
match event_res {
Ok(Ok(ev)) => {
let paths: HashSet<_> = ev
.paths
.into_iter()
.filter_map(|path| {
// relativize paths to build root.
let path_relative_to_build_root = if path.starts_with(&canonical_build_root) {
// Unwrapping is fine because we check that the path starts with
// the build root above.
path.strip_prefix(&canonical_build_root).unwrap().into()
} else {
path
};
// To avoid having to stat paths for events we will eventually ignore we "lie" to the ignorer
// to say that no path is a directory, they could be if someone chmod's or creates a dir.
// This maintains correctness by ensuring that at worst we have false negative events, where a directory
// only glob (one that ends in `/` ) was supposed to ignore a directory path, but didn't because we claimed it was a file. That
// directory path will be used to invalidate nodes, but won't invalidate anything because its path is somewhere
// out of our purview.
if ignorer.is_ignored_or_child_of_ignored_path(
&path_relative_to_build_root,
/* is_dir */ false,
) {
None
} else {
Some(path_relative_to_build_root)
}
})
.map(|path_relative_to_build_root| {
let mut paths_to_invalidate: Vec<PathBuf> = vec![];
if let Some(parent_dir) = path_relative_to_build_root.parent() {
paths_to_invalidate.push(parent_dir.to_path_buf());
}
paths_to_invalidate.push(path_relative_to_build_root);
paths_to_invalidate
})
.flatten()
.collect();
// Only invalidate stuff if we have paths that weren't filtered out by gitignore.
if !paths.is_empty() {
debug!("notify invalidating {:?} because of {:?}", paths, ev.kind);
InvalidationWatcher::invalidate(&graph, &paths, "notify");
};
}
Ok(Err(err)) => {
if let notify::ErrorKind::PathNotFound = err.kind {
warn!("Path(s) did not exist: {:?}", err.paths);
continue;
} else {
error!("File watcher failing with: {}", err);
break;
}
}
Err(RecvTimeoutError::Timeout) => continue,
Err(RecvTimeoutError::Disconnected) => {
// The Watcher is gone: we're done.
break;
}
}
Err(RecvTimeoutError::Timeout) => continue,
Err(RecvTimeoutError::Disconnected) => {
// The Watcher is gone: we're done.
break;
}
};
}
debug!("Watch thread exiting.");
// Signal that we're exiting (which we would also do by just dropping the channel).
let _ = thread_liveness_sender.send(());
});
};
}
debug!("Watch thread exiting.");
// Signal that we're exiting (which we would also do by just dropping the channel).
let _ = thread_liveness_sender.send(());
});
};

Ok(InvalidationWatcher {
watcher: wrapped_watcher,
watcher: Arc::new(Mutex::new(watcher)),
executor,
liveness: thread_liveness_receiver,
enabled,
})
}

Expand All @@ -164,8 +169,8 @@ impl InvalidationWatcher {
///
pub async fn watch(&self, path: PathBuf) -> Result<(), notify::Error> {
// Short circuit here if we are on a Darwin platform because we should be watching
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment on updating the commit msg to MacOS

// the entire build root recursively already.
if cfg!(target_os = "macos") {
// the entire build root recursively already, or if we are not enabled.
if cfg!(target_os = "macos") || !self.enabled {
Ok(())
} else {
// Using a futurized mutex here because for some reason using a regular mutex
Expand Down
10 changes: 8 additions & 2 deletions src/rust/engine/src/watch_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,14 @@ fn setup_watch(
) -> InvalidationWatcher {
let mut rt = tokio::runtime::Runtime::new().unwrap();
let executor = Executor::new(rt.handle().clone());
let watcher = InvalidationWatcher::new(Arc::downgrade(&graph), executor, build_root, ignorer)
.expect("Couldn't create InvalidationWatcher");
let watcher = InvalidationWatcher::new(
Arc::downgrade(&graph),
executor,
build_root,
ignorer,
/*enabled*/ true,
)
.expect("Couldn't create InvalidationWatcher");
rt.block_on(watcher.watch(file_path)).unwrap();
watcher
}
Expand Down
10 changes: 8 additions & 2 deletions tests/python/pants_test/engine/scheduler_test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@

import os
import shutil
from dataclasses import asdict

from pants.base.file_system_project_tree import FileSystemProjectTree
from pants.engine.nodes import Throw
from pants.engine.scheduler import Scheduler
from pants.option.global_options import DEFAULT_EXECUTION_OPTIONS
from pants.option.global_options import DEFAULT_EXECUTION_OPTIONS, ExecutionOptions
from pants.testutil.engine.util import init_native
from pants.util.contextutil import temporary_file_path
from pants.util.dirutil import safe_mkdtemp, safe_rmtree
Expand Down Expand Up @@ -48,19 +49,24 @@ def mk_scheduler(
work_dir=None,
include_trace_on_error=True,
should_report_workunits=False,
execution_options=None,
):
"""Creates a SchedulerSession for a Scheduler with the given Rules installed."""
rules = rules or []
work_dir = work_dir or self._create_work_dir()
project_tree = project_tree or self.mk_fs_tree(work_dir=work_dir)
local_store_dir = os.path.realpath(safe_mkdtemp())
if execution_options is not None:
eo = asdict(DEFAULT_EXECUTION_OPTIONS)
eo.update(execution_options)
execution_options = ExecutionOptions(**eo)
scheduler = Scheduler(
self._native,
project_tree,
local_store_dir,
rules,
union_rules,
DEFAULT_EXECUTION_OPTIONS,
execution_options=execution_options or DEFAULT_EXECUTION_OPTIONS,
include_trace_on_error=include_trace_on_error,
)
return scheduler.new_session(
Expand Down
Loading