-
-
Notifications
You must be signed in to change notification settings - Fork 631
/
remote_pants_runner.py
209 lines (179 loc) · 7.97 KB
/
remote_pants_runner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# Copyright 2015 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).
import logging
import sys
import time
from contextlib import contextmanager
from typing import List, Mapping
from pants.base.exception_sink import ExceptionSink, SignalHandler
from pants.base.exiter import ExitCode
from pants.console.stty_utils import STTYSettings
from pants.java.nailgun_client import NailgunClient
from pants.java.nailgun_protocol import NailgunProtocol
from pants.option.options_bootstrapper import OptionsBootstrapper
from pants.pantsd.pants_daemon_client import PantsDaemonClient
from pants.util.dirutil import maybe_read_file
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class PailgunClientSignalHandler(SignalHandler):
    """A SignalHandler that relays received signals to pantsd through a NailgunClient.

    Each of SIGINT, SIGQUIT and SIGTERM is handled the same way: an exit timeout is
    registered on the client and the signal number is then passed to the client's
    `maybe_send_signal`.
    """

    def __init__(self, pailgun_client, pid, timeout=1, *args, **kwargs):
        """
        :param pailgun_client: The NailgunClient used to relay signals; must be a
                               NailgunClient instance.
        :param pid: The pid reported in the log line emitted when a signal is relayed.
        :param timeout: The value handed to the client's `set_exit_timeout` (and reported
                        in the log line as a number of seconds).
        :param args: Extra positional arguments passed through to SignalHandler.
        :param kwargs: Extra keyword arguments passed through to SignalHandler.
        """
        assert isinstance(pailgun_client, NailgunClient)
        self.pid = pid
        self._timeout = timeout
        self._pailgun_client = pailgun_client
        super().__init__(*args, **kwargs)

    def _forward_signal_with_timeout(self, signum, signame):
        """Log the relay, arm the client's exit timeout, then relay `signum` via the client.

        :param signum: The signal number to hand to the client's `maybe_send_signal`.
        :param signame: The human-readable signal name used only in the log line.
        """
        grace_period = self._timeout
        announcement = "Sending {} to pantsd with pid {}, waiting up to {} seconds before sending SIGKILL...".format(
            signame, self.pid, grace_period
        )
        logger.info(announcement)

        # The timeout is registered before the signal is relayed, in that order.
        interruption = KeyboardInterrupt("Interrupted by user over pailgun client!")
        self._pailgun_client.set_exit_timeout(timeout=grace_period, reason=interruption)
        self._pailgun_client.maybe_send_signal(signum)

    def handle_sigint(self, signum, _frame):
        """Relay SIGINT to pantsd; the frame argument is unused."""
        self._forward_signal_with_timeout(signum, "SIGINT")

    def handle_sigquit(self, signum, _frame):
        """Relay SIGQUIT to pantsd; the frame argument is unused."""
        self._forward_signal_with_timeout(signum, "SIGQUIT")

    def handle_sigterm(self, signum, _frame):
        """Relay SIGTERM to pantsd; the frame argument is unused."""
        self._forward_signal_with_timeout(signum, "SIGTERM")
