Skip to content

Commit

Permalink
Re-land the port of Pants' nailgun client to Rust (#11147)
Browse files Browse the repository at this point in the history
### Problem

#10865 previously landed to port Pants' nailgun client to Rust. It was reverted in #10929 due to an issue with TTY access, where (in particular), the `repl` goal was mostly unresponsive.

### Solution

The unresponsive `repl` was due to a bug in the `nails` library, where `stdin` was being consumed eagerly regardless of whether the server signaled that it would like to receive `stdin`. Pants sends an environment variable to the server that indicates which TTY the client is connected to, and the server will directly connect to that TTY if it can. When the server directly connects to the client's TTY, it does not accept `stdin`, but since `stdin` was read eagerly by the `nails` client (and ending up stuck in a buffer, since the server would not request it), the result was two different processes reading `stdin` from the TTY: the client, and the server.

Bump to `nails` `0.7.0`, which [makes `stdin` initialization lazy](stuhood/nails@6b8c19a).
  • Loading branch information
stuhood authored Nov 12, 2020
1 parent 2d4b10e commit e179d9c
Show file tree
Hide file tree
Showing 16 changed files with 712 additions and 462 deletions.
8 changes: 4 additions & 4 deletions src/python/pants/base/exception_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def _send_signal_to_children(self, received_signal: int, signame: str) -> None:

def handle_sigint(self, signum: int, _frame):
ExceptionSink._signal_sent = signum
self._send_signal_to_children(int(signal.SIGTERM), "SIGTERM")
self._send_signal_to_children(signum, "SIGINT")
raise KeyboardInterrupt("User interrupted execution with control-c!")

# TODO(#7406): figure out how to let sys.exit work in a signal handler instead of having to raise
Expand All @@ -102,12 +102,12 @@ def __init__(self, signum, signame):

def handle_sigquit(self, signum, _frame):
ExceptionSink._signal_sent = signum
self._send_signal_to_children(int(signal.SIGQUIT), "SIGQUIT")
self._send_signal_to_children(signum, "SIGQUIT")
raise self.SignalHandledNonLocalExit(signum, "SIGQUIT")

def handle_sigterm(self, signum, _frame):
ExceptionSink._signal_sent = signum
self._send_signal_to_children(int(signal.SIGTERM), "SIGTERM")
self._send_signal_to_children(signum, "SIGTERM")
raise self.SignalHandledNonLocalExit(signum, "SIGTERM")


Expand All @@ -131,7 +131,7 @@ class ExceptionSink:
_pid_specific_error_fileobj = None
_shared_error_fileobj = None

# Set in methods on SignalHandler and exposed to the engine rust code.
# Stores a signal received by the signal-handling logic so that rust code can check for it.
_signal_sent: Optional[int] = None

def __new__(cls, *args, **kwargs):
Expand Down
153 changes: 47 additions & 106 deletions src/python/pants/bin/remote_pants_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
from contextlib import contextmanager
from typing import List, Mapping

import psutil

from pants.base.exception_sink import ExceptionSink, SignalHandler
from pants.base.exiter import ExitCode
from pants.nailgun.nailgun_client import NailgunClient
from pants.engine.internals.native import Native
from pants.nailgun.nailgun_protocol import NailgunProtocol
from pants.option.options_bootstrapper import OptionsBootstrapper
from pants.pantsd.pants_daemon_client import PantsDaemonClient
from pants.util.dirutil import maybe_read_file

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -53,33 +54,24 @@ def restore_tty_flags(self):


class PailgunClientSignalHandler(SignalHandler):
def __init__(self, pailgun_client: NailgunClient, pid: int, timeout: float = 1):
self._pailgun_client = pailgun_client
self._timeout = timeout
def __init__(self, pid: int):
self.pid = pid
super().__init__(pantsd_instance=False)

def _forward_signal_with_timeout(self, signum, signame):
# TODO Consider not accessing the private function _maybe_last_pid here, or making it public.
logger.info(
"Sending {} to pantsd with pid {}, waiting up to {} seconds before sending SIGKILL...".format(
signame, self.pid, self._timeout
)
)
self._pailgun_client.set_exit_timeout(
timeout=self._timeout,
reason=KeyboardInterrupt("Sending user interrupt to pantsd"),
)
self._pailgun_client.maybe_send_signal(signum)
def _forward_signal(self, signum, signame):
ExceptionSink._signal_sent = signum
logger.info(f"Sending {signame} to pantsd with pid {self.pid}")
pantsd_process = psutil.Process(pid=self.pid)
pantsd_process.send_signal(signum)

