Skip to content

Commit

Permalink
Move fork context management to rust (#5521)
Browse files Browse the repository at this point in the history
As described in #6356, we currently suspect that there are cases where resources within the engine are being used during a fork. The python-side `fork_lock` attempts to approximate a bunch of other locks which it would be more accurate to directly acquire instead.

Move "fork context" management to rust, and execute our double fork for `DaemonPantsRunner` inside the scheduler's fork context. This acquires all existing locks, which removes the need for a `fork_lock` that would approximate those locks. Also has the benefit that we can eagerly re-start the scheduler's CpuPool.

It should be easier to add additional threads and locks on the rust side, without worrying that we have acquired the `fork_lock` in enough places.

A series of replays of our internal benchmarks no longer reproduce the hang described in #6356, so this likely fixes #6356.
  • Loading branch information
Stu Hood authored and stuhood committed Aug 28, 2018
1 parent 75415d4 commit 1a67c7d
Show file tree
Hide file tree
Showing 25 changed files with 266 additions and 169 deletions.
18 changes: 5 additions & 13 deletions src/python/pants/bin/daemon_pants_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class DaemonPantsRunner(ProcessManager):
"""

@classmethod
def create(cls, sock, args, env, fork_lock, scheduler_service):
def create(cls, sock, args, env, scheduler_service):
try:
# N.B. This will temporarily redirect stdio in the daemon's pre-fork context
# to the nailgun session. We'll later do this a second time post-fork, because
Expand All @@ -98,21 +98,19 @@ def create(cls, sock, args, env, fork_lock, scheduler_service):
sock,
args,
env,
fork_lock,
graph_helper,
target_roots,
subprocess_dir,
options_bootstrapper,
deferred_exc
)

def __init__(self, socket, args, env, fork_lock, graph_helper, target_roots,
def __init__(self, socket, args, env, graph_helper, target_roots,
metadata_base_dir, options_bootstrapper, deferred_exc):
"""
:param socket socket: A connected socket capable of speaking the nailgun protocol.
:param list args: The arguments (i.e. sys.argv) for this run.
:param dict env: The environment (i.e. os.environ) for this run.
:param threading.RLock fork_lock: A lock to use during forking for thread safety.
:param LegacyGraphSession graph_helper: The LegacyGraphSession instance to use for BuildGraph
construction. In the event of an exception, this will be
None.
Expand All @@ -129,7 +127,6 @@ def __init__(self, socket, args, env, fork_lock, graph_helper, target_roots,
self._socket = socket
self._args = args
self._env = env
self._fork_lock = fork_lock
self._graph_helper = graph_helper
self._target_roots = target_roots
self._options_bootstrapper = options_bootstrapper
Expand Down Expand Up @@ -250,19 +247,14 @@ def _maybe_get_client_start_time_from_env(self, env):
return None if client_start_time is None else float(client_start_time)

def run(self):
"""Fork, daemonize and invoke self.post_fork_child() (via ProcessManager)."""
with self._fork_lock:
self.daemonize(write_pid=False)

def pre_fork(self):
"""Pre-fork callback executed via ProcessManager.daemonize().
"""Fork, daemonize and invoke self.post_fork_child() (via ProcessManager).
The scheduler has thread pools which need to be re-initialized after a fork: this ensures that
when the pantsd-runner forks from pantsd, there is a working pool for any work that happens
in that child process.
"""
if self._graph_helper:
self._graph_helper.scheduler_session.pre_fork()
fork_context = self._graph_helper.scheduler_session.with_fork_context if self._graph_helper else None
self.daemonize(write_pid=False, fork_context=fork_context)

def post_fork_child(self):
"""Post-fork child process callback executed via ProcessManager.daemonize()."""
Expand Down
2 changes: 1 addition & 1 deletion src/python/pants/engine/native.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@
uint64_t,
uint64_t,
_Bool);
void scheduler_pre_fork(Scheduler*);
PyResult scheduler_fork_context(Scheduler*, Function);
Handle scheduler_metrics(Scheduler*, Session*);
RawNodes* scheduler_execute(Scheduler*, Session*, ExecutionRequest*);
void scheduler_destroy(Scheduler*);
Expand Down
10 changes: 6 additions & 4 deletions src/python/pants/engine/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,10 @@ def _metrics(self, session):
metrics_val = self._native.lib.scheduler_metrics(self._scheduler, session)
return {k: v for k, v in self._from_value(metrics_val)}

def pre_fork(self):
self._native.lib.scheduler_pre_fork(self._scheduler)
def with_fork_context(self, func):
"""See the rustdocs for `scheduler_fork_context` for more information."""
res = self._native.lib.scheduler_fork_context(self._scheduler, Function(self._to_key(func)))
return self._raise_or_return(res)

def _run_and_return_roots(self, session, execution_request):
raw_roots = self._native.lib.scheduler_execute(self._scheduler, session, execution_request)
Expand Down Expand Up @@ -468,8 +470,8 @@ def metrics(self):
"""Returns metrics for this SchedulerSession as a dict of metric name to metric value."""
return self._scheduler._metrics(self._session)

def pre_fork(self):
self._scheduler.pre_fork()
def with_fork_context(self, func):
return self._scheduler.with_fork_context(func)

def _maybe_visualize(self):
if self._scheduler.visualize_to_dir() is not None:
Expand Down
6 changes: 1 addition & 5 deletions src/python/pants/pantsd/pants_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,6 @@ def __init__(self, native, build_root, work_dir, log_level, services, socket_map
# A lock to guard the service thread lifecycles. This can be used by individual services
# to safeguard daemon-synchronous sections that should be protected from abrupt teardown.
self._lifecycle_lock = threading.RLock()
# A lock to guard pantsd->runner forks. This can be used by services to safeguard resources
# held by threads at fork time, so that we can fork without deadlocking.
self._fork_lock = threading.RLock()
# N.B. This Event is used as nothing more than a convenient atomic flag - nothing waits on it.
self._kill_switch = threading.Event()
self._exiter = Exiter()
Expand Down Expand Up @@ -301,10 +298,9 @@ def _pantsd_logging(self):

def _setup_services(self, services):
assert self._lifecycle_lock is not None, 'PantsDaemon lock has not been set!'
assert self._fork_lock is not None, 'PantsDaemon fork lock has not been set!'
for service in services:
self._logger.info('setting up service {}'.format(service))
service.setup(self._lifecycle_lock, self._fork_lock)
service.setup(self._lifecycle_lock)

@staticmethod
def _make_thread(target):
Expand Down
68 changes: 44 additions & 24 deletions src/python/pants/pantsd/process_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from __future__ import absolute_import, division, print_function, unicode_literals

import functools
import logging
import os
import signal
Expand Down Expand Up @@ -440,7 +441,7 @@ def terminate(self, signal_chain=KILL_CHAIN, kill_wait=KILL_WAIT_SEC, purge=True
self.purge_metadata(force=True)

def daemonize(self, pre_fork_opts=None, post_fork_parent_opts=None, post_fork_child_opts=None,
write_pid=True):
fork_context=None, write_pid=True):
"""Perform a double-fork, execute callbacks and write the child pid file.
The double-fork here is necessary to truly daemonize the subprocess such that it can never
Expand All @@ -454,33 +455,52 @@ def daemonize(self, pre_fork_opts=None, post_fork_parent_opts=None, post_fork_ch
below) due to the fact that the daemons that pants would run are typically personal user
daemons. Having a disparate umask from pre-vs-post fork causes files written in each phase to
differ in their permissions without good reason - in this case, we want to inherit the umask.
:param fork_context: A function which accepts and calls a function that will call fork. This
is not a contextmanager/generator because that would make interacting with native code more
challenging. If no fork_context is passed, the fork function is called directly.
"""


def double_fork():
logger.debug('forking %s', self)
pid = os.fork()
if pid == 0:
os.setsid()
second_pid = os.fork()
if second_pid == 0:
return False, True
else:
if write_pid: self.write_pid(second_pid)
return True, False
else:
# This prevents un-reaped, throw-away parent processes from lingering in the process table.
os.waitpid(pid, 0)
return False, False

fork_func = functools.partial(fork_context, double_fork) if fork_context else double_fork

# Perform the double fork (optionally under the fork_context). Three outcomes are possible after
# the double fork: we're either the original process, the double-fork parent, or the double-fork
# child. We assert below that a process is not somehow both the parent and the child.
self.purge_metadata()
self.pre_fork(**pre_fork_opts or {})
logger.debug('forking %s', self)
pid = os.fork()
if pid == 0:
os.setsid()
second_pid = os.fork()
if second_pid == 0:
try:
os.chdir(self._buildroot)
self.post_fork_child(**post_fork_child_opts or {})
except Exception:
logger.critical(traceback.format_exc())
finally:
os._exit(0)
is_parent, is_child = fork_func()
if not is_parent and not is_child:
return

try:
if is_parent:
assert not is_child
self.post_fork_parent(**post_fork_parent_opts or {})
else:
try:
if write_pid: self.write_pid(second_pid)
self.post_fork_parent(**post_fork_parent_opts or {})
except Exception:
logger.critical(traceback.format_exc())
finally:
os._exit(0)
else:
# This prevents un-reaped, throw-away parent processes from lingering in the process table.
os.waitpid(pid, 0)
assert not is_parent
os.chdir(self._buildroot)
self.post_fork_child(**post_fork_child_opts or {})
except Exception:
logger.critical(traceback.format_exc())
finally:
os._exit(0)

def daemon_spawn(self, pre_fork_opts=None, post_fork_parent_opts=None, post_fork_child_opts=None):
"""Perform a single-fork to run a subprocess and write the child pid file.
Expand Down
4 changes: 2 additions & 2 deletions src/python/pants/pantsd/service/fs_event_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ def __init__(self, watchman, build_root, worker_count):
self._executor = None
self._handlers = {}

def setup(self, lifecycle_lock, fork_lock, executor=None):
super(FSEventService, self).setup(lifecycle_lock, fork_lock)
def setup(self, lifecycle_lock, executor=None):
super(FSEventService, self).setup(lifecycle_lock)
self._executor = executor or ThreadPoolExecutor(max_workers=self._worker_count)

def terminate(self):
Expand Down
1 change: 0 additions & 1 deletion src/python/pants/pantsd/service/pailgun_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ def runner_factory(sock, arguments, environment):
sock,
arguments,
environment,
self.fork_lock,
self._scheduler_service
)

Expand Down
6 changes: 1 addition & 5 deletions src/python/pants/pantsd/service/pants_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,15 @@ def is_killed(self):
"""
return self._kill_switch.is_set()

def setup(self, lifecycle_lock, fork_lock):
def setup(self, lifecycle_lock):
"""Called before `run` to allow for service->service or other side-effecting setup.
:param threading.RLock lifecycle_lock: A lock to guard the service thread lifecycles. This
can be used by individual services to safeguard
daemon-synchronous sections that should be protected
from abrupt teardown.
:param threading.RLock fork_lock: A lock to guard pantsd->runner forks. This can be used by
services to safeguard resources held by threads at fork
time, so that we can fork without deadlocking.
"""
self.lifecycle_lock = lifecycle_lock
self.fork_lock = fork_lock

@abstractmethod
def run(self):
Expand Down
41 changes: 20 additions & 21 deletions src/python/pants/pantsd/service/scheduler_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ def __init__(
def _combined_invalidating_fileset_from_globs(glob_strs, root):
return set.union(*(Fileset.globs(glob_str, root=root)() for glob_str in glob_strs))

def setup(self, lifecycle_lock, fork_lock):
def setup(self, lifecycle_lock):
"""Service setup."""
super(SchedulerService, self).setup(lifecycle_lock, fork_lock)
super(SchedulerService, self).setup(lifecycle_lock)
# Register filesystem event handlers on an FSEventService instance.
self._fs_event_service.register_all_files_handler(self._enqueue_fs_event)

Expand Down Expand Up @@ -118,8 +118,7 @@ def _handle_batch_event(self, files):
with self.lifecycle_lock:
self._maybe_invalidate_scheduler_batch(files)

with self.fork_lock:
self._scheduler.invalidate_files(files)
self._scheduler.invalidate_files(files)

def _process_event_queue(self):
"""File event notification queue processor."""
Expand Down Expand Up @@ -171,27 +170,27 @@ def prefork(self, options, build_config):
self._watchman_is_running.wait()

session = self._graph_helper.new_session()
with self.fork_lock:
global_options = options.for_global_scope()
target_roots = TargetRootsCalculator.create(
options=options,
session=session.scheduler_session,
symbol_table=session.symbol_table,
exclude_patterns=tuple(global_options.exclude_target_regexp) if global_options.exclude_target_regexp else tuple(),
tags=tuple(global_options.tag) if global_options.tag else tuple()
)

if global_options.v1:
session.warm_product_graph(target_roots)
global_options = options.for_global_scope()
target_roots = TargetRootsCalculator.create(
options=options,
session=session.scheduler_session,
symbol_table=session.symbol_table,
exclude_patterns=tuple(global_options.exclude_target_regexp) if global_options.exclude_target_regexp else tuple(),
tags=tuple(global_options.tag) if global_options.tag else tuple()
)

if global_options.v1:
session.warm_product_graph(target_roots)

if global_options.v2:
if not global_options.v1:
session.validate_goals(options.goals)
if global_options.v2:
if not global_options.v1:
session.validate_goals(options.goals)

# N.B. @console_rules run pre-fork in order to cache the products they request during execution.
session.run_console_rules(options.goals, target_roots)
# N.B. @console_rules run pre-fork in order to cache the products they request during execution.
session.run_console_rules(options.goals, target_roots)

return session, target_roots
return session, target_roots

def run(self):
"""Main service entrypoint."""
Expand Down
16 changes: 6 additions & 10 deletions src/python/pants/pantsd/service/store_gc_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,17 @@ def _launch_thread(f):

def _extend_lease(self):
while 1:
# Use the fork lock to ensure this thread isn't cloned via fork while holding the graph lock.
with self.fork_lock:
self._logger.debug('Extending leases')
self._scheduler.lease_files_in_graph()
self._logger.debug('Done extending leases')
self._logger.debug('Extending leases')
self._scheduler.lease_files_in_graph()
self._logger.debug('Done extending leases')
time.sleep(self._LEASE_EXTENSION_INTERVAL_SECONDS)

def _garbage_collect(self):
while 1:
time.sleep(self._GARBAGE_COLLECTION_INTERVAL_SECONDS)
# Grab the fork lock in case lmdb internally isn't fork-without-exec-safe.
with self.fork_lock:
self._logger.debug('Garbage collecting store')
self._scheduler.garbage_collect_store()
self._logger.debug('Done garbage collecting store')
self._logger.debug('Garbage collecting store')
self._scheduler.garbage_collect_store()
self._logger.debug('Done garbage collecting store')

def run(self):
"""Main service entrypoint. Called via Thread.start() via PantsDaemon.run()."""
Expand Down
Loading

0 comments on commit 1a67c7d

Please sign in to comment.