From b2bab03a2d768601438dbf4f901a4002f25f76fc Mon Sep 17 00:00:00 2001 From: Henry Fuller Date: Sun, 29 Mar 2020 21:42:19 -0700 Subject: [PATCH] Add a feature gate to disable the engine fs watcher introduced in #9318 (#9416) * Add a feature gate to disable the engine fs watcher introduced in #9318 by default, to mitigate issues seen in #9415 until a fix is in place. --- src/python/pants/engine/native.py | 1 + src/python/pants/option/global_options.py | 11 ++ src/rust/engine/engine_cffi/src/lib.rs | 4 + src/rust/engine/src/context.rs | 2 + src/rust/engine/src/watch.rs | 187 +++++++++--------- src/rust/engine/src/watch_tests.rs | 10 +- .../pants_test/engine/scheduler_test_base.py | 10 +- tests/python/pants_test/engine/test_fs.py | 18 +- 8 files changed, 145 insertions(+), 98 deletions(-) diff --git a/src/python/pants/engine/native.py b/src/python/pants/engine/native.py index 6914c0684d2..30b32a31081 100644 --- a/src/python/pants/engine/native.py +++ b/src/python/pants/engine/native.py @@ -999,6 +999,7 @@ def ti(type_obj): execution_options.process_execution_use_local_cache, self.context.utf8_dict(execution_options.remote_execution_headers), execution_options.process_execution_local_enable_nailgun, + execution_options.experimental_fs_watcher, ) if scheduler_result.is_throw: value = self.context.from_value(scheduler_result.throw_handle) diff --git a/src/python/pants/option/global_options.py b/src/python/pants/option/global_options.py index fff2d18bdaa..a3079b214ca 100644 --- a/src/python/pants/option/global_options.py +++ b/src/python/pants/option/global_options.py @@ -110,6 +110,7 @@ class ExecutionOptions: remote_execution_extra_platform_properties: Any remote_execution_headers: Any process_execution_local_enable_nailgun: bool + experimental_fs_watcher: bool @classmethod def from_bootstrap_options(cls, bootstrap_options): @@ -135,6 +136,7 @@ def from_bootstrap_options(cls, bootstrap_options): remote_execution_extra_platform_properties=bootstrap_options.remote_execution_extra_platform_properties, remote_execution_headers=bootstrap_options.remote_execution_headers, process_execution_local_enable_nailgun=bootstrap_options.process_execution_local_enable_nailgun, + experimental_fs_watcher=bootstrap_options.experimental_fs_watcher, ) @@ -160,6 +162,7 @@ def from_bootstrap_options(cls, bootstrap_options): remote_execution_extra_platform_properties=[], remote_execution_headers={}, process_execution_local_enable_nailgun=False, + experimental_fs_watcher=False, ) @@ -934,6 +937,14 @@ def register_bootstrap_options(cls, register): help="Whether or not to use nailgun to run the requests that are marked as nailgunnable.", advanced=True, ) + register( + "--experimental-fs-watcher", + type=bool, + default=False, + advanced=True, + help="Whether to use the engine filesystem watcher which registers the workspace" + " for kernel file change events", + ) @classmethod def register_options(cls, register): diff --git a/src/rust/engine/engine_cffi/src/lib.rs b/src/rust/engine/engine_cffi/src/lib.rs index 56425fc0472..e6bfaa8643b 100644 --- a/src/rust/engine/engine_cffi/src/lib.rs +++ b/src/rust/engine/engine_cffi/src/lib.rs @@ -207,6 +207,7 @@ pub extern "C" fn scheduler_create( process_execution_use_local_cache: bool, remote_execution_headers_buf: BufferBuffer, process_execution_local_enable_nailgun: bool, + experimental_fs_watcher: bool, ) -> RawResult { match make_core( tasks_ptr, @@ -236,6 +237,7 @@ pub extern "C" fn scheduler_create( process_execution_use_local_cache, remote_execution_headers_buf, process_execution_local_enable_nailgun, + experimental_fs_watcher, ) { Ok(core) => RawResult { is_throw: false, @@ -278,6 +280,7 @@ fn make_core( process_execution_use_local_cache: bool, remote_execution_headers_buf: BufferBuffer, process_execution_local_enable_nailgun: bool, + experimental_fs_watcher: bool, ) -> Result { let root_type_ids = root_type_ids.to_vec(); let ignore_patterns = ignore_patterns_buf @@ -386,6 +389,7 @@ fn make_core( process_execution_use_local_cache, remote_execution_headers, process_execution_local_enable_nailgun, + experimental_fs_watcher, ) } diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs index db0e72c0034..10ebe3affd2 100644 --- a/src/rust/engine/src/context.rs +++ b/src/rust/engine/src/context.rs @@ -87,6 +87,7 @@ impl Core { process_execution_use_local_cache: bool, remote_execution_headers: BTreeMap, process_execution_local_enable_nailgun: bool, + experimental_fs_watcher: bool, ) -> Result { // Randomize CAS address order to avoid thundering herds from common config. let mut remote_store_servers = remote_store_servers; @@ -251,6 +252,7 @@ impl Core { executor.clone(), build_root.clone(), ignorer.clone(), + experimental_fs_watcher, )?; Ok(Core { diff --git a/src/rust/engine/src/watch.rs b/src/rust/engine/src/watch.rs index a2278178795..2a5d8b106ef 100644 --- a/src/rust/engine/src/watch.rs +++ b/src/rust/engine/src/watch.rs @@ -41,6 +41,7 @@ pub struct InvalidationWatcher { watcher: Arc>, executor: Executor, liveness: Receiver<()>, + enabled: bool, } impl InvalidationWatcher { @@ -49,6 +50,7 @@ impl InvalidationWatcher { executor: Executor, build_root: PathBuf, ignorer: Arc, + enabled: bool, ) -> Result { // Inotify events contain canonical paths to the files being watched. // If the build_root contains a symlink the paths returned in notify events @@ -59,103 +61,106 @@ impl InvalidationWatcher { let (watch_sender, watch_receiver) = crossbeam_channel::unbounded(); let mut watcher: RecommendedWatcher = Watcher::new(watch_sender, Duration::from_millis(50)) .map_err(|e| format!("Failed to begin watching the filesystem: {}", e))?; - // On darwin the notify API is much more efficient if you watch the build root - // recursively, so we set up that watch here and then return early when watch() is - // called by nodes that are running. On Linux the notify crate handles adding paths to watch - // much more efficiently so we do that instead on Linux. - if cfg!(target_os = "macos") { - watcher - .watch(canonical_build_root.clone(), RecursiveMode::Recursive) - .map_err(|e| { - format!( - "Failed to begin recursively watching files in the build root: {}", - e - ) - })? - } - let wrapped_watcher = Arc::new(Mutex::new(watcher)); let (thread_liveness_sender, thread_liveness_receiver) = crossbeam_channel::unbounded(); - thread::spawn(move || { - logging::set_thread_destination(logging::Destination::Pantsd); - loop { - let event_res = watch_receiver.recv_timeout(Duration::from_millis(100)); - let graph = if let Some(g) = graph.upgrade() { - g - } else { - // The Graph has been dropped: we're done. - break; - }; - match event_res { - Ok(Ok(ev)) => { - let paths: HashSet<_> = ev - .paths - .into_iter() - .filter_map(|path| { - // relativize paths to build root. - let path_relative_to_build_root = if path.starts_with(&canonical_build_root) { - // Unwrapping is fine because we check that the path starts with - // the build root above. - path.strip_prefix(&canonical_build_root).unwrap().into() - } else { - path - }; - // To avoid having to stat paths for events we will eventually ignore we "lie" to the ignorer - // to say that no path is a directory, they could be if someone chmod's or creates a dir. - // This maintains correctness by ensuring that at worst we have false negative events, where a directory - // only glob (one that ends in `/` ) was supposed to ignore a directory path, but didn't because we claimed it was a file. That - // directory path will be used to invalidate nodes, but won't invalidate anything because its path is somewhere - // out of our purview. - if ignorer.is_ignored_or_child_of_ignored_path( - &path_relative_to_build_root, - /* is_dir */ false, - ) { - None - } else { - Some(path_relative_to_build_root) - } - }) - .map(|path_relative_to_build_root| { - let mut paths_to_invalidate: Vec = vec![]; - if let Some(parent_dir) = path_relative_to_build_root.parent() { - paths_to_invalidate.push(parent_dir.to_path_buf()); - } - paths_to_invalidate.push(path_relative_to_build_root); - paths_to_invalidate - }) - .flatten() - .collect(); - // Only invalidate stuff if we have paths that weren't filtered out by gitignore. - if !paths.is_empty() { - debug!("notify invalidating {:?} because of {:?}", paths, ev.kind); - InvalidationWatcher::invalidate(&graph, &paths, "notify"); - }; - } - Ok(Err(err)) => { - if let notify::ErrorKind::PathNotFound = err.kind { - warn!("Path(s) did not exist: {:?}", err.paths); - continue; - } else { - error!("File watcher failing with: {}", err); + if enabled { + // On darwin the notify API is much more efficient if you watch the build root + // recursively, so we set up that watch here and then return early when watch() is + // called by nodes that are running. On Linux the notify crate handles adding paths to watch + // much more efficiently so we do that instead on Linux. + if cfg!(target_os = "macos") { + watcher + .watch(canonical_build_root.clone(), RecursiveMode::Recursive) + .map_err(|e| { + format!( + "Failed to begin recursively watching files in the build root: {}", + e + ) + })? + } + + thread::spawn(move || { + logging::set_thread_destination(logging::Destination::Pantsd); + loop { + let event_res = watch_receiver.recv_timeout(Duration::from_millis(100)); + let graph = if let Some(g) = graph.upgrade() { + g + } else { + // The Graph has been dropped: we're done. + break; + }; + match event_res { + Ok(Ok(ev)) => { + let paths: HashSet<_> = ev + .paths + .into_iter() + .filter_map(|path| { + // relativize paths to build root. + let path_relative_to_build_root = if path.starts_with(&canonical_build_root) { + // Unwrapping is fine because we check that the path starts with + // the build root above. + path.strip_prefix(&canonical_build_root).unwrap().into() + } else { + path + }; + // To avoid having to stat paths for events we will eventually ignore we "lie" to the ignorer + // to say that no path is a directory, they could be if someone chmod's or creates a dir. + // This maintains correctness by ensuring that at worst we have false negative events, where a directory + // only glob (one that ends in `/` ) was supposed to ignore a directory path, but didn't because we claimed it was a file. That + // directory path will be used to invalidate nodes, but won't invalidate anything because its path is somewhere + // out of our purview. + if ignorer.is_ignored_or_child_of_ignored_path( + &path_relative_to_build_root, + /* is_dir */ false, + ) { + None + } else { + Some(path_relative_to_build_root) + } + }) + .map(|path_relative_to_build_root| { + let mut paths_to_invalidate: Vec = vec![]; + if let Some(parent_dir) = path_relative_to_build_root.parent() { + paths_to_invalidate.push(parent_dir.to_path_buf()); + } + paths_to_invalidate.push(path_relative_to_build_root); + paths_to_invalidate + }) + .flatten() + .collect(); + // Only invalidate stuff if we have paths that weren't filtered out by gitignore. + if !paths.is_empty() { + debug!("notify invalidating {:?} because of {:?}", paths, ev.kind); + InvalidationWatcher::invalidate(&graph, &paths, "notify"); + }; + } + Ok(Err(err)) => { + if let notify::ErrorKind::PathNotFound = err.kind { + warn!("Path(s) did not exist: {:?}", err.paths); + continue; + } else { + error!("File watcher failing with: {}", err); + break; + } + } + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => { + // The Watcher is gone: we're done. break; } - } - Err(RecvTimeoutError::Timeout) => continue, - Err(RecvTimeoutError::Disconnected) => { - // The Watcher is gone: we're done. - break; - } - }; - } - debug!("Watch thread exiting."); - // Signal that we're exiting (which we would also do by just dropping the channel). - let _ = thread_liveness_sender.send(()); - }); + }; + } + debug!("Watch thread exiting."); + // Signal that we're exiting (which we would also do by just dropping the channel). + let _ = thread_liveness_sender.send(()); + }); + }; Ok(InvalidationWatcher { - watcher: wrapped_watcher, + watcher: Arc::new(Mutex::new(watcher)), executor, liveness: thread_liveness_receiver, + enabled, }) } @@ -164,8 +169,8 @@ impl InvalidationWatcher { /// pub async fn watch(&self, path: PathBuf) -> Result<(), notify::Error> { // Short circuit here if we are on a Darwin platform because we should be watching - // the entire build root recursively already. - if cfg!(target_os = "macos") { + // the entire build root recursively already, or if we are not enabled. + if cfg!(target_os = "macos") || !self.enabled { Ok(()) } else { // Using a futurized mutex here because for some reason using a regular mutex diff --git a/src/rust/engine/src/watch_tests.rs b/src/rust/engine/src/watch_tests.rs index 37807029f0b..61e4c1aba5d 100644 --- a/src/rust/engine/src/watch_tests.rs +++ b/src/rust/engine/src/watch_tests.rs @@ -57,8 +57,14 @@ fn setup_watch( ) -> InvalidationWatcher { let mut rt = tokio::runtime::Runtime::new().unwrap(); let executor = Executor::new(rt.handle().clone()); - let watcher = InvalidationWatcher::new(Arc::downgrade(&graph), executor, build_root, ignorer) - .expect("Couldn't create InvalidationWatcher"); + let watcher = InvalidationWatcher::new( + Arc::downgrade(&graph), + executor, + build_root, + ignorer, + /*enabled*/ true, + ) + .expect("Couldn't create InvalidationWatcher"); rt.block_on(watcher.watch(file_path)).unwrap(); watcher } diff --git a/tests/python/pants_test/engine/scheduler_test_base.py b/tests/python/pants_test/engine/scheduler_test_base.py index 74053825009..85e927cf0ec 100644 --- a/tests/python/pants_test/engine/scheduler_test_base.py +++ b/tests/python/pants_test/engine/scheduler_test_base.py @@ -3,11 +3,12 @@ import os import shutil +from dataclasses import asdict from pants.base.file_system_project_tree import FileSystemProjectTree from pants.engine.nodes import Throw from pants.engine.scheduler import Scheduler -from pants.option.global_options import DEFAULT_EXECUTION_OPTIONS +from pants.option.global_options import DEFAULT_EXECUTION_OPTIONS, ExecutionOptions from pants.testutil.engine.util import init_native from pants.util.contextutil import temporary_file_path from pants.util.dirutil import safe_mkdtemp, safe_rmtree @@ -48,19 +49,24 @@ def mk_scheduler( work_dir=None, include_trace_on_error=True, should_report_workunits=False, + execution_options=None, ): """Creates a SchedulerSession for a Scheduler with the given Rules installed.""" rules = rules or [] work_dir = work_dir or self._create_work_dir() project_tree = project_tree or self.mk_fs_tree(work_dir=work_dir) local_store_dir = os.path.realpath(safe_mkdtemp()) + if execution_options is not None: + eo = asdict(DEFAULT_EXECUTION_OPTIONS) + eo.update(execution_options) + execution_options = ExecutionOptions(**eo) scheduler = Scheduler( self._native, project_tree, local_store_dir, rules, union_rules, - DEFAULT_EXECUTION_OPTIONS, + execution_options=execution_options or DEFAULT_EXECUTION_OPTIONS, include_trace_on_error=include_trace_on_error, ) return scheduler.new_session( diff --git a/tests/python/pants_test/engine/test_fs.py b/tests/python/pants_test/engine/test_fs.py index 1fbda3c0ab7..5883953b214 100644 --- a/tests/python/pants_test/engine/test_fs.py +++ b/tests/python/pants_test/engine/test_fs.py @@ -722,7 +722,11 @@ def test_file_content_invalidated(self) -> None: on those files.""" with self.mk_project_tree() as project_tree: - scheduler = self.mk_scheduler(rules=create_fs_rules(), project_tree=project_tree) + scheduler = self.mk_scheduler( + rules=create_fs_rules(), + project_tree=project_tree, + execution_options={"experimental_fs_watcher": True}, + ) fname = "4.txt" new_data = "rouf" # read the original file so we have a cached value. @@ -748,7 +752,11 @@ def test_file_content_invalidated_after_parent_deletion(self) -> None: """Test that FileContent is invalidated after deleting parent directory.""" with self.mk_project_tree() as project_tree: - scheduler = self.mk_scheduler(rules=create_fs_rules(), project_tree=project_tree) + scheduler = self.mk_scheduler( + rules=create_fs_rules(), + project_tree=project_tree, + execution_options={"experimental_fs_watcher": True}, + ) fname = "a/b/1.txt" # read the original file so we have nodes to invalidate. original_content = self.read_file_content(scheduler, [fname]) @@ -771,7 +779,11 @@ def assert_mutated_directory_digest( self, mutation_function: Callable[[FileSystemProjectTree, str], Exception] ): with self.mk_project_tree() as project_tree: - scheduler = self.mk_scheduler(rules=create_fs_rules(), project_tree=project_tree) + scheduler = self.mk_scheduler( + rules=create_fs_rules(), + project_tree=project_tree, + execution_options={"experimental_fs_watcher": True}, + ) dir_path = "a/" dir_glob = dir_path + "*" initial_snapshot = self.execute_expecting_one_result(