def handle_sigint(self, signum, _frame):
self._forward_signal_with_timeout(signum, "SIGINT")
self._forward_signal(signum, "SIGINT")

def handle_sigquit(self, signum, _frame):
self._forward_signal_with_timeout(signum, "SIGQUIT")
self._forward_signal(signum, "SIGQUIT")

def handle_sigterm(self, signum, _frame):
self._forward_signal_with_timeout(signum, "SIGTERM")
self._forward_signal(signum, "SIGTERM")


class RemotePantsRunner:
Expand All @@ -91,11 +83,6 @@ class Fallback(Exception):
class Terminated(Exception):
"""Raised when an active run is terminated mid-flight."""

RECOVERABLE_EXCEPTIONS = (
NailgunClient.NailgunConnectionError,
NailgunClient.NailgunExecutionError,
)

def __init__(
self,
args: List[str],
Expand All @@ -114,54 +101,20 @@ def __init__(
self._bootstrap_options = options_bootstrapper.bootstrap_options
self._client = PantsDaemonClient(self._bootstrap_options)

@staticmethod
def _backoff(attempt):
"""Minimal backoff strategy for daemon restarts."""
time.sleep(attempt + (attempt - 1))

def run(self) -> ExitCode:
"""Runs pants remotely with retry and recovery for nascent executions."""
"""Starts up a pantsd instance if one is not already running, then connects to it via
nailgun."""

pantsd_handle = self._client.maybe_launch()
retries = 3
logger.debug(f"Connecting to pantsd on port {pantsd_handle.port}")

attempt = 1
while True:
logger.debug(
"connecting to pantsd on port {} (attempt {}/{})".format(
pantsd_handle.port, attempt, retries
)
)
try:
return self._connect_and_execute(pantsd_handle)
except self.RECOVERABLE_EXCEPTIONS as e:
if attempt > retries:
raise self.Fallback(e)

self._backoff(attempt)
logger.warning(
"pantsd was unresponsive on port {}, retrying ({}/{})".format(
pantsd_handle.port, attempt, retries
)
)

# One possible cause of the daemon being non-responsive during an attempt might be if a
# another lifecycle operation is happening concurrently (incl teardown). To account for
# this, we won't begin attempting restarts until at least 1 second has passed (1 attempt).
if attempt > 1:
pantsd_handle = self._client.restart()
attempt += 1
except NailgunClient.NailgunError as e:
# Ensure a newline.
logger.critical("")
logger.critical("lost active connection to pantsd!")
traceback = sys.exc_info()[2]
raise self._extract_remote_exception(pantsd_handle.pid, e).with_traceback(traceback)
return self._connect_and_execute(pantsd_handle)

def _connect_and_execute(self, pantsd_handle: PantsDaemonClient.Handle) -> ExitCode:
native = Native()

port = pantsd_handle.port
pid = pantsd_handle.pid

global_options = self._bootstrap_options.for_global_scope()

# Merge the nailgun TTY capability environment variables with the passed environment dict.
Expand All @@ -175,44 +128,32 @@ def _connect_and_execute(self, pantsd_handle: PantsDaemonClient.Handle) -> ExitC
),
}

# Instantiate a NailgunClient.
client = NailgunClient(
port=port,
remote_pid=pid,
ins=sys.stdin,
out=sys.stdout.buffer,
err=sys.stderr.buffer,
exit_on_broken_pipe=True,
metadata_base_dir=pantsd_handle.metadata_base_dir,
)

timeout = global_options.pantsd_pailgun_quit_timeout
pantsd_signal_handler = PailgunClientSignalHandler(client, pid=pid, timeout=timeout)
with ExceptionSink.trapped_signals(pantsd_signal_handler), STTYSettings.preserved():
# Execute the command on the pailgun.
return client.execute(self._args[0], self._args[1:], modified_env)

def _extract_remote_exception(self, pantsd_pid, nailgun_error):
"""Given a NailgunError, returns a Terminated exception with additional info (where
possible).
This method will include the entire exception log for either the `pid` in the NailgunError,
or failing that, the `pid` of the pantsd instance.
"""
sources = [pantsd_pid]

