Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pick notify watcher series #9741

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/python/pants/engine/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import traceback
from dataclasses import dataclass
from textwrap import dedent
from typing import TYPE_CHECKING, Any, Dict, Tuple
from typing import TYPE_CHECKING, Any, Dict, Tuple, cast

from pants.base.exception_sink import ExceptionSink
from pants.base.exiter import PANTS_FAILED_EXIT_CODE
Expand Down Expand Up @@ -242,6 +242,9 @@ def invalidate_files(self, direct_filenames):
def invalidate_all_files(self):
return self._native.lib.graph_invalidate_all_paths(self._scheduler)

def check_invalidation_watcher_liveness(self) -> bool:
return cast(bool, self._native.lib.check_invalidation_watcher_liveness(self._scheduler))

def graph_len(self):
return self._native.lib.graph_len(self._scheduler)

Expand Down
8 changes: 8 additions & 0 deletions src/python/pants/option/global_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,14 @@ def register_bootstrap_options(cls, register):
)

# Watchman options.
register(
"--watchman-enable",
pierrechevalier83 marked this conversation as resolved.
Show resolved Hide resolved
type=bool,
advanced=True,
default=True,
help="Use the watchman daemon filesystem event watcher to watch for changes "
"in the buildroot. Disable this to rely solely on the experimental pants engine filesystem watcher.",
)
register(
"--watchman-version", advanced=True, default="4.9.0-pants1", help="Watchman version."
)
Expand Down
16 changes: 14 additions & 2 deletions src/python/pants/pantsd/pants_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,9 @@ def _setup_services(
:returns: A PantsServices instance.
"""
should_shutdown_after_run = bootstrap_options.shutdown_pantsd_after_run
fs_event_service = FSEventService(watchman, build_root,)
fs_event_service = (
FSEventService(watchman, build_root,) if bootstrap_options.watchman_enable else None
)

pidfile_absolute = PantsDaemon.metadata_file_path(
"pantsd", "pid", bootstrap_options.pants_subprocessdir
Expand All @@ -218,6 +220,7 @@ def _setup_services(
"pantsd processes."
)

# TODO make SchedulerService handle fs_event_service_being None
scheduler_service = SchedulerService(
fs_event_service=fs_event_service,
legacy_graph_scheduler=legacy_graph_scheduler,
Expand All @@ -239,7 +242,16 @@ def _setup_services(
store_gc_service = StoreGCService(legacy_graph_scheduler.scheduler)

return PantsServices(
services=(fs_event_service, scheduler_service, pailgun_service, store_gc_service),
services=tuple(
service
for service in (
fs_event_service,
scheduler_service,
pailgun_service,
store_gc_service,
)
if service is not None
),
port_map=dict(pailgun=pailgun_service.pailgun_port),
)

Expand Down
57 changes: 34 additions & 23 deletions src/python/pants/pantsd/service/scheduler_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import queue
import sys
import threading
import time
from typing import List, Optional, Set, Tuple, cast

from pants.base.exiter import PANTS_SUCCEEDED_EXIT_CODE
Expand All @@ -28,11 +29,12 @@ class SchedulerService(PantsService):
"""

QUEUE_SIZE = 64
INVALIDATION_WATCHER_LIVENESS_CHECK_INTERVAL = 1

def __init__(
self,
*,
fs_event_service: FSEventService,
fs_event_service: Optional[FSEventService],
legacy_graph_scheduler: LegacyGraphScheduler,
build_root: str,
invalidation_globs: List[str],
Expand Down Expand Up @@ -79,21 +81,25 @@ def setup(self, services):
"""Service setup."""
super().setup(services)
# Register filesystem event handlers on an FSEventService instance.
self._fs_event_service.register_all_files_handler(
self._enqueue_fs_event, self._fs_event_service.PANTS_ALL_FILES_SUBSCRIPTION_NAME
)
if self._fs_event_service is not None:
self._fs_event_service.register_all_files_handler(
self._enqueue_fs_event, self._fs_event_service.PANTS_ALL_FILES_SUBSCRIPTION_NAME
)

# N.B. We compute the invalidating fileset eagerly at launch with an assumption that files
# that exist at startup are the only ones that can affect the running daemon.
if self._invalidation_globs:
self._invalidating_snapshot = self._get_snapshot()
self._invalidating_files = self._invalidating_snapshot.files
self._logger.info("watching invalidating files: {}".format(self._invalidating_files))

if self._pantsd_pidfile:
self._fs_event_service.register_pidfile_handler(
self._pantsd_pidfile, self._enqueue_fs_event
)
if self._fs_event_service is not None:
if self._invalidation_globs:
self._invalidating_snapshot = self._get_snapshot()
self._invalidating_files = self._invalidating_snapshot.files
self._logger.info(
"watching invalidating files: {}".format(self._invalidating_files)
)

if self._pantsd_pidfile:
self._fs_event_service.register_pidfile_handler(
self._pantsd_pidfile, self._enqueue_fs_event
)

def _enqueue_fs_event(self, event):
"""Watchman filesystem event handler for BUILD/requirements.txt updates.
Expand Down Expand Up @@ -178,6 +184,7 @@ def _process_event_queue(self):
# The first watchman event for all_files is a listing of all files - ignore it.
if (
not is_initial_event
and self._fs_event_service is not None
and subscription == self._fs_event_service.PANTS_ALL_FILES_SUBSCRIPTION_NAME
):
self._handle_batch_event(files)
Expand All @@ -191,12 +198,14 @@ def _process_event_queue(self):

self._event_queue.task_done()

def product_graph_len(self):
"""Provides the size of the captive product graph.

:returns: The node count for the captive product graph.
"""
return self._scheduler.graph_len()
def _check_invalidation_watcher_liveness(self):
time.sleep(self.INVALIDATION_WATCHER_LIVENESS_CHECK_INTERVAL)
if not self._scheduler.check_invalidation_watcher_liveness():
# Watcher failed for some reason
self._logger.critical(
"The graph invalidation watcher failed, so we are shutting down. Check the pantsd.log for details"
)
self.terminate()

def prepare_v1_graph_run_v2(
self, options: Options, options_bootstrapper: OptionsBootstrapper,
Expand All @@ -208,10 +217,9 @@ def prepare_v1_graph_run_v2(
"""
# If any nodes exist in the product graph, wait for the initial watchman event to avoid
# racing watchman startup vs invalidation events.
graph_len = self._scheduler.graph_len()
if graph_len > 0:
if self._fs_event_service is not None and self._scheduler.graph_len() > 0:
self._logger.debug(
"graph len was {}, waiting for initial watchman event".format(graph_len)
f"fs event service is running and graph_len > 0: waiting for initial watchman event"
)
self._watchman_is_running.wait()
build_id = RunTracker.global_instance().run_id
Expand Down Expand Up @@ -289,7 +297,10 @@ def _body(
def run(self):
"""Main service entrypoint."""
while not self._state.is_terminating:
self._process_event_queue()
if self._fs_event_service is not None:
self._process_event_queue()
else:
self._check_invalidation_watcher_liveness()
self._state.maybe_pause()


Expand Down
35 changes: 23 additions & 12 deletions src/python/pants/pantsd/watchman_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def create(cls, bootstrap_options):
bootstrap_options.watchman_supportdir,
bootstrap_options.watchman_startup_timeout,
bootstrap_options.watchman_socket_timeout,
bootstrap_options.watchman_enable,
bootstrap_options.watchman_socket_path,
bootstrap_options.pants_subprocessdir,
)
Expand All @@ -46,6 +47,7 @@ def __init__(
watchman_supportdir,
startup_timeout,
socket_timeout,
watchman_enable,
socket_path_override=None,
metadata_base_dir=None,
):
Expand All @@ -56,6 +58,7 @@ def __init__(
:param watchman_supportdir: The supportdir for BinaryUtil.
:param socket_timeout: The watchman client socket timeout (in seconds).
:param socket_path_override: The overridden target path of the watchman socket, if any.
:param watchman_enable: Whether to start watchman when asked to maybe launch.
:param metadata_base_dir: The ProcessManager metadata base directory.
"""
self._binary_util = binary_util
Expand All @@ -64,6 +67,7 @@ def __init__(
self._startup_timeout = startup_timeout
self._socket_timeout = socket_timeout
self._socket_path_override = socket_path_override
self._watchman_enable = watchman_enable
self._log_level = log_level
self._logger = logging.getLogger(__name__)
self._metadata_base_dir = metadata_base_dir
Expand Down Expand Up @@ -92,21 +96,28 @@ def watchman(self):
)

def maybe_launch(self):
if not self.watchman.is_alive():
self._logger.debug("launching watchman")
try:
self.watchman.launch()
except (Watchman.ExecutionError, Watchman.InvalidCommandOutput) as e:
self._logger.fatal("failed to launch watchman: {!r})".format(e))
raise
if self._watchman_enable:
if not self.watchman.is_alive():
self._logger.debug("launching watchman")
try:
self.watchman.launch()
except (Watchman.ExecutionError, Watchman.InvalidCommandOutput) as e:
self._logger.critical("failed to launch watchman: {!r})".format(e))
raise

self._logger.debug(
"watchman is running, pid={pid} socket={socket}".format(
pid=self.watchman.pid, socket=self.watchman.socket
self._logger.debug(
"watchman is running, pid={pid} socket={socket}".format(
pid=self.watchman.pid, socket=self.watchman.socket
)
)
)
return self.watchman
else:
self.maybe_terminate()

return self.watchman
def maybe_terminate(self) -> None:
if not self._watchman_enable and self.watchman.is_alive():
self._logger.info("Watchman was running, but will be killed because it was disabled.")
self.terminate()

def terminate(self):
self.watchman.terminate()
5 changes: 5 additions & 0 deletions src/rust/engine/engine_cffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,11 @@ pub extern "C" fn graph_invalidate_all_paths(scheduler_ptr: *mut Scheduler) -> u
})
}

#[no_mangle]
pub extern "C" fn check_invalidation_watcher_liveness(scheduler_ptr: *mut Scheduler) -> bool {
with_scheduler(scheduler_ptr, |scheduler| scheduler.core.watcher.is_alive())
}

#[no_mangle]
pub extern "C" fn graph_len(scheduler_ptr: *mut Scheduler) -> u64 {
with_scheduler(scheduler_ptr, |scheduler| scheduler.core.graph.len() as u64)
Expand Down
Loading