Skip to content

Commit

Permalink
Restore retrying logic
Browse files Browse the repository at this point in the history
[ci skip-build-wheels]
  • Loading branch information
gshuflin committed Oct 7, 2020
1 parent b727836 commit 5bb9bdd
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 27 deletions.
25 changes: 21 additions & 4 deletions src/python/pants/bin/remote_pants_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ def run(self) -> ExitCode:

pantsd_handle = self._client.maybe_launch()
logger.debug(f"Connecting to pantsd on port {pantsd_handle.port}")

return self._connect_and_execute(pantsd_handle)

def _connect_and_execute(self, pantsd_handle: PantsDaemonClient.Handle) -> ExitCode:
Expand Down Expand Up @@ -136,7 +137,23 @@ def signal_fn() -> bool:
rust_nailgun_client = native.new_nailgun_client(port=port)
pantsd_signal_handler = PailgunClientSignalHandler(pid=pid)

with ExceptionSink.trapped_signals(pantsd_signal_handler), STTYSettings.preserved():
return cast(
ExitCode, rust_nailgun_client.execute(signal_fn, command, args, modified_env)
)
retries = 3
attempt = 1
while True:
logger.debug(f"Connecting to pantsd on port {port} attempt {attempt}/{retries}")

with ExceptionSink.trapped_signals(pantsd_signal_handler), STTYSettings.preserved():
try:
output = rust_nailgun_client.execute(signal_fn, command, args, modified_env)
return cast(ExitCode, output)

# NailgunConnectionException represents a failure connecting to pantsd, so we retry
# up to the retry limit.
except native.lib.NailgunConnectionException as e:
if attempt > retries:
raise self.Fallback(e)

# Wait one second before retrying
logger.warning(f"Pantsd was unresponsive on port {port}, retrying.")
time.sleep(1)
attempt += 1
37 changes: 25 additions & 12 deletions src/rust/engine/nailgun/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ use futures::channel::{mpsc, oneshot};
use futures::{future, FutureExt, SinkExt, Stream, StreamExt};
use log::debug;

/// Failure modes of a nailgun client run, split by *when* the failure happened.
///
/// The split matters to callers: a `PreConnect` failure means no connection was ever
/// established, whereas a `PostConnect` failure happened after the connection was made.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NailgunClientError {
  /// The client failed before a connection to the server was established (for example the
  /// working directory could not be determined, or the TCP connect itself failed). Carries a
  /// human-readable description of the failure.
  PreConnect(String),
  /// The client failed after the connection was established (protocol errors, or errors
  /// joining/draining the output-handling task). Carries a human-readable description.
  PostConnect(String),
  /// The client future was abandoned because the exit signal fired before execution finished.
  ExplicitQuit,
}

impl std::fmt::Display for NailgunClientError {
  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    match self {
      // The payload strings are already complete, user-facing messages.
      NailgunClientError::PreConnect(msg) | NailgunClientError::PostConnect(msg) => {
        f.write_str(msg)
      }
      NailgunClientError::ExplicitQuit => {
        f.write_str("Exiting nailgun client future via explicit quit message")
      }
    }
  }
}

impl std::error::Error for NailgunClientError {}

