diff --git a/src/python/pants/engine/native.py b/src/python/pants/engine/native.py index 1a8cff0ff58..22ab0c9ddeb 100644 --- a/src/python/pants/engine/native.py +++ b/src/python/pants/engine/native.py @@ -1002,6 +1002,7 @@ def ti(type_obj): execution_options.process_execution_use_local_cache, self.context.utf8_dict(execution_options.remote_execution_headers), execution_options.process_execution_local_enable_nailgun, + execution_options.experimental_fs_watcher, ) if scheduler_result.is_throw: value = self.context.from_value(scheduler_result.throw_handle) diff --git a/src/python/pants/option/global_options.py b/src/python/pants/option/global_options.py index 61cefbd3149..fec4086466b 100644 --- a/src/python/pants/option/global_options.py +++ b/src/python/pants/option/global_options.py @@ -99,6 +99,7 @@ class ExecutionOptions: remote_execution_extra_platform_properties: Any remote_execution_headers: Any process_execution_local_enable_nailgun: bool + experimental_fs_watcher: bool @classmethod def from_bootstrap_options(cls, bootstrap_options): @@ -124,6 +125,7 @@ def from_bootstrap_options(cls, bootstrap_options): remote_execution_extra_platform_properties=bootstrap_options.remote_execution_extra_platform_properties, remote_execution_headers=bootstrap_options.remote_execution_headers, process_execution_local_enable_nailgun=bootstrap_options.process_execution_local_enable_nailgun, + experimental_fs_watcher=bootstrap_options.experimental_fs_watcher, ) @@ -149,6 +151,7 @@ def from_bootstrap_options(cls, bootstrap_options): remote_execution_extra_platform_properties=[], remote_execution_headers={}, process_execution_local_enable_nailgun=False, + experimental_fs_watcher=False, ) @@ -860,6 +863,14 @@ def register_bootstrap_options(cls, register): help="Whether or not to use nailgun to run the requests that are marked as nailgunnable.", advanced=True, ) + register( + "--experimental-fs-watcher", + type=bool, + default=False, + advanced=True, + help="Whether to use the engine filesystem watcher which registers the workspace" + " for kernel file change events", + ) @classmethod def register_options(cls, register): diff --git a/src/rust/engine/engine_cffi/src/lib.rs b/src/rust/engine/engine_cffi/src/lib.rs index b765bdb91f0..e7b761672d0 100644 --- a/src/rust/engine/engine_cffi/src/lib.rs +++ b/src/rust/engine/engine_cffi/src/lib.rs @@ -207,6 +207,7 @@ pub extern "C" fn scheduler_create( process_execution_use_local_cache: bool, remote_execution_headers_buf: BufferBuffer, process_execution_local_enable_nailgun: bool, + experimental_fs_watcher: bool, ) -> RawResult { match make_core( tasks_ptr, @@ -236,6 +237,7 @@ pub extern "C" fn scheduler_create( process_execution_use_local_cache, remote_execution_headers_buf, process_execution_local_enable_nailgun, + experimental_fs_watcher, ) { Ok(core) => RawResult { is_throw: false, @@ -278,6 +280,7 @@ fn make_core( process_execution_use_local_cache: bool, remote_execution_headers_buf: BufferBuffer, process_execution_local_enable_nailgun: bool, + experimental_fs_watcher: bool, ) -> Result { let root_type_ids = root_type_ids.to_vec(); let ignore_patterns = ignore_patterns_buf @@ -386,6 +389,7 @@ fn make_core( process_execution_use_local_cache, remote_execution_headers, process_execution_local_enable_nailgun, + experimental_fs_watcher, ) } diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs index c21e75f79d9..e5e1e8100e8 100644 --- a/src/rust/engine/src/context.rs +++ b/src/rust/engine/src/context.rs @@ -87,6 +87,7 @@ impl Core { process_execution_use_local_cache: bool, remote_execution_headers: BTreeMap, process_execution_local_enable_nailgun: bool, + experimental_fs_watcher: bool, ) -> Result { // Randomize CAS address order to avoid thundering herds from common config. let mut remote_store_servers = remote_store_servers; @@ -251,6 +252,7 @@ impl Core { executor.clone(), build_root.clone(), ignorer.clone(), + experimental_fs_watcher, )?; Ok(Core { diff --git a/src/rust/engine/src/watch.rs b/src/rust/engine/src/watch.rs index a2278178795..2a5d8b106ef 100644 --- a/src/rust/engine/src/watch.rs +++ b/src/rust/engine/src/watch.rs @@ -41,6 +41,7 @@ pub struct InvalidationWatcher { watcher: Arc>, executor: Executor, liveness: Receiver<()>, + enabled: bool, } impl InvalidationWatcher { @@ -49,6 +50,7 @@ impl InvalidationWatcher { executor: Executor, build_root: PathBuf, ignorer: Arc, + enabled: bool, ) -> Result { // Inotify events contain canonical paths to the files being watched. // If the build_root contains a symlink the paths returned in notify events @@ -59,103 +61,106 @@ impl InvalidationWatcher { let (watch_sender, watch_receiver) = crossbeam_channel::unbounded(); let mut watcher: RecommendedWatcher = Watcher::new(watch_sender, Duration::from_millis(50)) .map_err(|e| format!("Failed to begin watching the filesystem: {}", e))?; - // On darwin the notify API is much more efficient if you watch the build root - // recursively, so we set up that watch here and then return early when watch() is - // called by nodes that are running. On Linux the notify crate handles adding paths to watch - // much more efficiently so we do that instead on Linux. - if cfg!(target_os = "macos") { - watcher - .watch(canonical_build_root.clone(), RecursiveMode::Recursive) - .map_err(|e| { - format!( - "Failed to begin recursively watching files in the build root: {}", - e - ) - })? - } - let wrapped_watcher = Arc::new(Mutex::new(watcher)); let (thread_liveness_sender, thread_liveness_receiver) = crossbeam_channel::unbounded(); - thread::spawn(move || { - logging::set_thread_destination(logging::Destination::Pantsd); - loop { - let event_res = watch_receiver.recv_timeout(Duration::from_millis(100)); - let graph = if let Some(g) = graph.upgrade() { - g - } else { - // The Graph has been dropped: we're done. - break; - }; - match event_res { - Ok(Ok(ev)) => { - let paths: HashSet<_> = ev - .paths - .into_iter() - .filter_map(|path| { - // relativize paths to build root. - let path_relative_to_build_root = if path.starts_with(&canonical_build_root) { - // Unwrapping is fine because we check that the path starts with - // the build root above. - path.strip_prefix(&canonical_build_root).unwrap().into() - } else { - path - }; - // To avoid having to stat paths for events we will eventually ignore we "lie" to the ignorer - // to say that no path is a directory, they could be if someone chmod's or creates a dir. - // This maintains correctness by ensuring that at worst we have false negative events, where a directory - // only glob (one that ends in `/` ) was supposed to ignore a directory path, but didn't because we claimed it was a file. That - // directory path will be used to invalidate nodes, but won't invalidate anything because its path is somewhere - // out of our purview. - if ignorer.is_ignored_or_child_of_ignored_path( - &path_relative_to_build_root, - /* is_dir */ false, - ) { - None - } else { - Some(path_relative_to_build_root) - } - }) - .map(|path_relative_to_build_root| { - let mut paths_to_invalidate: Vec = vec![]; - if let Some(parent_dir) = path_relative_to_build_root.parent() { - paths_to_invalidate.push(parent_dir.to_path_buf()); - } - paths_to_invalidate.push(path_relative_to_build_root); - paths_to_invalidate - }) - .flatten() - .collect(); - // Only invalidate stuff if we have paths that weren't filtered out by gitignore. - if !paths.is_empty() { - debug!("notify invalidating {:?} because of {:?}", paths, ev.kind); - InvalidationWatcher::invalidate(&graph, &paths, "notify"); - }; - } - Ok(Err(err)) => { - if let notify::ErrorKind::PathNotFound = err.kind { - warn!("Path(s) did not exist: {:?}", err.paths); - continue; - } else { - error!("File watcher failing with: {}", err); + if enabled { + // On darwin the notify API is much more efficient if you watch the build root + // recursively, so we set up that watch here and then return early when watch() is + // called by nodes that are running. On Linux the notify crate handles adding paths to watch + // much more efficiently so we do that instead on Linux. + if cfg!(target_os = "macos") { + watcher + .watch(canonical_build_root.clone(), RecursiveMode::Recursive) + .map_err(|e| { + format!( + "Failed to begin recursively watching files in the build root: {}", + e + ) + })? + } + + thread::spawn(move || { + logging::set_thread_destination(logging::Destination::Pantsd); + loop { + let event_res = watch_receiver.recv_timeout(Duration::from_millis(100)); + let graph = if let Some(g) = graph.upgrade() { + g + } else { + // The Graph has been dropped: we're done. + break; + }; + match event_res { + Ok(Ok(ev)) => { + let paths: HashSet<_> = ev + .paths + .into_iter() + .filter_map(|path| { + // relativize paths to build root. + let path_relative_to_build_root = if path.starts_with(&canonical_build_root) { + // Unwrapping is fine because we check that the path starts with + // the build root above. + path.strip_prefix(&canonical_build_root).unwrap().into() + } else { + path + }; + // To avoid having to stat paths for events we will eventually ignore we "lie" to the ignorer + // to say that no path is a directory, they could be if someone chmod's or creates a dir. + // This maintains correctness by ensuring that at worst we have false negative events, where a directory + // only glob (one that ends in `/` ) was supposed to ignore a directory path, but didn't because we claimed it was a file. That + // directory path will be used to invalidate nodes, but won't invalidate anything because its path is somewhere + // out of our purview. + if ignorer.is_ignored_or_child_of_ignored_path( + &path_relative_to_build_root, + /* is_dir */ false, + ) { + None + } else { + Some(path_relative_to_build_root) + } + }) + .map(|path_relative_to_build_root| { + let mut paths_to_invalidate: Vec = vec![]; + if let Some(parent_dir) = path_relative_to_build_root.parent() { + paths_to_invalidate.push(parent_dir.to_path_buf()); + } + paths_to_invalidate.push(path_relative_to_build_root); + paths_to_invalidate + }) + .flatten() + .collect(); + // Only invalidate stuff if we have paths that weren't filtered out by gitignore. + if !paths.is_empty() { + debug!("notify invalidating {:?} because of {:?}", paths, ev.kind); + InvalidationWatcher::invalidate(&graph, &paths, "notify"); + }; + } + Ok(Err(err)) => { + if let notify::ErrorKind::PathNotFound = err.kind { + warn!("Path(s) did not exist: {:?}", err.paths); + continue; + } else { + error!("File watcher failing with: {}", err); + break; + } + } + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => { + // The Watcher is gone: we're done. break; } - } - Err(RecvTimeoutError::Timeout) => continue, - Err(RecvTimeoutError::Disconnected) => { - // The Watcher is gone: we're done. - break; - } - }; - } - debug!("Watch thread exiting."); - // Signal that we're exiting (which we would also do by just dropping the channel). - let _ = thread_liveness_sender.send(()); - }); + }; + } + debug!("Watch thread exiting."); + // Signal that we're exiting (which we would also do by just dropping the channel). + let _ = thread_liveness_sender.send(()); + }); + }; Ok(InvalidationWatcher { - watcher: wrapped_watcher, + watcher: Arc::new(Mutex::new(watcher)), executor, liveness: thread_liveness_receiver, + enabled, }) } @@ -164,8 +169,8 @@ impl InvalidationWatcher { /// pub async fn watch(&self, path: PathBuf) -> Result<(), notify::Error> { // Short circuit here if we are on a Darwin platform because we should be watching - // the entire build root recursively already. - if cfg!(target_os = "macos") { + // the entire build root recursively already, or if we are not enabled. + if cfg!(target_os = "macos") || !self.enabled { Ok(()) } else { // Using a futurized mutex here because for some reason using a regular mutex diff --git a/src/rust/engine/src/watch_tests.rs b/src/rust/engine/src/watch_tests.rs index 37807029f0b..ef544489cf4 100644 --- a/src/rust/engine/src/watch_tests.rs +++ b/src/rust/engine/src/watch_tests.rs @@ -57,7 +57,7 @@ fn setup_watch( ) -> InvalidationWatcher { let mut rt = tokio::runtime::Runtime::new().unwrap(); let executor = Executor::new(rt.handle().clone()); - let watcher = InvalidationWatcher::new(Arc::downgrade(&graph), executor, build_root, ignorer) + let watcher = InvalidationWatcher::new(Arc::downgrade(&graph), executor, build_root, ignorer, /*enabled*/ true) .expect("Couldn't create InvalidationWatcher"); rt.block_on(watcher.watch(file_path)).unwrap(); watcher