Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[internal] Async-ify NailgunPool::connect and nailgun::CommandRunner. #12990

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 41 additions & 50 deletions src/rust/engine/process_execution/src/nailgun/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ mod parsed_jvm_command_lines;
#[cfg(test)]
mod parsed_jvm_command_lines_tests;

use async_semaphore::AsyncSemaphore;
pub use nailgun_pool::NailgunPool;
use parsed_jvm_command_lines::ParsedJVMCommandLines;
use std::net::SocketAddr;
Expand Down Expand Up @@ -93,7 +92,6 @@ fn construct_nailgun_client_request(
pub struct CommandRunner {
inner: super::local::CommandRunner,
nailgun_pool: NailgunPool,
async_semaphore: async_semaphore::AsyncSemaphore,
metadata: ProcessMetadata,
workdir_base: PathBuf,
executor: task_executor::Executor,
Expand All @@ -109,7 +107,6 @@ impl CommandRunner {
CommandRunner {
inner: runner,
nailgun_pool: NailgunPool::new(),
async_semaphore: AsyncSemaphore::new(1),
Copy link
Sponsor Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This semaphore only guarded the establishment of a connection, which was neither sufficient nor useful. #12982 follows up to actually control access to the spawned servers.

metadata,
workdir_base,
executor,
Expand Down Expand Up @@ -226,56 +223,50 @@ impl CapturedWorkdir for CommandRunner {
&self.metadata,
);

let nailgun_pool = self.nailgun_pool.clone();
let req2 = req.clone();
let workdir_for_this_nailgun = self.get_nailgun_workdir(&nailgun_name)?;
let build_id = context.build_id;
let store = self.inner.store.clone();

let mut child = self
.async_semaphore
.clone()
.with_acquired(move |_id| {
// Get the port of a running nailgun server (or a new nailgun server if it doesn't exist)
nailgun_pool.connect(
nailgun_name.clone(),
nailgun_req,
workdir_for_this_nailgun,
nailgun_req_digest,
build_id,
store,
req.input_files,
)
})
.map_err(|e| format!("Failed to connect to nailgun! {}", e))
.inspect(move |_| debug!("Connected to nailgun instance {}", &nailgun_name3))
.and_then(move |nailgun_port| {
// Run the client request in the nailgun we have active.
debug!("Got nailgun port {} for {}", nailgun_port, nailgun_name2);
let client_req = construct_nailgun_client_request(req2, client_main_class, client_args);
let cmd = Command {
command: client_req.argv[0].clone(),
args: client_req.argv[1..].to_vec(),
env: client_req
.env
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect(),
working_dir: client_workdir,
};
trace!("Client request: {:#?}", client_req);
let addr: SocketAddr = format!("127.0.0.1:{:?}", nailgun_port).parse().unwrap();
debug!("Connecting to server at {}...", addr);
TcpStream::connect(addr)
.and_then(move |stream| {
nails::client::handle_connection(nails::Config::default(), stream, cmd, async {
let (_stdin_write, stdin_read) = child_channel::<ChildInput>();
stdin_read
})
// Get the port of a running nailgun server (or a new nailgun server if it doesn't exist)
let nailgun_port = self
.nailgun_pool
.connect(
nailgun_name.clone(),
nailgun_req,
workdir_for_this_nailgun,
nailgun_req_digest,
context.build_id,
self.inner.store.clone(),
req.input_files,
)
.await
.map_err(|e| format!("Failed to connect to nailgun! {}", e))?;
debug!("Connected to nailgun instance {}", &nailgun_name3);
let mut child = {
// Run the client request in the nailgun we have active.
debug!("Got nailgun port {} for {}", nailgun_port, nailgun_name2);
let client_req = construct_nailgun_client_request(req, client_main_class, client_args);
let cmd = Command {
command: client_req.argv[0].clone(),
args: client_req.argv[1..].to_vec(),
env: client_req
.env
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect(),
working_dir: client_workdir,
};
trace!("Client request: {:#?}", client_req);
let addr: SocketAddr = format!("127.0.0.1:{:?}", nailgun_port).parse().unwrap();
debug!("Connecting to server at {}...", addr);
TcpStream::connect(addr)
.and_then(move |stream| {
nails::client::handle_connection(nails::Config::default(), stream, cmd, async {
let (_stdin_write, stdin_read) = child_channel::<ChildInput>();
stdin_read
})
.map_err(|e| format!("Error communicating with server: {}", e))
})
.await?;
})
.map_err(|e| format!("Error communicating with server: {}", e))
.await?
};

let output_stream = child
.output_stream
Expand Down
207 changes: 101 additions & 106 deletions src/rust/engine/process_execution/src/nailgun/nailgun_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use parking_lot::Mutex;
use regex::Regex;
use sha2::{Digest as Sha256Digest, Sha256};
use store::Store;
use tryfuture::try_future;

use crate::Process;

Expand Down Expand Up @@ -92,7 +91,7 @@ impl NailgunPool {
/// If the server is not running, or if it's running with a different configuration,
/// this code will start a new server as a side effect.
///
pub fn connect(
pub async fn connect(
&self,
name: NailgunProcessName,
startup_options: Process,
Expand All @@ -101,123 +100,119 @@ impl NailgunPool {
build_id_requesting_connection: String,
store: Store,
input_files: Digest,
) -> BoxFuture<'static, Result<Port, String>> {
) -> Result<Port, String> {
let processes = self.processes.clone();

let jdk_path = try_future!(startup_options.jdk_home.clone().ok_or_else(|| {
let jdk_path = startup_options.jdk_home.clone().ok_or_else(|| {
format!(
"jdk_home is not set for nailgun server startup request {:#?}",
&startup_options
)
}));
let requested_server_fingerprint = try_future!(NailgunProcessFingerprint::new(
nailgun_req_digest,
jdk_path.clone()
));
})?;
let requested_server_fingerprint =
NailgunProcessFingerprint::new(nailgun_req_digest, jdk_path.clone())?;

Self::materialize_workdir_for_server(
store, workdir_path.clone(), jdk_path, input_files
).and_then(move |_| async move {
debug!("Locking nailgun process pool so that only one can be connecting at a time.");
let mut processes = processes.lock();
debug!("Locked!");
let connection_result = if let Some(process) = processes.get_mut(&name) {
// Clone some fields that we need for later
let (process_name, process_fingerprint, process_port, build_id_that_started_the_server) = (
process.name.clone(),
process.fingerprint.clone(),
process.port,
process.build_id.clone(),
);
Self::materialize_workdir_for_server(store, workdir_path.clone(), jdk_path, input_files)
.await?;

debug!(
"Checking if nailgun server {} is still alive at port {}...",
&process_name, process_port
);
debug!("Locking nailgun process pool so that only one can be connecting at a time.");
let mut processes = processes.lock();
debug!("Locked!");
let connection_result = if let Some(process) = processes.get_mut(&name) {
// Clone some fields that we need for later
let (process_name, process_fingerprint, process_port, build_id_that_started_the_server) = (
process.name.clone(),
process.fingerprint.clone(),
process.port,
process.build_id.clone(),
);

debug!(
"Checking if nailgun server {} is still alive at port {}...",
&process_name, process_port
);

// If the process is in the map, check if it's alive using the handle.
let status = {
process
.handle
.lock()
.try_wait()
.map_err(|e| format!("Error getting the process status! {}", e))
};
match status {
Ok(None) => {
// Process hasn't exited yet
debug!(
"Found nailgun process {}, with fingerprint {:?}",
&name, process_fingerprint
);
if requested_server_fingerprint == process_fingerprint {
debug!("The fingerprint of the running nailgun {:?} matches the requested fingerprint {:?}. Connecting to existing server.",
requested_server_fingerprint, process_fingerprint);
Ok(process_port)
// If the process is in the map, check if it's alive using the handle.
let status = {
process
.handle
.lock()
.try_wait()
.map_err(|e| format!("Error getting the process status! {}", e))
};
match status {
Ok(None) => {
// Process hasn't exited yet
debug!(
"Found nailgun process {}, with fingerprint {:?}",
&name, process_fingerprint
);
if requested_server_fingerprint == process_fingerprint {
debug!("The fingerprint of the running nailgun {:?} matches the requested fingerprint {:?}. Connecting to existing server.",
requested_server_fingerprint, process_fingerprint);
Ok(process_port)
} else {
// The running process doesn't coincide with the options we want.
if build_id_that_started_the_server == build_id_requesting_connection {
Err(format!(
"Trying to change the JVM options for a running nailgun server that was started this run, with name {}.\
There is exactly one nailgun server per task, so it shouldn't be possible to change the options of a nailgun server mid-run.\
This might be a problem with how we calculate the keys of nailgun servers (https://github.com/pantsbuild/pants/issues/8527).",
&name)
)
} else {
// The running process doesn't coincide with the options we want.
if build_id_that_started_the_server == build_id_requesting_connection {
Err(format!(
"Trying to change the JVM options for a running nailgun server that was started this run, with name {}.\
There is exactly one nailgun server per task, so it shouldn't be possible to change the options of a nailgun server mid-run.\
This might be a problem with how we calculate the keys of nailgun servers (https://github.com/pantsbuild/pants/issues/8527).",
&name)
)
} else {
// Restart it.
// Since the stored server was started in a different pants run,
// no client will be running on that server.
debug!(
"The options for server process {} are different to the startup_options, \
and the original process was started in a different pants run.\n\
Startup Options: {:?}\n Process Cmd: {:?}",
&process_name, startup_options, process_fingerprint
);
debug!("Restarting the server...");
Self::start_new_nailgun(
&mut *processes,
name,
startup_options,
&workdir_path,
requested_server_fingerprint,
build_id_requesting_connection,
)
}
// Restart it.
// Since the stored server was started in a different pants run,
// no client will be running on that server.
debug!(
"The options for server process {} are different to the startup_options, \
and the original process was started in a different pants run.\n\
Startup Options: {:?}\n Process Cmd: {:?}",
&process_name, startup_options, process_fingerprint
);
debug!("Restarting the server...");
Self::start_new_nailgun(
&mut *processes,
name,
startup_options,
&workdir_path,
requested_server_fingerprint,
build_id_requesting_connection,
)
}
}
Ok(_) => {
// The process has exited with some exit code
debug!("The requested nailgun server was not running anymore. Restarting process...");
Self::start_new_nailgun(
&mut *processes,
name,
startup_options,
&workdir_path,
requested_server_fingerprint,
build_id_requesting_connection,
)
}
Err(e) => Err(e),
}
} else {
// We don't have a running nailgun registered in the map.
debug!(
"No nailgun server is running with name {}. Starting one...",
&name
);
Self::start_new_nailgun(
&mut *processes,
name,
startup_options,
&workdir_path,
requested_server_fingerprint,
build_id_requesting_connection,
)
};
debug!("Unlocking nailgun process pool.");
connection_result
})
.boxed()
Ok(_) => {
// The process has exited with some exit code
debug!("The requested nailgun server was not running anymore. Restarting process...");
Self::start_new_nailgun(
&mut *processes,
name,
startup_options,
&workdir_path,
requested_server_fingerprint,
build_id_requesting_connection,
)
}
Err(e) => Err(e),
}
} else {
// We don't have a running nailgun registered in the map.
debug!(
"No nailgun server is running with name {}. Starting one...",
&name
);
Self::start_new_nailgun(
&mut *processes,
name,
startup_options,
&workdir_path,
requested_server_fingerprint,
build_id_requesting_connection,
)
};
debug!("Unlocking nailgun process pool.");
connection_result
}

//
Expand Down