async fn handle_client_output(
mut stdio_read: impl Stream<Item = ChildOutput> + Unpin,
) -> Result<(), io::Error> {
Expand Down Expand Up @@ -78,10 +84,11 @@ async fn client_execute_helper(
command: String,
args: Vec<String>,
env: Vec<(String, String)>,
) -> Result<i32, String> {
) -> Result<i32, NailgunClientError> {
use nails::execution::{child_channel, Command};

let working_dir = std::env::current_dir().map_err(|e| e.to_string())?;
let working_dir =
std::env::current_dir().map_err(|e| NailgunClientError::PreConnect(e.to_string()))?;

let config = Config::default();
let command = Command {
Expand All @@ -100,18 +107,26 @@ async fn client_execute_helper(
let localhost = Ipv4Addr::new(127, 0, 0, 1);
let addr = (localhost, port);

let socket = TcpStream::connect(addr)
.await
.map_err(|err| format!("Nailgun client error connecting to localhost: {}", err))?;
let socket = TcpStream::connect(addr).await.map_err(|err| {
NailgunClientError::PreConnect(format!(
"Nailgun client error connecting to localhost: {}",
err
))
})?;

let exit_code: ExitCode =
nails::client_handle_connection(config, socket, command, stdio_write, stdin_read)
.await
.map_err(|err| format!("Nailgun client error: {}", err))?;
.map_err(|err| NailgunClientError::PostConnect(format!("Nailgun client error: {}", err)))?;

let () = output_handler
.await
.map_err(|join_error| format!("Error joining nailgun client task: {}", join_error))?
.map_err(|err| format!("Nailgun client output error: {}", err))?;
.map_err(|join_error| {
NailgunClientError::PostConnect(format!("Error joining nailgun client task: {}", join_error))
})?
.map_err(|err| {
NailgunClientError::PostConnect(format!("Nailgun client output error: {}", err))
})?;

Ok(exit_code.0)
}
Expand All @@ -122,7 +137,7 @@ pub async fn client_execute(
args: Vec<String>,
env: Vec<(String, String)>,
exit_receiver: oneshot::Receiver<()>,
) -> Result<i32, String> {
) -> Result<i32, NailgunClientError> {
use future::Either;

let execution_future = client_execute_helper(port, command, args, env).boxed();
Expand All @@ -132,8 +147,6 @@ pub async fn client_execute(
debug!("Nailgun client future finished");
execution_result
}
Either::Right((_exited, _execution_result_future)) => {
Err("Exiting nailgun client future via explicit quit message".to_string())
}
Either::Right((_exited, _execution_result_future)) => Err(NailgunClientError::ExplicitQuit),
}
}
2 changes: 1 addition & 1 deletion src/rust/engine/nailgun/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@ mod tests;
mod client;
mod server;

pub use client::client_execute;
pub use client::{client_execute, NailgunClientError};
pub use nails::execution::ExitCode;
pub use server::{RawFdExecution, Server};
40 changes: 30 additions & 10 deletions src/rust/engine/src/externs/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,24 @@ use crate::{
};

py_exception!(native_engine, PollTimeout);
// Raised by `PyNailgunClient.execute` when the nailgun client fails before a connection is
// established (`NailgunClientError::PreConnect`). The Python runner catches this type in
// order to retry the connection.
py_exception!(native_engine, NailgunConnectionException);
// Raised by `PyNailgunClient.execute` for every other nailgun client failure: post-connect
// errors, an explicit quit, a user interrupt, or a disconnected client task.
py_exception!(native_engine, NailgunClientException);

py_module_initializer!(native_engine, |py, m| {
m.add(py, "PollTimeout", py.get_type::<PollTimeout>())
.unwrap();

m.add(
py,
"NailgunClientException",
py.get_type::<NailgunClientException>(),
)?;
m.add(
py,
"NailgunConnectionException",
py.get_type::<NailgunConnectionException>(),
)?;

m.add(py, "default_cache_path", py_fn!(py, default_cache_path()))?;

m.add(py, "default_config_path", py_fn!(py, default_config_path()))?;
Expand Down Expand Up @@ -584,6 +597,7 @@ py_class!(class PyNailgunClient |py| {
data port: u16;

def execute(&self, py_signal_fn: PyObject, command: String, args: Vec<String>, env: PyDict) -> CPyResult<PyInt> {
use nailgun::NailgunClientError;

let env_list: Vec<(String, String)> = env
.items(py)
Expand All @@ -608,7 +622,7 @@ py_class!(class PyNailgunClient |py| {
recv_task_shutdown_request,
);

let exit_code: Result<i32, String> = with_executor(py, executor_ptr, |executor| {
let exit_code: Result<i32, PyErr> = with_executor(py, executor_ptr, |executor| {
let (sender, receiver) = mpsc::channel();

let _spawned_fut = executor.spawn(async move {
Expand All @@ -620,16 +634,27 @@ py_class!(class PyNailgunClient |py| {
let output = loop {
let event = receiver.recv_timeout(timeout);
if let Some(_termination) = maybe_break_execution_loop(&python_signal_fn) {
break Err("Quitting because of explicit interrupt".to_string());
let err_str = "Quitting because of explicit user interrupt";
break Err(PyErr::new::<NailgunClientException, _>(py, (err_str,)))
}

match event {
Ok(res) => break res.map_err(|e| format!("Nailgun client error: {:?}", e)),
Ok(res) => break res.map_err(|e| match e {
NailgunClientError::PreConnect(err_str) => PyErr::new::<NailgunConnectionException, _>(py, (err_str,)),
NailgunClientError::PostConnect(s) => {
let err_str = format!("Nailgun client error: {:?}", s);
PyErr::new::<NailgunClientException, _>(py, (err_str,))
},
NailgunClientError::ExplicitQuit => {
PyErr::new::<NailgunClientException, _>(py, ("Explicit quit",))
}
}),
Err(RecvTimeoutError::Timeout) => {
continue;
}
Err(RecvTimeoutError::Disconnected) => {
break Err("Disconnected from Nailgun client task".to_string());
let err_str = "Disconnected from Nailgun client task".to_string();
break Err(PyErr::new::<NailgunClientException, _>(py, (err_str,)))
}
}
};
Expand All @@ -643,12 +668,7 @@ py_class!(class PyNailgunClient |py| {
output
});

match exit_code {
Ok(code) => Ok(code.to_py_object(py)),
Err(err_str) => {
Err(PyErr::new::<exc::Exception, _>(py, (err_str,)))
}
}
exit_code.map(|code| code.to_py_object(py))
}
});

Expand Down

0 comments on commit 5bb9bdd

Please sign in to comment.