Skip to content

Commit

Permalink
[pantsd] Improve options invalidation.
Browse files Browse the repository at this point in the history
  • Loading branch information
kwlzn committed Nov 2, 2017
1 parent 3ea1886 commit 1ef31bf
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 26 deletions.
30 changes: 9 additions & 21 deletions src/python/pants/java/nailgun_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from pants.base.build_environment import get_buildroot
from pants.java.executor import Executor, SubprocessExecutor
from pants.java.nailgun_client import NailgunClient
from pants.pantsd.process_manager import ProcessGroup, ProcessManager
from pants.pantsd.process_manager import FingerprintedProcessManager, ProcessGroup
from pants.util.dirutil import safe_file_dump, safe_open


Expand Down Expand Up @@ -59,7 +59,7 @@ def killall(self, everywhere=False):

# TODO: Once we integrate standard logging into our reporting framework, we can consider making
# some of the log.debug() below into log.info(). Right now it just looks wrong on the console.
class NailgunExecutor(Executor, ProcessManager):
class NailgunExecutor(Executor, FingerprintedProcessManager):
"""Executes java programs by launching them in nailgun server.
If a nailgun is not available for a given set of jvm args and classpath, one is launched and
Expand All @@ -70,8 +70,8 @@ class NailgunExecutor(Executor, ProcessManager):
_NG_PORT_REGEX = re.compile(r'.*\s+port\s+(\d+)\.$')

# Used to identify if we own a given nailgun server.
FINGERPRINT_CMD_KEY = b'-Dpants.nailgun.fingerprint'
_PANTS_NG_ARG_PREFIX = b'-Dpants.buildroot'
_PANTS_FINGERPRINT_ARG_PREFIX = b'-Dpants.nailgun.fingerprint'
_PANTS_OWNER_ARG_PREFIX = b'-Dpants.nailgun.owner'
_PANTS_NG_BUILDROOT_ARG = '='.join((_PANTS_NG_ARG_PREFIX, get_buildroot()))

Expand All @@ -82,10 +82,10 @@ class NailgunExecutor(Executor, ProcessManager):
def __init__(self, identity, workdir, nailgun_classpath, distribution, ins=None,
connect_timeout=10, connect_attempts=5, metadata_base_dir=None):
Executor.__init__(self, distribution=distribution)
ProcessManager.__init__(self,
name=identity,
process_name=self._PROCESS_NAME,
metadata_base_dir=metadata_base_dir)
FingerprintedProcessManager.__init__(self,
name=identity,
process_name=self._PROCESS_NAME,
metadata_base_dir=metadata_base_dir)

if not isinstance(workdir, string_types):
raise ValueError('Workdir must be a path string, not: {workdir}'.format(workdir=workdir))
Expand All @@ -103,23 +103,12 @@ def __str__(self):
return 'NailgunExecutor({identity}, dist={dist}, pid={pid} socket={socket})'.format(
identity=self._identity, dist=self._distribution, pid=self.pid, socket=self.socket)

def _parse_fingerprint(self, cmdline):
fingerprints = [cmd.split('=')[1] for cmd in cmdline if cmd.startswith(
self._PANTS_FINGERPRINT_ARG_PREFIX + '=')]
return fingerprints[0] if fingerprints else None

@property
def fingerprint(self):
"""This provides the nailgun fingerprint of the running process otherwise None."""
if self.cmdline:
return self._parse_fingerprint(self.cmdline)

def _create_owner_arg(self, workdir):
# Currently the owner is identified via the full path to the workdir.
return '='.join((self._PANTS_OWNER_ARG_PREFIX, workdir))

def _create_fingerprint_arg(self, fingerprint):
return '='.join((self._PANTS_FINGERPRINT_ARG_PREFIX, fingerprint))
return '='.join((self.FINGERPRINT_KEY, fingerprint))

@staticmethod
def _fingerprint(jvm_options, classpath, java_version):
Expand Down Expand Up @@ -164,8 +153,7 @@ def run(this, stdout=None, stderr=None, cwd=None):

def _check_nailgun_state(self, new_fingerprint):
running = self.is_alive()
updated = running and (self.fingerprint != new_fingerprint or
self.cmd != self._distribution.java)
updated = self.needs_restart(new_fingerprint) or self.cmd != self._distribution.java
logging.debug('Nailgun {nailgun} state: updated={up!s} running={run!s} fingerprint={old_fp} '
'new_fingerprint={new_fp} distribution={old_dist} new_distribution={new_dist}'
.format(nailgun=self._identity, up=updated, run=running,
Expand Down
22 changes: 22 additions & 0 deletions src/python/pants/option/option_value_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
unicode_literals, with_statement)

import copy
import hashlib

import six

from pants.option.ranked_value import RankedValue

Expand Down Expand Up @@ -143,3 +146,22 @@ def __copy__(self):
ret = type(self)()
ret._value_map = copy.copy(self._value_map)
return ret

def sha1(self, exclude_keys=None):
"""Computes and returns the current sha1 fingerprint for this `OptionValueContainer`.
:param list exclude_keys: A list of keys to exclude from fingerprint computation.
"""
exclude_keys = set(exclude_keys or [])
acceptable_types = set((bytes, str, list, tuple, dict, int, float, bool, type(None)))
hasher = hashlib.sha1()
# N.B. This is pre-`sorted()` in `__iter__` above for determinism.
for k in self:
if k in exclude_keys: continue
hasher.update(k)
v = self.get(k)
assert type(v) in acceptable_types, 'unhashable option type: {}'.format(type(v))
# N.B. This relies on implicit string conversion to do the right thing for
# primitive types (e.g. `str([1, 2, 3])` -> "[1, 2, 3]").
hasher.update(six.binary_type(v))
return hasher.hexdigest()
30 changes: 25 additions & 5 deletions src/python/pants/pantsd/pants_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from pants.init.target_roots import TargetRoots
from pants.logging.setup import setup_logging
from pants.option.options_bootstrapper import OptionsBootstrapper
from pants.pantsd.process_manager import ProcessManager
from pants.pantsd.process_manager import FingerprintedProcessManager
from pants.pantsd.service.fs_event_service import FSEventService
from pants.pantsd.service.pailgun_service import PailgunService
from pants.pantsd.service.scheduler_service import SchedulerService
Expand Down Expand Up @@ -57,7 +57,7 @@ def fileno(self):
return self._stream.fileno()


class PantsDaemon(ProcessManager):
class PantsDaemon(FingerprintedProcessManager):
"""A daemon that manages PantsService instances."""

JOIN_TIMEOUT_SECONDS = 1
Expand Down Expand Up @@ -169,6 +169,20 @@ def watchman_launcher(self):
def is_killed(self):
return self._kill_switch.is_set()

@property
def options_fingerprint(self):
return self._bootstrap_options.sha1(
exclude_keys=[
'colors',
'quiet',
'target_spec_file',
'verify_config',
'subproject_roots',
'exclude_target_regexp',
'native_engine_visualize_to'
]
)

def shutdown(self, service_thread_map):
"""Gracefully terminate all services and kill the main PantsDaemon loop."""
with self._lock:
Expand Down Expand Up @@ -229,6 +243,7 @@ def _run_services(self, services):

# Once all services are started, write our pid.
self.write_pid()
self.write_metadata_by_name('pantsd', self.FINGERPRINT_KEY, self.options_fingerprint)

# Monitor services.
while not self.is_killed:
Expand Down Expand Up @@ -283,7 +298,11 @@ def maybe_launch(self):
self.watchman_launcher.maybe_launch()
self._logger.debug('acquiring lock: {}'.format(self.process_lock))
with self.process_lock:
if not self.is_alive():
new_fingerprint = self.options_fingerprint
self._logger.debug('pantsd: is_alive={} new_fingerprint={} current_fingerprint={}'
.format(self.is_alive(), new_fingerprint, self.fingerprint))
if self.needs_restart(new_fingerprint):
self.terminate(include_watchman=False)
self._logger.debug('launching pantsd')
self.daemon_spawn()
# Wait up to 10 seconds for pantsd to write its pidfile so we can display the pid to the user.
Expand All @@ -295,10 +314,11 @@ def maybe_launch(self):
.format(pantsd_pid, listening_port))
return listening_port

def terminate(self):
def terminate(self, include_watchman=True):
"""Terminates pantsd and watchman."""
super(PantsDaemon, self).terminate()
self.watchman_launcher.terminate()
if include_watchman:
self.watchman_launcher.terminate()


def launch():
Expand Down
54 changes: 54 additions & 0 deletions src/python/pants/pantsd/process_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,3 +499,57 @@ def post_fork_child(self):

def post_fork_parent(self):
"""Post-fork parent callback for subclasses."""


class FingerprintedProcessManager(ProcessManager):
"""A `ProcessManager` subclass that provides a general strategy for process fingerprinting."""

FINGERPRINT_KEY = 'fingerprint'
FINGERPRINT_CMD_KEY = None
FINGERPRINT_CMD_SEP = '='

@property
def fingerprint(self):
"""The fingerprint of the current process.
:returns: The fingerprint of the running process as read from the process table, ProcessManager
metadata or `None`.
:rtype: string
"""
return (
self.parse_fingerprint(self.cmdline) or
self.read_metadata_by_name(self.name, self.FINGERPRINT_KEY)
)

def parse_fingerprint(self, cmdline, key=None, sep=None):
"""Given a psutil.Process.cmdline, parse and return a fingerprint.
:param list cmdline: The psutil.Process.cmdline of the current process.
:param string key: The key for fingerprint discovery.
:param string sep: The key/value separator for fingerprint discovery.
:returns: The parsed fingerprint or `None`.
:rtype: string or `None`
"""
key = key or self.FINGERPRINT_CMD_KEY
if key:
sep = sep or self.FINGERPRINT_CMD_SEP
cmdline = cmdline or []
for cmd_part in cmdline:
if cmd_part.startswith('{}='.format(key, sep)):
return cmd_part.split(sep)[1]

def has_current_fingerprint(self, fingerprint):
"""Determines if a new fingerprint is the current fingerprint of the running process.
:param string fingerprint: The new fingerprint to compare to.
:rtype: bool
"""
return fingerprint == self.fingerprint

def needs_restart(self, fingerprint):
"""Determines if the current ProcessManager needs to be started or restarted.
:param string fingerprint: The new fingerprint to compare to.
:rtype: bool
"""
return self.is_dead() or not self.has_current_fingerprint(fingerprint)
1 change: 1 addition & 0 deletions tests/python/pants_test/pantsd/test_pants_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def setUp(self):
lock = threading.RLock()
mock_options = mock.Mock()
mock_options.pants_subprocessdir = 'non_existent_dir'
mock_options.sha1.return_value = 'some_sha'
self.pantsd = PantsDaemon(None,
'test_buildroot',
'test_work_dir',
Expand Down

0 comments on commit 1ef31bf

Please sign in to comment.