exception_text = None
for source in sources:
log_path = ExceptionSink.exceptions_log_path(for_pid=source)
exception_text = maybe_read_file(log_path)
if exception_text:
break

exception_suffix = (
"\nRemote exception:\n{}".format(exception_text) if exception_text else ""
)
return self.Terminated(
"abruptly lost active connection to pantsd runner: {!r}{}".format(
nailgun_error, exception_suffix
)
)
command = self._args[0]
args = self._args[1:]

def signal_fn() -> bool:
return ExceptionSink.signal_sent() is not None

rust_nailgun_client = native.new_nailgun_client(port=port)
pantsd_signal_handler = PailgunClientSignalHandler(pid=pid)

retries = 3
attempt = 1
while True:
logger.debug(f"Connecting to pantsd on port {port} attempt {attempt}/{retries}")

with ExceptionSink.trapped_signals(pantsd_signal_handler), STTYSettings.preserved():
try:
output = rust_nailgun_client.execute(signal_fn, command, args, modified_env)
return output

# NailgunConnectionException represents a failure connecting to pantsd, so we retry
# up to the retry limit.
except native.lib.NailgunConnectionException as e:
if attempt > retries:
raise self.Fallback(e)

# Wait one second before retrying
logger.warning(f"Pantsd was unresponsive on port {port}, retrying.")
time.sleep(1)
attempt += 1
4 changes: 4 additions & 0 deletions src/python/pants/engine/internals/native.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
PyGeneratorResponseBreak,
PyGeneratorResponseGet,
PyGeneratorResponseGetMulti,
PyNailgunClient,
PyNailgunServer,
PyRemotingOptions,
PyScheduler,
Expand Down Expand Up @@ -197,6 +198,9 @@ def new_nailgun_server(self, port: int, runner: RawFdRunner) -> PyNailgunServer:
"""
return cast(PyNailgunServer, self.lib.nailgun_server_create(self._executor, port, runner))

def new_nailgun_client(self, port: int) -> PyNailgunClient:
return cast(PyNailgunClient, self.lib.nailgun_client_create(self._executor, port))

def new_tasks(self) -> PyTasks:
return PyTasks()

Expand Down
8 changes: 7 additions & 1 deletion src/python/pants/engine/internals/native_engine.pyi
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any
from typing import Any, Callable, Dict, List

# TODO: black and flake8 disagree about the content of this file:
# see https://github.com/psf/black/issues/1548
Expand Down Expand Up @@ -32,6 +32,12 @@ class PyGeneratorResponseGetMulti:
class PyNailgunServer:
def __init__(self, **kwargs: Any) -> None: ...

class PyNailgunClient:
def __init__(self, **kwargs: Any) -> None: ...
def execute(
self, signal_fn: Callable, command: str, args: List[str], env: Dict[str, str]
) -> int: ...

class PyRemotingOptions:
def __init__(self, **kwargs: Any) -> None: ...

Expand Down
2 changes: 2 additions & 0 deletions src/python/pants/option/global_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,8 @@ def register_bootstrap_options(cls, register):
advanced=True,
type=float,
default=5.0,
removal_version="2.3.0.dev0",
removal_hint="The pailgun client has been rewritten to no longer use this",
help="The length of time (in seconds) to wait for further output after sending a "
"signal to the remote pantsd process before killing it.",
)
Expand Down
4 changes: 2 additions & 2 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions src/rust/engine/nailgun/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ publish = false

[dependencies]
bytes = "0.5"
futures = "0.3"
futures = "0.3.5"
log = "0.4"
nails = "0.6.0"
nails = "0.7.0"
os_pipe = "0.9"
task_executor = { path = "../task_executor" }
tokio = { version = "0.2.22", features = ["tcp", "fs", "sync"] }
tokio = { version = "0.2.22", features = ["tcp", "fs", "sync", "io-std"] }

[dev-dependencies]
tokio = { version = "0.2.22", features = ["dns", "rt-threaded", "macros"] }
tokio = { version = "0.2.22", features = ["dns", "rt-threaded", "macros", "tcp", "io-std"] }
Loading

0 comments on commit e179d9c

Please sign in to comment.