class RemotePantsRunner:
    """A thin client variant of PantsRunner."""

    class Fallback(Exception):
        """Raised when fallback to an alternate execution mode is requested."""

    class Terminated(Exception):
        """Raised when an active run is terminated mid-flight."""

    # Failures that `_run_pants_with_retry` retries (and eventually converts into `Fallback`).
    RECOVERABLE_EXCEPTIONS = (
        NailgunClient.NailgunConnectionError,
        NailgunClient.NailgunExecutionError,
    )

    def __init__(
        self,
        args: List[str],
        env: Mapping[str, str],
        options_bootstrapper: OptionsBootstrapper,
        stdin=None,
        stdout=None,
        stderr=None,
    ) -> None:
        """
        :param args: The arguments (e.g. sys.argv) for this run.
        :param env: The environment (e.g. os.environ) for this run.
        :param options_bootstrapper: The bootstrap options.
        :param file stdin: The stream representing stdin. Defaults to `sys.stdin`.
        :param file stdout: The stream representing stdout. Defaults to `sys.stdout.buffer`.
        :param file stderr: The stream representing stderr. Defaults to `sys.stderr.buffer`.
        """
        # Default start time; `run()` overrides it with the caller-supplied value.
        self._start_time = time.time()
        self._args = args
        self._env = env
        self._options_bootstrapper = options_bootstrapper
        self._bootstrap_options = options_bootstrapper.bootstrap_options
        self._client = PantsDaemonClient(self._bootstrap_options)
        self._stdin = stdin or sys.stdin
        self._stdout = stdout or sys.stdout.buffer
        self._stderr = stderr or sys.stderr.buffer

    @contextmanager
    def _trapped_signals(self, client, pid: int):
        """A contextmanager that handles SIGINT (control-c) and SIGQUIT (control-\\) remotely.

        :param client: The NailgunClient that signals are relayed through.
        :param pid: The pid given to the signal handler (used in its log output).
        """
        signal_handler = PailgunClientSignalHandler(
            client,
            pid=pid,
            timeout=self._bootstrap_options.for_global_scope().pantsd_pailgun_quit_timeout,
        )
        with ExceptionSink.trapped_signals(signal_handler):
            yield

    @staticmethod
    def _backoff(attempt):
        """Minimal backoff strategy for daemon restarts.

        Sleeps for `2 * attempt - 1` seconds, i.e. 1, 3, 5, ... for attempts 1, 2, 3, ...
        """
        time.sleep(attempt + (attempt - 1))

    def _run_pants_with_retry(
        self, pantsd_handle: PantsDaemonClient.Handle, retries: int = 3
    ) -> ExitCode:
        """Runs pants remotely with retry and recovery for nascent executions.

        :param pantsd_handle: A Handle for the daemon to connect to.
        :param retries: How many times to retry after the initial attempt fails with one of
                        `RECOVERABLE_EXCEPTIONS`; at most `retries + 1` connections are tried.
        :returns: The exit code of the remote execution.
        :raises Fallback: If every attempt failed with a recoverable exception.
        :raises Terminated: If any other NailgunError occurred during an attempt.
        """
        attempt = 1
        while True:
            # The initial attempt plus `retries` retries gives `retries + 1` attempts in total.
            logger.debug(
                "connecting to pantsd on port {} (attempt {}/{})".format(
                    pantsd_handle.port, attempt, retries + 1
                )
            )
            try:
                return self._connect_and_execute(pantsd_handle)
            except self.RECOVERABLE_EXCEPTIONS as e:
                if attempt > retries:
                    raise self.Fallback(e) from e

                self._backoff(attempt)
                logger.warning(
                    "pantsd was unresponsive on port {}, retrying ({}/{})".format(
                        pantsd_handle.port, attempt, retries
                    )
                )

                # One possible cause of the daemon being non-responsive during an attempt might be
                # if another lifecycle operation is happening concurrently (incl teardown). To
                # account for this, we won't begin attempting restarts until at least 1 second has
                # passed (1 attempt).
                if attempt > 1:
                    pantsd_handle = self._client.restart()
                attempt += 1
            except NailgunClient.NailgunError as e:
                # Ensure a newline.
                logger.critical("")
                logger.critical("lost active connection to pantsd!")
                # Re-raise as Terminated while keeping the traceback of the original failure.
                terminated = self._extract_remote_exception(pantsd_handle.pid, e)
                raise terminated.with_traceback(e.__traceback__) from e

    def _connect_and_execute(self, pantsd_handle: PantsDaemonClient.Handle) -> ExitCode:
        """Connects to the daemon described by `pantsd_handle` and executes this run's args.

        :param pantsd_handle: A Handle providing the port, pid and metadata base dir to use.
        :returns: The result of `NailgunClient.execute` for this run.
        """
        port = pantsd_handle.port
        pid = pantsd_handle.pid
        # Merge the nailgun TTY capability environment variables with the passed environment dict.
        ng_env = NailgunProtocol.ttynames_to_env(self._stdin, self._stdout, self._stderr)
        modified_env = {
            **self._env,
            **ng_env,
            "PANTSD_RUNTRACKER_CLIENT_START_TIME": str(self._start_time),
            "PANTSD_REQUEST_TIMEOUT_LIMIT": str(
                self._bootstrap_options.for_global_scope().pantsd_timeout_when_multiple_invocations
            ),
        }

        assert isinstance(port, int), "port {} is not an integer! It has type {}.".format(
            port, type(port)
        )

        # Instantiate a NailgunClient.
        client = NailgunClient(
            port=port,
            remote_pid=pid,
            ins=self._stdin,
            out=self._stdout,
            err=self._stderr,
            exit_on_broken_pipe=True,
            metadata_base_dir=pantsd_handle.metadata_base_dir,
        )

        with self._trapped_signals(client, pid), STTYSettings.preserved():
            # Execute the command on the pailgun.
            return client.execute(self._args[0], self._args[1:], modified_env)

    def _extract_remote_exception(self, pantsd_pid, nailgun_error):
        """Given a NailgunError, returns a Terminated exception with additional info (where
        possible).

        If the exception log for the pantsd instance with pid `pantsd_pid` can be read and is
        non-empty, its entire contents are appended to the message.

        :param pantsd_pid: The pid of the pantsd instance whose exception log is consulted.
        :param nailgun_error: The error that ended the connection; its repr is included in the
                              message.
        :returns: A `Terminated` instance (not raised here).
        """
        log_path = ExceptionSink.exceptions_log_path(for_pid=pantsd_pid)
        exception_text = maybe_read_file(log_path)

        exception_suffix = (
            "\nRemote exception:\n{}".format(exception_text) if exception_text else ""
        )
        return self.Terminated(
            "abruptly lost active connection to pantsd runner: {!r}{}".format(
                nailgun_error, exception_suffix
            )
        )

    def run(self, start_time: float) -> ExitCode:
        """Launches pantsd if necessary and runs pants remotely, retrying as needed.

        :param start_time: The client start time to report to the daemon through the
                           `PANTSD_RUNTRACKER_CLIENT_START_TIME` environment variable; expected
                           to be on the same scale as `time.time()`, which `__init__` records as
                           the default.
        :returns: The exit code of the remote execution.
        """
        # Honor the caller-supplied start time rather than silently discarding it; keep the
        # construction-time default only if no value was given.
        if start_time is not None:
            self._start_time = start_time
        return self._run_pants_with_retry(self._client.maybe_launch())